Commit f5b3e5fb by guos

会员标签4.0

parent 1800bc4c
...@@ -6,6 +6,7 @@ import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; ...@@ -6,6 +6,7 @@ import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest; import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2; import scala.Tuple2;
...@@ -40,18 +41,17 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{ ...@@ -40,18 +41,17 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request; TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey() JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
x.setSeff_goods_num(x.getSeff_goods_num()+y.getSeff_goods_num());
x.setOrder_times(x.getOrder_times()+y.getOrder_times());
return x;
})
.mapPartitions(data->{ .mapPartitions(data->{
List<Long> result=new ArrayList(); List<Long> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long,Iterable<TrdEcuBrandLabelBean>> tp2=data.next(); Tuple2<Long,TrdEcuBrandLabelBean> tp2=data.next();
long totalGodsNum=0; double jointRate= CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num())/CommonUtil.isEmptyInteger2int(tp2._2().getOrder_times());
long totalOrderTimes=0;
for(TrdEcuBrandLabelBean consumeBean:tp2._2()){
totalGodsNum+=consumeBean.getSeff_goods_num();
totalOrderTimes+=consumeBean.getOrder_times();
}
double jointRate=totalGodsNum/totalOrderTimes;
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
case gt: case gt:
if(jointRate>consumeRequest.getBeginNum()){ if(jointRate>consumeRequest.getBeginNum()){
......
...@@ -6,6 +6,7 @@ import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; ...@@ -6,6 +6,7 @@ import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest; import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2; import scala.Tuple2;
...@@ -42,18 +43,17 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter { ...@@ -42,18 +43,17 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request; TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey() JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
x.setPay_amt(x.getPay_amt()+y.getPay_amt());
x.setTotal_amt(x.getTotal_amt()+y.getTotal_amt());
return x;
})
.mapPartitions(data->{ .mapPartitions(data->{
List<Long>result=new ArrayList(); List<Long>result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
double payAmt=0; Tuple2<Long,TrdEcuBrandLabelBean> tp2=data.next();
double totalAmt=0; double avgDiscountRate= CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyDouble2double(tp2._2().getTotal_amt());
Tuple2<Long,Iterable<TrdEcuBrandLabelBean>> tp2=data.next();
for(TrdEcuBrandLabelBean consumeBean:tp2._2()){
payAmt=consumeBean.getPay_amt();
totalAmt=consumeBean.getTotal_amt();
}
double avgDiscountRate=payAmt/totalAmt;
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
case gt: case gt:
if(avgDiscountRate>consumeRequest.getBeginNum()){ if(avgDiscountRate>consumeRequest.getBeginNum()){
......
...@@ -41,47 +41,48 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{ ...@@ -41,47 +41,48 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days())) JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days()))
.reduceByKey((x,y)->x>y?y:x) .mapPartitionsToPair(data->{
.mapPartitions(data->{ List<Tuple2<Long,Integer>>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long,Integer>tp2=data.next(); Tuple2<Long,Integer>tp2=data.next();
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
case gt: case gt:
if(tp2._2()>consumeRequest.getBeginNum()){ if(tp2._2()>consumeRequest.getBeginNum()){
result.add(tp2._1()); result.add(tp2);
} }
break; break;
case gte: case gte:
if(tp2._2()>=consumeRequest.getBeginNum()){ if(tp2._2()>=consumeRequest.getBeginNum()){
result.add(tp2._1()); result.add(tp2);
} }
break; break;
case lt: case lt:
if(tp2._2()<consumeRequest.getEndNum()){ if(tp2._2()<consumeRequest.getEndNum()){
result.add(tp2._1()); result.add(tp2);
} }
break; break;
case lte: case lte:
if(tp2._2()<=consumeRequest.getEndNum()){ if(tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1()); result.add(tp2);
} }
break; break;
case eq: case eq:
if(tp2._2()==consumeRequest.getEqualNum()){ if(tp2._2()==consumeRequest.getEqualNum()){
result.add(tp2._1()); result.add(tp2);
} }
break; break;
case between: case between:
if(tp2._2()>=consumeRequest.getBeginNum() if(tp2._2()>=consumeRequest.getBeginNum()
&&tp2._2()<=consumeRequest.getEndNum()){ &&tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1()); result.add(tp2);
} }
default:break; default:break;
} }
} }
return result.iterator(); return result.iterator();
}); })
.reduceByKey((x,y)->x)
.map(data->data._1());
return ecuRdd; return ecuRdd;
} }
} }
...@@ -5,6 +5,7 @@ import com.gic.spark.datasource.mysql.MysqlRddManager; ...@@ -5,6 +5,7 @@ import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2; import scala.Tuple2;
...@@ -39,18 +40,17 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{ ...@@ -39,18 +40,17 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
TagConsumeRequest consumeRequest=(TagConsumeRequest)request; TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey() JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
x.setPay_amt(x.getPay_amt()+y.getPay_amt());
x.setOrder_times(x.getOrder_times()+y.getOrder_times());
return x;
})
.mapPartitions(data->{ .mapPartitions(data->{
List<Long> result=new ArrayList(); List<Long> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long,Iterable<TrdEcuBrandLabelBean>> tp2=data.next(); Tuple2<Long,TrdEcuBrandLabelBean> tp2=data.next();
int consumeTimes=0; double CusSinglePiece= CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getOrder_times());
double payAmt=0;
for(TrdEcuBrandLabelBean consumeBean:tp2._2()){
consumeTimes+=consumeBean.getOrder_times();
payAmt+=consumeBean.getPay_amt();
}
double CusSinglePiece=payAmt/consumeTimes;
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
case gt: case gt:
if(CusSinglePiece>consumeRequest.getBeginNum()){ if(CusSinglePiece>consumeRequest.getBeginNum()){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment