Commit 4f24f056 by guos

Member Tag 4.0 (会员标签4.0)

parent cd25418b
AbstractTagConsumFilter.java
@@ -2,6 +2,7 @@ package com.gic.spark.filter;
 import com.gic.spark.datasource.entity.DataSourceHive;
 import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
+import com.gic.spark.entity.request.AbstractFilterRequest;
 import com.gic.spark.entity.request.TagConsumeRequest;
 import com.gic.spark.util.ConstantUtil;
 import org.apache.spark.api.java.JavaRDD;
@@ -17,7 +18,7 @@ import java.util.List;
 public abstract class AbstractTagConsumFilter implements BaseTagFilter {
     DataSourceHive dataSourceHive = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_BRAND_LABEL_D);
-    protected static JavaRDD<TrdEcuBrandLabelBean> statisticsTypeHandle(JavaRDD<TrdEcuBrandLabelBean> consumeRDD, TagConsumeRequest consumeRequest){
+    protected static JavaRDD<TrdEcuBrandLabelBean> statisticsTypeHandle(JavaRDD<TrdEcuBrandLabelBean> consumeRDD, AbstractFilterRequest consumeRequest){
         consumeRDD=consumeRDD.mapPartitions(data->{
             List<TrdEcuBrandLabelBean> result=new ArrayList();
             while (data.hasNext()){
......
TagAssociatedPurchaseRateFilter.java
@@ -4,6 +4,7 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
 import com.gic.spark.datasource.mysql.MysqlRddManager;
 import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
 import com.gic.spark.entity.request.AbstractFilterRequest;
+import com.gic.spark.entity.request.TagConsumeDoubleRequest;
 import com.gic.spark.entity.request.TagConsumeRequest;
 import org.apache.spark.api.java.JavaRDD;
 import scala.Tuple2;
@@ -36,7 +37,7 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
     @Override
     public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
-        TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
+        TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request;
         JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
         consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
         JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
......
TagAverageDiscountFactorFilter.java
@@ -4,6 +4,7 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
 import com.gic.spark.datasource.mysql.MysqlRddManager;
 import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
 import com.gic.spark.entity.request.AbstractFilterRequest;
+import com.gic.spark.entity.request.TagConsumeDoubleRequest;
 import com.gic.spark.entity.request.TagConsumeRequest;
 import org.apache.spark.api.java.JavaRDD;
 import scala.Tuple2;
@@ -38,7 +39,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
     @Override
     public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
-        TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
+        TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request;
         JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
         consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
         JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
......
TagValueParser.java
@@ -67,10 +67,10 @@ public class TagValueParser {
                 request=getConsumeRequest(conditionDTO.getTagTemplateElInfo());
                 break;
             case TagConstant.TAG_CODE_ASSOCIATED_PURCHASE_RATE:
-                request=getConsumeRequest(conditionDTO.getTagTemplateElInfo());
+                request=getConsumeDoubleRequest(conditionDTO.getTagTemplateElInfo());
                 break;
             case TagConstant.TAG_CODE_AVERAGE_DISCOUNT_FACTOR:
-                request=getConsumeRequest(conditionDTO.getTagTemplateElInfo());
+                request=getConsumeDoubleRequest(conditionDTO.getTagTemplateElInfo());
                 break;
             case TagConstant.TAG_CODE_CONSUMPTION_SLEEP_DAYS:
                 request=getConsumeRequest(conditionDTO.getTagTemplateElInfo());
@@ -359,6 +359,25 @@ public class TagValueParser {
         return request;
     }
+    private static AbstractFilterRequest getConsumeDoubleRequest(List<TagConditionValDTO> conditionValDTOList) {
+        TagConsumeDoubleRequest request=new TagConsumeDoubleRequest();
+        for(TagConditionValDTO conditionValDTO:conditionValDTOList){
+            if(Pattern.compile("flag").matcher(conditionValDTO.getKey()).find()){
+                setStatisticsTypeHandle(request,conditionValDTO.getKey(),conditionValDTO.getVal());
+            }else{
+                String[] keys=conditionValDTO.getKey().split("\\.");
+                switch (keys[0]){
+                    case TagConstant.TAG_KEY_TYPE_NUMBER:
+                        setDoubleValueHandle(request,keys[1],conditionValDTO.getVal());
+                        break;
+                    default:break;
+                }
+            }
+        }
+        return request;
+    }
     private static AbstractFilterRequest getCouponRequest(TagConditionDTO conditionDTO) {
         TagCouponRequest request=new TagCouponRequest();
         for(TagConditionValDTO conditionValDTO:conditionDTO.getTagTemplateElInfo()){
......
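TagConsumeDoubleRequest itself is not part of this diff; only its construction in getConsumeDoubleRequest and the casts in the two filters are shown. A minimal sketch of the shape such a request class could take, assuming it extends AbstractFilterRequest and carries a double-valued range for setDoubleValueHandle to populate (the field and setter names below are illustrative assumptions, not taken from the commit):

package com.gic.spark.entity.request;

// Illustrative sketch only: the real TagConsumeDoubleRequest is not shown in this commit.
// Assumed here: it extends AbstractFilterRequest and stores a double-valued range that
// setDoubleValueHandle(...) in TagValueParser would fill from the tag condition value.
public class TagConsumeDoubleRequest extends AbstractFilterRequest {

    // Assumed bounds for a numeric condition such as an average discount factor.
    private Double beginNum;
    private Double endNum;

    public Double getBeginNum() { return beginNum; }

    public void setBeginNum(Double beginNum) { this.beginNum = beginNum; }

    public Double getEndNum() { return endNum; }

    public void setEndNum(Double endNum) { this.endNum = endNum; }
}

Whatever its exact fields, the new type plugs into the shared pipeline because the commit also widens statisticsTypeHandle in AbstractTagConsumFilter to accept any AbstractFilterRequest rather than only TagConsumeRequest.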