Commit 02c7c08e by guos

会员标签4.0

parent 91190f76
package com.gic.spark.entity.bean;
import java.io.Serializable;
/**
* @description:
* @author: wangxk
* @date: 2020/5/7
*/
public class TrdEcuSalesLabelBean implements Serializable {
private Integer ent_id; //企业id
private Long mbr_area_id;//卡域id
private Long ecu_id;//用户id
private Integer store_info_id;//门店id
private Integer order_times; //消费次数(根据配置取的)
private Integer seff_order_cnt;//配置订单数
private Integer seff_goods_num;//配置销售件数
private Double receive_amt;//应收额
private Double pay_amt;//实付额
private Double total_amt;//吊牌价总额
private Integer sleep_days;//休眠天数
public Integer getEnt_id() {
return ent_id;
}
public void setEnt_id(Integer ent_id) {
this.ent_id = ent_id;
}
public Long getMbr_area_id() {
return mbr_area_id;
}
public void setMbr_area_id(Long mbr_area_id) {
this.mbr_area_id = mbr_area_id;
}
public Long getEcu_id() {
return ecu_id;
}
public void setEcu_id(Long ecu_id) {
this.ecu_id = ecu_id;
}
public Integer getStore_info_id() {
return store_info_id;
}
public void setStore_info_id(Integer store_info_id) {
this.store_info_id = store_info_id;
}
public Integer getOrder_times() {
return order_times;
}
public void setOrder_times(Integer order_times) {
this.order_times = order_times;
}
public Integer getSeff_order_cnt() {
return seff_order_cnt;
}
public void setSeff_order_cnt(Integer seff_order_cnt) {
this.seff_order_cnt = seff_order_cnt;
}
public Integer getSeff_goods_num() {
return seff_goods_num;
}
public void setSeff_goods_num(Integer seff_goods_num) {
this.seff_goods_num = seff_goods_num;
}
public Double getReceive_amt() {
return receive_amt;
}
public void setReceive_amt(Double receive_amt) {
this.receive_amt = receive_amt;
}
public Double getPay_amt() {
return pay_amt;
}
public void setPay_amt(Double pay_amt) {
this.pay_amt = pay_amt;
}
public Double getTotal_amt() {
return total_amt;
}
public void setTotal_amt(Double total_amt) {
this.total_amt = total_amt;
}
public Integer getSleep_days() {
return sleep_days;
}
public void setSleep_days(Integer sleep_days) {
this.sleep_days = sleep_days;
}
}
...@@ -2,10 +2,13 @@ package com.gic.spark.filter; ...@@ -2,10 +2,13 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceHive; import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.ConstantUtil; import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -16,17 +19,23 @@ import java.util.List; ...@@ -16,17 +19,23 @@ import java.util.List;
* @date: 2020/5/7 * @date: 2020/5/7
*/ */
public abstract class AbstractTagConsumFilter implements BaseTagFilter { public abstract class AbstractTagConsumFilter implements BaseTagFilter {
DataSourceHive dataSourceHive = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_BRAND_LABEL_D); DataSourceHive dataSourceHiveBrandLabel = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_BRAND_LABEL_D);
DataSourceHive dataSourceHiveSalesLabel = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_SALES_LABEL_D);
protected static JavaRDD<TrdEcuBrandLabelBean> statisticsTypeHandle(JavaRDD<TrdEcuBrandLabelBean> consumeRDD, AbstractFilterRequest consumeRequest){ protected static JavaRDD<TrdEcuSalesLabelBean> statisticsTypeHandle(JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD, AbstractFilterRequest consumeRequest){
consumeRDD=consumeRDD.mapPartitions(data->{ JavaRDD<TrdEcuSalesLabelBean> consumeRDD=labelRDD.mapPartitions(data->{
List<TrdEcuBrandLabelBean> result=new ArrayList(); List<TrdEcuSalesLabelBean> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
TrdEcuBrandLabelBean consumeBean=data.next(); Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>> tp2=data.next();
TrdEcuSalesLabelBean consumeBean=tp2._1();
switch (consumeRequest.getStatisticsType()){ switch (consumeRequest.getStatisticsType()){
case COMMODITYBRAND: case COMMODITYBRAND:
if(consumeRequest.getStatisticsValList().contains(String.valueOf(consumeBean.getEnt_brand_id()))){ if(tp2._2().isPresent()){
result.add(consumeBean); for(TrdEcuBrandLabelBean brandLabelBean:tp2._2().get()){
if(consumeRequest.getStatisticsValList().contains(String.valueOf(brandLabelBean.getEnt_brand_id()))){
result.add(brandLabelToSalesLabel(brandLabelBean));
}
}
} }
break; break;
case CHANNEL: case CHANNEL:
...@@ -46,4 +55,21 @@ public abstract class AbstractTagConsumFilter implements BaseTagFilter { ...@@ -46,4 +55,21 @@ public abstract class AbstractTagConsumFilter implements BaseTagFilter {
}); });
return consumeRDD; return consumeRDD;
} }
protected static TrdEcuSalesLabelBean brandLabelToSalesLabel(TrdEcuBrandLabelBean brandLabelBean){
TrdEcuSalesLabelBean salesLabelBean=new TrdEcuSalesLabelBean();
salesLabelBean.setEnt_id(brandLabelBean.getEnt_id());
salesLabelBean.setMbr_area_id(brandLabelBean.getMbr_area_id());
salesLabelBean.setEcu_id(brandLabelBean.getEcu_id());
salesLabelBean.setStore_info_id(brandLabelBean.getStore_info_id());
salesLabelBean.setOrder_times(brandLabelBean.getOrder_times());
salesLabelBean.setSeff_goods_num(brandLabelBean.getSeff_goods_num());
salesLabelBean.setSeff_order_cnt(brandLabelBean.getSeff_order_cnt());
salesLabelBean.setReceive_amt(brandLabelBean.getReceive_amt());
salesLabelBean.setPay_amt(brandLabelBean.getPay_amt());
salesLabelBean.setTotal_amt(brandLabelBean.getTotal_amt());
salesLabelBean.setSleep_days(brandLabelBean.getSleep_days());
return salesLabelBean;
}
} }
...@@ -3,11 +3,13 @@ package com.gic.spark.filter; ...@@ -3,11 +3,13 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity; import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager; import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest; import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -32,15 +34,24 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{ ...@@ -32,15 +34,24 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveBrandLabel);
result.add(dataSourceHiveSalesLabel);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request; TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{ .reduceByKey((x,y)->{
x.setSeff_goods_num(x.getSeff_goods_num()+y.getSeff_goods_num()); x.setSeff_goods_num(x.getSeff_goods_num()+y.getSeff_goods_num());
...@@ -50,7 +61,7 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{ ...@@ -50,7 +61,7 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
.mapPartitions(data->{ .mapPartitions(data->{
List<Long> result=new ArrayList(); List<Long> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long,TrdEcuBrandLabelBean> tp2=data.next(); Tuple2<Long,TrdEcuSalesLabelBean> tp2=data.next();
double jointRate= CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt()); double jointRate= CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt());
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
case gt: case gt:
......
...@@ -3,11 +3,13 @@ package com.gic.spark.filter; ...@@ -3,11 +3,13 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity; import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager; import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest; import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -34,15 +36,22 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter { ...@@ -34,15 +36,22 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveBrandLabel);
result.add(dataSourceHiveSalesLabel);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request; TagConsumeDoubleRequest consumeRequest=(TagConsumeDoubleRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
...@@ -55,7 +64,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter { ...@@ -55,7 +64,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
.mapPartitions(data->{ .mapPartitions(data->{
List<Long>result=new ArrayList(); List<Long>result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long,TrdEcuBrandLabelBean> tp2=data.next(); Tuple2<Long,TrdEcuSalesLabelBean> tp2=data.next();
double avgDiscountRate=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyDouble2double(tp2._2().getTotal_amt()) double avgDiscountRate=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyDouble2double(tp2._2().getTotal_amt())
:CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyDouble2double(tp2._2().getTotal_amt()); :CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyDouble2double(tp2._2().getTotal_amt());
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
......
...@@ -3,9 +3,11 @@ package com.gic.spark.filter; ...@@ -3,9 +3,11 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity; import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager; import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -31,15 +33,22 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{ ...@@ -31,15 +33,22 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveBrandLabel);
result.add(dataSourceHiveSalesLabel);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request; TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days())) JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days()))
.mapPartitionsToPair(data->{ .mapPartitionsToPair(data->{
List<Tuple2<Long,Integer>>result=new ArrayList(); List<Tuple2<Long,Integer>>result=new ArrayList();
......
...@@ -3,9 +3,11 @@ package com.gic.spark.filter; ...@@ -3,9 +3,11 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity; import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager; import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -29,15 +31,22 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{ ...@@ -29,15 +31,22 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveBrandLabel);
result.add(dataSourceHiveSalesLabel);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request; TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD=MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x,y)->x+y) JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x,y)->x+y)
.mapPartitions(data->{ .mapPartitions(data->{
List<Long>result=new ArrayList(); List<Long>result=new ArrayList();
......
...@@ -3,10 +3,12 @@ package com.gic.spark.filter; ...@@ -3,10 +3,12 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity; import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager; import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -31,15 +33,21 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{ ...@@ -31,15 +33,21 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveBrandLabel);
result.add(dataSourceHiveSalesLabel);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request; TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest); JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{ .reduceByKey((x,y)->{
...@@ -51,7 +59,7 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{ ...@@ -51,7 +59,7 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
.mapPartitions(data->{ .mapPartitions(data->{
List<Long> result=new ArrayList(); List<Long> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long,TrdEcuBrandLabelBean> tp2=data.next(); Tuple2<Long,TrdEcuSalesLabelBean> tp2=data.next();
double CusSinglePiece=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt()) double CusSinglePiece=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt())
:CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt()); :CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt());
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
......
...@@ -3,10 +3,12 @@ package com.gic.spark.filter; ...@@ -3,10 +3,12 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity; import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager; import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean; import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest; import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest; import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -32,14 +34,24 @@ public class TagUnitPriceFilter extends AbstractTagConsumFilter{ ...@@ -32,14 +34,24 @@ public class TagUnitPriceFilter extends AbstractTagConsumFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveBrandLabel);
result.add(dataSourceHiveSalesLabel);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request; TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD(); JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{ .reduceByKey((x,y)->{
...@@ -51,7 +63,7 @@ public class TagUnitPriceFilter extends AbstractTagConsumFilter{ ...@@ -51,7 +63,7 @@ public class TagUnitPriceFilter extends AbstractTagConsumFilter{
.mapPartitions(data->{ .mapPartitions(data->{
List<Long> result=new ArrayList(); List<Long> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long,TrdEcuBrandLabelBean> tp2=data.next(); Tuple2<Long,TrdEcuSalesLabelBean> tp2=data.next();
double CusSinglePiece=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num()) double CusSinglePiece=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num())
:CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num()); :CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num());
switch (consumeRequest.getNumberType()){ switch (consumeRequest.getNumberType()){
......
...@@ -8,10 +8,9 @@ package com.gic.spark.util; ...@@ -8,10 +8,9 @@ package com.gic.spark.util;
public class ConstantUtil { public class ConstantUtil {
public static final String TAB_ENTERPRISE_USER="tab_enterprise_user"; public static final String TAB_ENTERPRISE_USER="tab_enterprise_user";
public static final String TAB_MEMBER_USER="tab_member_user";
public static final String TAB_COUPON_LOG="tab_coupon_log"; public static final String TAB_COUPON_LOG="tab_coupon_log";
public static final String TAB_INTEGRAL_CU_CHANGE_LOG="tab_integral_cu_change_log"; public static final String TAB_INTEGRAL_CU_CHANGE_LOG="tab_integral_cu_change_log";
// public static final String ADS_GIC_TRD_ECU_SALES_LABEL_D="demoads.ads_gic_trd_ecu_sales_label_d"; public static final String ADS_GIC_TRD_ECU_SALES_LABEL_D="demoads.ads_gic_trd_ecu_sales_label_d";
public static final String ADS_GIC_TRD_ECU_BRAND_LABEL_D="demoads.ads_gic_trd_ecu_brand_label_d"; public static final String ADS_GIC_TRD_ECU_BRAND_LABEL_D="demoads.ads_gic_trd_ecu_brand_label_d";
public static final String DWD_GIC_TRD_VIRTUAL_WDORDER_D="democdm.dwd_gic_trd_virtual_wdorder_d"; public static final String DWD_GIC_TRD_VIRTUAL_WDORDER_D="democdm.dwd_gic_trd_virtual_wdorder_d";
public static final String DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D="democdm.dwd_gic_trd_virtual_order_item_d"; public static final String DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D="democdm.dwd_gic_trd_virtual_order_item_d";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment