Commit b2bc700b by guos

会员标签4.0

parent 3f26a427
...@@ -6,15 +6,20 @@ import com.gic.spark.datasource.mysql.MysqlRddManager; ...@@ -6,15 +6,20 @@ import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagIntegralBean; import com.gic.spark.entity.bean.TagIntegralBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagIntegralRequest; import com.gic.spark.entity.request.TagIntegralRequest;
import com.gic.spark.tag.TagConstant;
import com.gic.spark.util.AppEnvUtil; import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil; import com.gic.spark.util.ConstantUtil;
import com.gic.spark.util.DateUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column; import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* @description: * @description:
...@@ -45,10 +50,105 @@ public class TagAboutExpireIntegralFilter implements BaseTagFilter { ...@@ -45,10 +50,105 @@ public class TagAboutExpireIntegralFilter implements BaseTagFilter {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagIntegralRequest integralRequest=(TagIntegralRequest)request; TagIntegralRequest integralRequest=(TagIntegralRequest)request;
Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId) Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("create_Time").isNotNull()); .filter(new Column("limit_time").isNotNull())
.filter(new Column("integral_change_type").notEqual(3))
.filter(new Column("integral_value").$greater(0));
JavaRDD<TagIntegralBean>integralRDD= MysqlRddManager.getPojoFromDataset(dataset,TagIntegralBean.class).javaRDD(); JavaRDD<TagIntegralBean>integralRDD= MysqlRddManager.getPojoFromDataset(dataset,TagIntegralBean.class).javaRDD();
JavaRDD<Long>ecuRdd=integralRDD.mapPartitions(data->{
List<TagIntegralBean>result=new ArrayList();
while (data.hasNext()){
TagIntegralBean integralBean=data.next();
switch (integralRequest.getDomainType()){
case ACU_INFO:
if(TagConstant.CU_TYPE_ACU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
case SCU_INFO:
if(TagConstant.CU_TYPE_SCU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
case MCU_INFO:
if(TagConstant.CU_TYPE_MCU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
default:break;
}
}
return result.iterator();
}).mapPartitions(data->{
List<TagIntegralBean>result=new ArrayList();
while (data.hasNext()){
TagIntegralBean integralBean=data.next();
switch (integralRequest.getTimeRangeType()){
case LATELY:
if(integralBean.getLimit_time().getTime()>
DateUtil.addNumForDay(DateUtil.getDate(),integralRequest.getTimeNum()).getTime()){
result.add(integralBean);
}
break;
case FIXATION:
if(integralBean.getLimit_time().getTime()>=integralRequest.getBeginTime().getTime()
&&integralBean.getCreate_Time().getTime()<=integralRequest.getEndTime().getTime()){
result.add(integralBean);
}
break;
default:break;
}
}
return result.iterator();
}).mapToPair(data-> Tuple2.apply(data.getFile_ecu_id(),data.getIntegral_Value()))
.reduceByKey((x,y)->x+y)
.mapPartitions(data->{
Set<Long> result=new HashSet();
while (data.hasNext()){
Tuple2<Long,Integer> tp2=data.next();
int integralValue=tp2._2();
switch (integralRequest.getNumberType()){
case eq:
if(integralRequest.getEqualNum()==integralValue){
result.add(tp2._1());
}
break;
case gt:
if(integralValue>integralRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(integralValue>=integralRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(integralValue<integralRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(integralValue<=integralRequest.getEndNum()){
result.add(tp2._1());
}
break;
case between:
if(integralValue>=integralRequest.getBeginNum()
&&integralValue<=integralRequest.getEndNum()){
result.add(tp2._1());
}
break;
default:break;
}
}
return result.iterator();
});
return null; return ecuRdd;
} }
} }
...@@ -50,8 +50,7 @@ public class TagAccumulatedIntegralFilter implements BaseTagFilter { ...@@ -50,8 +50,7 @@ public class TagAccumulatedIntegralFilter implements BaseTagFilter {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagIntegralRequest integralRequest=(TagIntegralRequest)request; TagIntegralRequest integralRequest=(TagIntegralRequest)request;
Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId) Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("create_Time").isNotNull()) .filter(new Column("create_Time").isNotNull());
.filter(new Column("service_flag").equalTo(1));
JavaRDD<TagIntegralBean>integralRDD=MysqlRddManager.getPojoFromDataset(dataset,TagIntegralBean.class).javaRDD(); JavaRDD<TagIntegralBean>integralRDD=MysqlRddManager.getPojoFromDataset(dataset,TagIntegralBean.class).javaRDD();
JavaRDD<Long>ecuRdd=integralRDD.mapPartitions(data->{ JavaRDD<Long>ecuRdd=integralRDD.mapPartitions(data->{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment