Commit b381a3a9 by zhangyannao

Code commit (代码提交)

parent 742d13ef
package com.gic.spark.entity;
import com.gic.spark.entity.request.AbstractFilterRequest;
/**
 * Pairs a tag group id with the filter request that must be evaluated for it.
 * Used to drive per-group tag filtering (see {@code BaseTagFilter.filterValidMember}).
 *
 * @author paste
 * created 2021-03-11 13:51
 */
public class FilterProcessEntity {

    /** Identifier of the tag group this request belongs to. */
    private String tagGroupId;
    /** The filter request to evaluate for the tag group. */
    private AbstractFilterRequest request;

    public FilterProcessEntity(String tagGroupId, AbstractFilterRequest request) {
        this.tagGroupId = tagGroupId;
        this.request = request;
    }

    public String getTagGroupId() {
        return tagGroupId;
    }

    public void setTagGroupId(String tagGroupId) {
        this.tagGroupId = tagGroupId;
    }

    public AbstractFilterRequest getRequest() {
        return request;
    }

    public void setRequest(AbstractFilterRequest request) {
        this.request = request;
    }
}
......@@ -3,14 +3,13 @@ package com.gic.spark.entity;
import com.gic.spark.entity.table.TabSceneCrowd;
import java.util.LinkedList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/4/16
*/
public class SceneCrowdDTO extends TabSceneCrowd{
public class SceneCrowdDTO extends TabSceneCrowd {
private LinkedList<TagConditionGroupDTO> conditionGroupDTOList;
......@@ -18,7 +17,7 @@ public class SceneCrowdDTO extends TabSceneCrowd{
public SceneCrowdDTO() {
}
public SceneCrowdDTO(TabSceneCrowd sceneCrowd,LinkedList<TagConditionGroupDTO> conditionGroupDTOList){
public SceneCrowdDTO(TabSceneCrowd sceneCrowd, LinkedList<TagConditionGroupDTO> conditionGroupDTOList) {
this.setId(sceneCrowd.getId());
this.setScene_Crowd_Name(sceneCrowd.getScene_Crowd_Name());
this.setReal_Time(sceneCrowd.getReal_Time());
......
......@@ -8,10 +8,9 @@ import java.sql.Timestamp;
* @author: wangxk
* @date: 2020/4/28
*/
public class TagCouponBean implements Serializable{
public class TagCouponBean implements Serializable {
private Long coupon_id;
private Long ecu_Id;
private Long scu_Id;
private Long acu_Id;
private Long mcu_Id;
......@@ -26,14 +25,6 @@ public class TagCouponBean implements Serializable{
this.coupon_id = coupon_id;
}
public Long getEcu_Id() {
return ecu_Id;
}
public void setEcu_Id(Long ecu_Id) {
this.ecu_Id = ecu_Id;
}
public Long getScu_Id() {
return scu_Id;
}
......
......@@ -13,7 +13,6 @@ public class TagIntegralBean implements Serializable {
private Integer enterprise_Id;
private Integer cu_Type;
private Long cu_Id;
private Long file_ecu_id;
private Integer integral_Value;
private java.sql.Timestamp create_Time;
private java.sql.Timestamp limit_time;
......@@ -42,14 +41,6 @@ public class TagIntegralBean implements Serializable {
this.cu_Id = cu_Id;
}
public Long getFile_ecu_id() {
return file_ecu_id;
}
public void setFile_ecu_id(Long file_ecu_id) {
this.file_ecu_id = file_ecu_id;
}
public Integer getIntegral_Value() {
return integral_Value;
}
......
package com.gic.spark.entity.bean;
import java.io.Serializable;
/**
 * Per-customer sales statistics row broken down by merchandise brand.
 * Adds the brand dimension on top of the shared sales-statistics base fields.
 *
 * @author wangxk
 * @date 2020/5/7
 */
public class TrdEcuBrandBean extends TrdEcuSalesBeanBase implements Serializable {

    /** Enterprise-level merchandise brand id. */
    private Long ent_brand_id;

    public Long getEnt_brand_id() {
        return ent_brand_id;
    }

    public void setEnt_brand_id(Long ent_brand_id) {
        this.ent_brand_id = ent_brand_id;
    }
}
package com.gic.spark.entity.bean;
import java.io.Serializable;
/**
 * Flat, per-customer brand-label statistics record loaded from the
 * ADS brand-label Hive table. Plain mutable bean: one field per column,
 * with conventional getter/setter pairs.
 *
 * @author wangxk
 * @date 2020/5/7
 */
public class TrdEcuBrandLabelBean implements Serializable {

    /** Enterprise id. */
    private Integer ent_id;
    /** Member-card area id. */
    private Long mbr_area_id;
    /** Enterprise customer (ecu) id. */
    private Long ecu_id;
    /** Store id. */
    private Integer store_info_id;
    /** Consumption count (per configuration). */
    private Integer order_times;
    /** Configured order count. */
    private Integer seff_order_cnt;
    /** Configured units sold. */
    private Integer seff_goods_num;
    /** Receivable amount. */
    private Double receive_amt;
    /** Actually-paid amount. */
    private Double pay_amt;
    /** Total list-price (tag-price) amount. */
    private Double total_amt;
    /** Days dormant. */
    private Integer sleep_days;
    /** Enterprise merchandise brand id. */
    private Long ent_brand_id;

    public Integer getEnt_id() {
        return ent_id;
    }

    public void setEnt_id(Integer ent_id) {
        this.ent_id = ent_id;
    }

    public Long getMbr_area_id() {
        return mbr_area_id;
    }

    public void setMbr_area_id(Long mbr_area_id) {
        this.mbr_area_id = mbr_area_id;
    }

    public Long getEcu_id() {
        return ecu_id;
    }

    public void setEcu_id(Long ecu_id) {
        this.ecu_id = ecu_id;
    }

    public Integer getStore_info_id() {
        return store_info_id;
    }

    public void setStore_info_id(Integer store_info_id) {
        this.store_info_id = store_info_id;
    }

    public Integer getOrder_times() {
        return order_times;
    }

    public void setOrder_times(Integer order_times) {
        this.order_times = order_times;
    }

    public Integer getSeff_order_cnt() {
        return seff_order_cnt;
    }

    public void setSeff_order_cnt(Integer seff_order_cnt) {
        this.seff_order_cnt = seff_order_cnt;
    }

    public Integer getSeff_goods_num() {
        return seff_goods_num;
    }

    public void setSeff_goods_num(Integer seff_goods_num) {
        this.seff_goods_num = seff_goods_num;
    }

    public Double getReceive_amt() {
        return receive_amt;
    }

    public void setReceive_amt(Double receive_amt) {
        this.receive_amt = receive_amt;
    }

    public Double getPay_amt() {
        return pay_amt;
    }

    public void setPay_amt(Double pay_amt) {
        this.pay_amt = pay_amt;
    }

    public Double getTotal_amt() {
        return total_amt;
    }

    public void setTotal_amt(Double total_amt) {
        this.total_amt = total_amt;
    }

    public Integer getSleep_days() {
        return sleep_days;
    }

    public void setSleep_days(Integer sleep_days) {
        this.sleep_days = sleep_days;
    }

    public Long getEnt_brand_id() {
        return ent_brand_id;
    }

    public void setEnt_brand_id(Long ent_brand_id) {
        this.ent_brand_id = ent_brand_id;
    }
}
package com.gic.spark.entity.bean;
import java.io.Serializable;
/**
 * Per-customer sales statistics row broken down by sales channel.
 * Adds the channel dimension on top of the shared sales-statistics base fields.
 *
 * @author wangxk
 * @date 2020/5/7
 */
public class TrdEcuChannelBean extends TrdEcuSalesBeanBase implements Serializable {

    /** Sales channel code. */
    private Long channel_code;

    public Long getChannel_code() {
        return channel_code;
    }

    public void setChannel_code(Long channel_code) {
        this.channel_code = channel_code;
    }
}
......@@ -7,7 +7,7 @@ import java.io.Serializable;
* @author: wangxk
* @date: 2020/5/7
*/
public class TrdEcuSalesLabelBean implements Serializable {
public class TrdEcuSalesBeanBase implements Serializable {
private Integer ent_id; //企业id
private Long mbr_area_id;//卡域id
......
......@@ -3,77 +3,77 @@ package com.gic.spark.entity.table;
import java.io.Serializable;
/**
 * Table-mapping bean for the actually-paid configuration record.
 *
 * NOTE(review): the previous text contained every field and accessor twice —
 * interleaved old/new lines from a diff — which does not compile. This is the
 * deduplicated class; behavior of each accessor is unchanged.
 */
public class TabDataActuallyPaidConfig implements Serializable {

    /** Primary key of the actually-paid configuration row. */
    private Integer actually_Paid_Config_Id;
    /** Owning enterprise id. */
    private Integer enterprise_Id;
    /** Classification code. */
    private Integer classify;
    /** Configuration status flag. */
    private Integer config_Status;
    /** Row status flag. */
    private Integer status;
    /** Row creation timestamp. */
    private java.sql.Timestamp create_Time;
    /** Row last-update timestamp. */
    private java.sql.Timestamp update_Time;

    public Integer getActually_Paid_Config_Id() {
        return actually_Paid_Config_Id;
    }

    public void setActually_Paid_Config_Id(Integer actually_Paid_Config_Id) {
        this.actually_Paid_Config_Id = actually_Paid_Config_Id;
    }

    public Integer getEnterprise_Id() {
        return enterprise_Id;
    }

    public void setEnterprise_Id(Integer enterprise_Id) {
        this.enterprise_Id = enterprise_Id;
    }

    public Integer getClassify() {
        return classify;
    }

    public void setClassify(Integer classify) {
        this.classify = classify;
    }

    public Integer getConfig_Status() {
        return config_Status;
    }

    public void setConfig_Status(Integer config_Status) {
        this.config_Status = config_Status;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public java.sql.Timestamp getCreate_Time() {
        return create_Time;
    }

    public void setCreate_Time(java.sql.Timestamp create_Time) {
        this.create_Time = create_Time;
    }

    public java.sql.Timestamp getUpdate_Time() {
        return update_Time;
    }

    public void setUpdate_Time(java.sql.Timestamp update_Time) {
        this.update_Time = update_Time;
    }
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandBean;
import com.gic.spark.entity.bean.TrdEcuChannelBean;
import com.gic.spark.entity.bean.TrdEcuSalesBeanBase;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.util.ConstantUtil;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* @description:
......@@ -22,42 +26,102 @@ public abstract class AbstractTagConsumFilter implements BaseTagFilter {
DataSourceHive dataSourceHiveBrandLabel = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_BRAND_LABEL_D);
DataSourceHive dataSourceHiveSalesLabel = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_SALES_LABEL_D);
protected static JavaRDD<TrdEcuSalesLabelBean> statisticsTypeHandle(JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD, AbstractFilterRequest consumeRequest){
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=labelRDD.mapPartitions(data->{
List<TrdEcuSalesLabelBean> result=new ArrayList();
while (data.hasNext()){
Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>> tp2=data.next();
TrdEcuSalesLabelBean consumeBean=tp2._1();
switch (consumeRequest.getStatisticsType()){
case COMMODITYBRAND:
if(tp2._2().isPresent()){
for(TrdEcuBrandLabelBean brandLabelBean:tp2._2().get()){
if(consumeRequest.getStatisticsValList().contains(String.valueOf(brandLabelBean.getEnt_brand_id()))){
result.add(brandLabelToSalesLabel(brandLabelBean));
}
}
}
break;
case CHANNEL:
if(consumeRequest.getStatisticsValList().contains(String.valueOf(consumeBean.getStore_info_id()))){
result.add(consumeBean);
}
break;
case MCUINFO:
if(consumeRequest.getStatisticsValList().contains(String.valueOf(consumeBean.getMbr_area_id()))){
result.add(consumeBean);
}
break;
default:break;
}
}
return result.iterator();
});
// Per-customer (ecu) bundle of sales statistics, split into channel-level
// and brand-level rows. Fields are package-visible and mutated directly
// by the enclosing filter's grouping code.
public static class ConsumeStatisticEntity {
// Enterprise customer (ecu) id this entity aggregates.
Long ecuId;
// Brand-level sales rows for this customer (empty list when the left-outer
// join produced no brand rows).
List<TrdEcuBrandBean> brandBeanList;
// Channel-level sales rows for this customer.
List<TrdEcuChannelBean> channelBeanList;
}
/**
 * Selects the sales rows from {@code entity} that match the statistics dimension
 * configured on the request (member-card area, sales channel, or merchandise brand).
 *
 * @param entity         per-customer bundle of channel- and brand-level sales rows
 * @param consumeRequest filter request carrying the statistics type and its value list
 * @return matching rows; empty when nothing matches or the type is unrecognized
 */
protected static List<TrdEcuSalesBeanBase> filterSalesBean(ConsumeStatisticEntity entity, AbstractFilterRequest consumeRequest) {
    List<TrdEcuSalesBeanBase> result = new ArrayList<>();
    // Use the accessor rather than the field for consistency with statisticsTypeHandle.
    switch (consumeRequest.getStatisticsType()) {
        case MCUINFO:
            // Member-card area: match on the channel rows' area id.
            for (TrdEcuChannelBean bean : entity.channelBeanList) {
                if (consumeRequest.getStatisticsValList().contains(String.valueOf(bean.getMbr_area_id()))) {
                    result.add(bean);
                }
            }
            break;
        case CHANNEL:
            for (TrdEcuChannelBean bean : entity.channelBeanList) {
                if (consumeRequest.getStatisticsValList().contains(String.valueOf(bean.getChannel_code()))) {
                    result.add(bean);
                }
            }
            break;
        case COMMODITYBRAND:
            for (TrdEcuBrandBean bean : entity.brandBeanList) {
                if (consumeRequest.getStatisticsValList().contains(String.valueOf(bean.getEnt_brand_id()))) {
                    result.add(bean);
                }
            }
            break;
        default:
            break;
    }
    return result;
}
/**
 * Filters joined (sales, optional brand rows) pairs down to the records matching
 * the request's statistics dimension. For the brand dimension the matching brand
 * rows themselves are emitted; for channel and member-card area the sales row is.
 *
 * @param labelRDD       sales rows left-outer-joined with their brand rows
 * @param consumeRequest filter request carrying the statistics type and value list
 * @return RDD of matching sales-statistics rows
 */
protected static JavaRDD<TrdEcuSalesBeanBase> statisticsTypeHandle(JavaRDD<Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>>> labelRDD, AbstractFilterRequest consumeRequest) {
    return labelRDD.mapPartitions(iterator -> {
        List<TrdEcuSalesBeanBase> matched = new ArrayList();
        while (iterator.hasNext()) {
            Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>> pair = iterator.next();
            TrdEcuSalesBeanBase salesBean = pair._1();
            switch (consumeRequest.getStatisticsType()) {
                case COMMODITYBRAND:
                    // Emit every joined brand row whose brand id is requested.
                    if (pair._2().isPresent()) {
                        for (TrdEcuBrandBean brandBean : pair._2().get()) {
                            if (consumeRequest.getStatisticsValList().contains(String.valueOf(brandBean.getEnt_brand_id()))) {
                                matched.add(brandBean);
                            }
                        }
                    }
                    break;
                // TODO: channel matching is to be reworked (currently keyed on store id)
                case CHANNEL:
                    if (consumeRequest.getStatisticsValList().contains(String.valueOf(salesBean.getStore_info_id()))) {
                        matched.add(salesBean);
                    }
                    break;
                case MCUINFO:
                    if (consumeRequest.getStatisticsValList().contains(String.valueOf(salesBean.getMbr_area_id()))) {
                        matched.add(salesBean);
                    }
                    break;
                default:
                    break;
            }
        }
        return matched.iterator();
    });
}
protected static TrdEcuSalesLabelBean brandLabelToSalesLabel(TrdEcuBrandLabelBean brandLabelBean){
TrdEcuSalesLabelBean salesLabelBean=new TrdEcuSalesLabelBean();
protected JavaRDD<ConsumeStatisticEntity> getConsumeEntity(Integer enterpriseId) {
JavaRDD<TrdEcuChannelBean> channelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuChannelBean.class).javaRDD();
JavaRDD<TrdEcuBrandBean> brandRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandBean.class).javaRDD();
JavaRDD<ConsumeStatisticEntity> rdd = channelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.groupByKey()
.leftOuterJoin(brandRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data)).groupByKey())
.map(data -> {
Long ecuId = data._1();
Optional<Iterable<TrdEcuBrandBean>> optional = data._2()._2();
ConsumeStatisticEntity entity = new ConsumeStatisticEntity();
entity.ecuId = ecuId;
entity.channelBeanList = Lists.newArrayList(data._2()._1());
entity.brandBeanList = optional.isPresent() ? Lists.newArrayList(optional.get()) : new ArrayList<>();
return entity;
});
return rdd;
}
protected static TrdEcuSalesBeanBase brandLabelToSalesLabel(TrdEcuBrandBean brandLabelBean) {
TrdEcuSalesBeanBase salesLabelBean = new TrdEcuSalesBeanBase();
salesLabelBean.setEnt_id(brandLabelBean.getEnt_id());
salesLabelBean.setMbr_area_id(brandLabelBean.getMbr_area_id());
salesLabelBean.setEcu_id(brandLabelBean.getEcu_id());
......@@ -69,7 +133,43 @@ public abstract class AbstractTagConsumFilter implements BaseTagFilter {
salesLabelBean.setPay_amt(brandLabelBean.getPay_amt());
salesLabelBean.setTotal_amt(brandLabelBean.getTotal_amt());
salesLabelBean.setSleep_days(brandLabelBean.getSleep_days());
return salesLabelBean;
}
/**
 * Adds {@code tagGroupId} to {@code groupIds} when {@code cusSinglePiece}
 * satisfies the numeric comparison configured on the request
 * (gt / gte / lt / lte / eq / between, using beginNum / endNum / equalNum).
 *
 * @param groupIds       mutable output set of matching tag group ids
 * @param consumeRequest request carrying the comparison type and bounds
 * @param tagGroupId     tag group id to add on a match
 * @param cusSinglePiece the customer's computed value being compared
 */
protected static void handleValueCompare(Set<String> groupIds, TagConsumeDoubleRequest consumeRequest, String tagGroupId, double cusSinglePiece) {
    switch (consumeRequest.getNumberType()) {
        case gt:
            if (cusSinglePiece > consumeRequest.getBeginNum()) {
                groupIds.add(tagGroupId);
            }
            break;
        case gte:
            if (cusSinglePiece >= consumeRequest.getBeginNum()) {
                groupIds.add(tagGroupId);
            }
            break;
        case lt:
            if (cusSinglePiece < consumeRequest.getEndNum()) {
                groupIds.add(tagGroupId);
            }
            break;
        case lte:
            if (cusSinglePiece <= consumeRequest.getEndNum()) {
                groupIds.add(tagGroupId);
            }
            break;
        case eq:
            if (cusSinglePiece == consumeRequest.getEqualNum()) {
                groupIds.add(tagGroupId);
            }
            break;
        case between:
            if (cusSinglePiece >= consumeRequest.getBeginNum()
                    && cusSinglePiece <= consumeRequest.getEndNum()) {
                groupIds.add(tagGroupId);
            }
            // break was missing: fell through to default (harmless today, fragile).
            break;
        default:
            break;
    }
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdVirtualOrderBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.AbstractFilterRequestTime;
......@@ -8,6 +9,7 @@ import com.gic.spark.util.ConstantUtil;
import com.gic.spark.util.DateUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import java.util.ArrayList;
......@@ -25,6 +27,13 @@ public abstract class AbstractTagConsumRecordFilter implements BaseTagFilter {
DataSourceHive dataSourceHiveOrder = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_WDORDER_D);
DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);
// NOTE(review): work-in-progress — both RDDs below are built and then discarded
// (no action is triggered, Spark transformations are lazy, and nothing is
// returned or stored). Presumably the results are meant to be kept in fields
// or returned; confirm intent before relying on this method.
protected void getOrderRdds(int enterpriseId) {
// Virtual order header rows for the enterprise, mapped onto TrdVirtualOrderBean.
JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
// Order item rows reduced to (virtual_order_id, ent_brand_id) columns.
JavaRDD<Row> virtualOrderItemRdd = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id", "ent_brand_id").javaRDD();
}
protected static JavaRDD<TrdVirtualOrderBean> statisticsTypeHandle(JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>> orderRdd, AbstractFilterRequest request) {
JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = orderRdd.mapPartitions(data -> {
List<TrdVirtualOrderBean> result = new ArrayList();
......
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.request.AbstractFilterRequest;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
......@@ -16,4 +18,6 @@ public interface BaseTagFilter {
List<DataSourceEntity> necessarySourceList();
JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request);
JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList);
}
......@@ -2,24 +2,18 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagIntegralBean;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagIntegralRequest;
import com.gic.spark.tag.TagConstant;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import com.gic.spark.util.DateUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description:
......@@ -28,127 +22,147 @@ import java.util.Set;
*/
public class TagAboutExpireIntegralFilter implements BaseTagFilter {
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_INTEGRAL_CU_CHANGE_LOG);
private DataSourceSharding changeLogDs = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_INTEGRAL_CU_CHANGE_LOG);
private DataSourceSharding userRelationDs = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_ENTERPRISE_USER_RELATION);
private static TagAboutExpireIntegralFilter instance;
public static TagAboutExpireIntegralFilter getInstance() {
if(null==instance){
instance=new TagAboutExpireIntegralFilter();
if (null == instance) {
instance = new TagAboutExpireIntegralFilter();
}
return instance;
}
private TagAboutExpireIntegralFilter(){}
private TagAboutExpireIntegralFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceSharding);
result.add(changeLogDs);
return result;
}
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagIntegralRequest integralRequest=(TagIntegralRequest)request;
Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("limit_time").isNotNull())
.filter(new Column("integral_change_type").notEqual(3))
.filter(new Column("integral_value").$greater(0));
JavaRDD<TagIntegralBean>integralRDD= MysqlRddManager.getPojoFromDataset(dataset,TagIntegralBean.class).javaRDD();
JavaRDD<Long>ecuRdd=integralRDD.mapPartitions(data->{
List<TagIntegralBean>result=new ArrayList();
while (data.hasNext()){
TagIntegralBean integralBean=data.next();
switch (integralRequest.getDomainType()){
case ACU_INFO:
if(TagConstant.CU_TYPE_ACU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
case SCU_INFO:
if(TagConstant.CU_TYPE_SCU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
case MCU_INFO:
if(TagConstant.CU_TYPE_MCU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
default:break;
}
}
return result.iterator();
}).mapPartitions(data->{
List<TagIntegralBean>result=new ArrayList();
while (data.hasNext()){
TagIntegralBean integralBean=data.next();
switch (integralRequest.getTimeRangeType()){
case LATELY:
if(integralBean.getLimit_time().getTime()>
DateUtil.addNumForDay(DateUtil.getDate(),-integralRequest.getTimeNum()).getTime()){
result.add(integralBean);
}
break;
case FIXATION:
if(integralBean.getLimit_time().getTime()>=integralRequest.getBeginTime().getTime()
&&integralBean.getCreate_Time().getTime()<=integralRequest.getEndTime().getTime()){
result.add(integralBean);
}
break;
//TODO: 重做
default:break;
}
}
return result.iterator();
}).mapToPair(data-> Tuple2.apply(data.getFile_ecu_id(),data.getIntegral_Value()))
.reduceByKey((x,y)->x+y)
.mapPartitions(data->{
Set<Long> result=new HashSet();
while (data.hasNext()){
Tuple2<Long,Integer> tp2=data.next();
int integralValue=tp2._2();
switch (integralRequest.getNumberType()){
case eq:
if(integralRequest.getEqualNum()==integralValue){
result.add(tp2._1());
}
break;
case gt:
if(integralValue>integralRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(integralValue>=integralRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(integralValue<integralRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(integralValue<=integralRequest.getEndNum()){
result.add(tp2._1());
}
break;
case between:
if(integralValue>=integralRequest.getBeginNum()
&&integralValue<=integralRequest.getEndNum()){
result.add(tp2._1());
}
break;
default:break;
}
}
return result.iterator();
});
// TagIntegralRequest integralRequest = (TagIntegralRequest) request;
// Dataset<Row> dataset = changeLogDs.getDatasetByEnterpriseId(enterpriseId)
// .filter(new Column("limit_time").isNotNull())
// .filter(new Column("integral_change_type").notEqual(3))
// .filter(new Column("integral_value").$greater(0));
// JavaRDD<TagIntegralBean> integralRDD = MysqlRddManager.getPojoFromDataset(dataset, TagIntegralBean.class).javaRDD();
//
// JavaRDD<Long> ecuRdd = integralRDD.mapPartitions(data -> {
// List<TagIntegralBean> result = new ArrayList();
// while (data.hasNext()) {
// TagIntegralBean integralBean = data.next();
// switch (integralRequest.getDomainType()) {
// case ACU_INFO:
// if (TagConstant.CU_TYPE_ACU == integralBean.getCu_Type()
// && integralRequest.getCuVals().contains(integralBean.getCu_Id())) {
// result.add(integralBean);
// }
// break;
// case SCU_INFO:
// if (TagConstant.CU_TYPE_SCU == integralBean.getCu_Type()
// && integralRequest.getCuVals().contains(integralBean.getCu_Id())) {
// result.add(integralBean);
// }
// break;
// case MCU_INFO:
// if (TagConstant.CU_TYPE_MCU == integralBean.getCu_Type()
// && integralRequest.getCuVals().contains(integralBean.getCu_Id())) {
// result.add(integralBean);
// }
// break;
// default:
// break;
// }
// }
// return result.iterator();
// }).mapPartitions(data -> {
// List<TagIntegralBean> result = new ArrayList();
// while (data.hasNext()) {
// TagIntegralBean integralBean = data.next();
// switch (integralRequest.getTimeRangeType()) {
// case LATELY:
// if (integralBean.getLimit_time().getTime() >
// DateUtil.addNumForDay(DateUtil.getDate(), -integralRequest.getTimeNum()).getTime()) {
// result.add(integralBean);
// }
// break;
// case FIXATION:
// if (integralBean.getLimit_time().getTime() >= integralRequest.getBeginTime().getTime()
// && integralBean.getCreate_Time().getTime() <= integralRequest.getEndTime().getTime()) {
// result.add(integralBean);
// }
// break;
//
// default:
// break;
// }
// }
// return result.iterator();
// }).mapToPair(data -> Tuple2.apply(data.getFile_ecu_id(), data.getIntegral_Value()))
// .reduceByKey((x, y) -> x + y)
// .mapPartitions(data -> {
// Set<Long> result = new HashSet();
// while (data.hasNext()) {
// Tuple2<Long, Integer> tp2 = data.next();
// int integralValue = tp2._2();
// switch (integralRequest.getNumberType()) {
// case eq:
// if (integralRequest.getEqualNum() == integralValue) {
// result.add(tp2._1());
// }
// break;
// case gt:
// if (integralValue > integralRequest.getBeginNum()) {
// result.add(tp2._1());
// }
// break;
// case gte:
// if (integralValue >= integralRequest.getBeginNum()) {
// result.add(tp2._1());
// }
// break;
// case lt:
// if (integralValue < integralRequest.getEndNum()) {
// result.add(tp2._1());
// }
// break;
// case lte:
// if (integralValue <= integralRequest.getEndNum()) {
// result.add(tp2._1());
// }
// break;
// case between:
// if (integralValue >= integralRequest.getBeginNum()
// && integralValue <= integralRequest.getEndNum()) {
// result.add(tp2._1());
// }
// break;
// default:
// break;
// }
// }
// return result.iterator();
// });
//
// return ecuRdd;
return null;
}
return ecuRdd;
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
Dataset<Row> dataset = changeLogDs.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("limit_time").isNotNull())
.filter(new Column("integral_change_type").notEqual(3))
.filter(new Column("integral_value").$greater(0));
return null;
}
}
......@@ -3,6 +3,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TagIntegralBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagIntegralRequest;
......@@ -10,6 +11,7 @@ import com.gic.spark.tag.TagConstant;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import com.gic.spark.util.DateUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
......@@ -28,16 +30,18 @@ import java.util.Set;
*/
public class TagAccumulatedIntegralFilter implements BaseTagFilter {
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_INTEGRAL_CU_CHANGE_LOG);
private DataSourceSharding dataSourceSharding = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_INTEGRAL_CU_CHANGE_LOG);
private static TagAccumulatedIntegralFilter instance;
public static TagAccumulatedIntegralFilter getInstance() {
if(null==instance){
instance=new TagAccumulatedIntegralFilter();
}
if (null == instance) {
instance = new TagAccumulatedIntegralFilter();
}
return instance;
}
private TagAccumulatedIntegralFilter(){}
private TagAccumulatedIntegralFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
......@@ -48,107 +52,117 @@ public class TagAccumulatedIntegralFilter implements BaseTagFilter {
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagIntegralRequest integralRequest=(TagIntegralRequest)request;
Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("create_Time").isNotNull());
JavaRDD<TagIntegralBean>integralRDD=MysqlRddManager.getPojoFromDataset(dataset,TagIntegralBean.class).javaRDD();
TagIntegralRequest integralRequest = (TagIntegralRequest) request;
Dataset<Row> dataset = dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("create_Time").isNotNull());
JavaRDD<TagIntegralBean> integralRDD = MysqlRddManager.getPojoFromDataset(dataset, TagIntegralBean.class).javaRDD();
JavaRDD<Long>ecuRdd=integralRDD.mapPartitions(data->{
List<TagIntegralBean>result=new ArrayList();
while (data.hasNext()){
TagIntegralBean integralBean=data.next();
switch (integralRequest.getDomainType()){
case ACU_INFO:
if(TagConstant.CU_TYPE_ACU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
case SCU_INFO:
if(TagConstant.CU_TYPE_SCU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
case MCU_INFO:
if(TagConstant.CU_TYPE_MCU==integralBean.getCu_Type()
&&integralRequest.getCuVals().contains(integralBean.getCu_Id())){
result.add(integralBean);
}
break;
default:break;
}
}
return result.iterator();
}).mapPartitions(data->{
List<TagIntegralBean>result=new ArrayList();
while (data.hasNext()){
TagIntegralBean integralBean=data.next();
switch (integralRequest.getTimeRangeType()){
case LATELY:
if(integralBean.getCreate_Time().getTime()>
DateUtil.addNumForDay(DateUtil.getDate(),-integralRequest.getTimeNum()).getTime()){
result.add(integralBean);
}
break;
case FIXATION:
if(integralBean.getCreate_Time().getTime()>=integralRequest.getBeginTime().getTime()
&&integralBean.getCreate_Time().getTime()<=integralRequest.getEndTime().getTime()){
result.add(integralBean);
}
break;
JavaRDD<Long> ecuRdd = integralRDD.mapPartitions(data -> {
List<TagIntegralBean> result = new ArrayList();
while (data.hasNext()) {
TagIntegralBean integralBean = data.next();
switch (integralRequest.getDomainType()) {
case ACU_INFO:
if (TagConstant.CU_TYPE_ACU == integralBean.getCu_Type()
&& integralRequest.getCuVals().contains(integralBean.getCu_Id())) {
result.add(integralBean);
}
break;
case SCU_INFO:
if (TagConstant.CU_TYPE_SCU == integralBean.getCu_Type()
&& integralRequest.getCuVals().contains(integralBean.getCu_Id())) {
result.add(integralBean);
}
break;
case MCU_INFO:
if (TagConstant.CU_TYPE_MCU == integralBean.getCu_Type()
&& integralRequest.getCuVals().contains(integralBean.getCu_Id())) {
result.add(integralBean);
}
break;
default:
break;
}
}
return result.iterator();
}).mapPartitions(data -> {
List<TagIntegralBean> result = new ArrayList();
while (data.hasNext()) {
TagIntegralBean integralBean = data.next();
switch (integralRequest.getTimeRangeType()) {
case LATELY:
if (integralBean.getCreate_Time().getTime() >
DateUtil.addNumForDay(DateUtil.getDate(), -integralRequest.getTimeNum()).getTime()) {
result.add(integralBean);
}
break;
case FIXATION:
if (integralBean.getCreate_Time().getTime() >= integralRequest.getBeginTime().getTime()
&& integralBean.getCreate_Time().getTime() <= integralRequest.getEndTime().getTime()) {
result.add(integralBean);
}
break;
default:break;
}
}
return result.iterator();
}).mapToPair(data-> Tuple2.apply(data.getCu_Id()+"@"+data.getFile_ecu_id(),data)).groupByKey()
.mapPartitions(data->{
Set<Long>result=new HashSet();
while (data.hasNext()){
Tuple2<String,Iterable<TagIntegralBean>> tp2=data.next();
int integralValue=0;
for(TagIntegralBean integralBean:tp2._2()){
integralValue+=integralBean.getIntegral_Value()>0?integralBean.getIntegral_Value():0;
}
switch (integralRequest.getNumberType()){
case eq:
if(integralRequest.getEqualNum()==integralValue){
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case gt:
if(integralValue>integralRequest.getBeginNum()){
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case gte:
if(integralValue>=integralRequest.getBeginNum()){
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case lt:
if(integralValue<integralRequest.getEndNum()){
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case lte:
if(integralValue<=integralRequest.getEndNum()){
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case between:
if(integralValue>=integralRequest.getBeginNum()
&&integralValue<=integralRequest.getEndNum()){
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
default:break;
}
}
return result.iterator();
});
default:
break;
}
}
return result.iterator();
}).mapToPair(data -> Tuple2.apply(data.getCu_Id() + "@" + data.getFile_ecu_id(), data)).groupByKey()
.mapPartitions(data -> {
Set<Long> result = new HashSet();
while (data.hasNext()) {
Tuple2<String, Iterable<TagIntegralBean>> tp2 = data.next();
int integralValue = 0;
for (TagIntegralBean integralBean : tp2._2()) {
integralValue += integralBean.getIntegral_Value() > 0 ? integralBean.getIntegral_Value() : 0;
}
switch (integralRequest.getNumberType()) {
case eq:
if (integralRequest.getEqualNum() == integralValue) {
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case gt:
if (integralValue > integralRequest.getBeginNum()) {
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case gte:
if (integralValue >= integralRequest.getBeginNum()) {
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case lt:
if (integralValue < integralRequest.getEndNum()) {
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case lte:
if (integralValue <= integralRequest.getEndNum()) {
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case between:
if (integralValue >= integralRequest.getBeginNum()
&& integralValue <= integralRequest.getEndNum()) {
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
default:
break;
}
}
return result.iterator();
});
return ecuRdd;
}
/**
 * Batch (multi-tag-group) variant of member filtering.
 * <p>
 * Not implemented for this filter yet: the method unconditionally returns
 * {@code null} instead of an RDD of (ecuId, space-joined tagGroupIds) pairs.
 * NOTE(review): callers must guard against the null return until this is
 * implemented.
 *
 * @param enterpriseId      enterprise whose data would be filtered
 * @param processEntityList tag-group requests to evaluate in one pass
 * @return always {@code null} (unimplemented)
 */
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
// TODO: logic still to be worked out (original note: "逻辑梳理")
return null;
}
}
......@@ -2,35 +2,42 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TrdEcuBrandBean;
import com.gic.spark.entity.bean.TrdEcuSalesBeanBase;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil;
import com.google.common.base.Joiner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description: joint purchase rate (连带率)
* @author: wangxk
* @date: 2020/5/7
*/
public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter {
private static TagAssociatedPurchaseRateFilter instance;
public static TagAssociatedPurchaseRateFilter getInstance() {
if(null==instance){
instance=new TagAssociatedPurchaseRateFilter();
if (null == instance) {
instance = new TagAssociatedPurchaseRateFilter();
}
return instance;
}
private TagAssociatedPurchaseRateFilter(){}
private TagAssociatedPurchaseRateFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
......@@ -41,64 +48,100 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
/**
 * Filters members by joint purchase rate (连带率): effective goods count
 * divided by effective order count, summed per member.
 *
 * @param enterpriseId enterprise whose sales/brand label datasets are loaded
 * @param request      expected to be a {@link TagConsumeDoubleRequest}
 * @return RDD of ecu ids whose joint purchase rate satisfies the request
 */
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
    TagConsumeDoubleRequest consumeRequest = (TagConsumeDoubleRequest) request;
    JavaRDD<TrdEcuSalesBeanBase> salesLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesBeanBase.class).javaRDD();
    JavaRDD<TrdEcuBrandBean> brandLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandBean.class).javaRDD();
    // Attach the member's brand rows (if any) to each of its sales rows.
    JavaRDD<Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>>> labelRDD = salesLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
            .leftOuterJoin(brandLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data)).groupByKey())
            .map(data -> data._2());
    JavaRDD<TrdEcuSalesBeanBase> consumeRDD = statisticsTypeHandle(labelRDD, consumeRequest);
    JavaRDD<Long> ecuRdd = consumeRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
            .reduceByKey((x, y) -> {
                // Aggregate effective goods and order counts per member.
                x.setSeff_goods_num(x.getSeff_goods_num() + y.getSeff_goods_num());
                x.setSeff_order_cnt(x.getSeff_order_cnt() + y.getSeff_order_cnt());
                return x;
            })
            .mapPartitions(data -> {
                List<Long> result = new ArrayList<>();
                while (data.hasNext()) {
                    Tuple2<Long, TrdEcuSalesBeanBase> tp2 = data.next();
                    // BUGFIX: divide in double precision. The previous int/int
                    // division truncated the rate (e.g. 1.7 -> 1). This matches
                    // the batch variant of this filter, which already divides
                    // as double.
                    // NOTE(review): a zero order count now yields NaN/Infinity
                    // (all comparisons below except gt/gte on Infinity are
                    // false) instead of an ArithmeticException — confirm zero
                    // counts cannot occur upstream.
                    double jointRate = CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num())
                            / (double) CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt());
                    switch (consumeRequest.getNumberType()) {
                        case gt:
                            if (jointRate > consumeRequest.getBeginNum()) {
                                result.add(tp2._1());
                            }
                            break;
                        case gte:
                            if (jointRate >= consumeRequest.getBeginNum()) {
                                result.add(tp2._1());
                            }
                            break;
                        case lt:
                            if (jointRate < consumeRequest.getEndNum()) {
                                result.add(tp2._1());
                            }
                            break;
                        case lte:
                            if (jointRate <= consumeRequest.getEndNum()) {
                                result.add(tp2._1());
                            }
                            break;
                        case eq:
                            if (jointRate == consumeRequest.getEqualNum()) {
                                result.add(tp2._1());
                            }
                            break;
                        case between:
                            if (jointRate >= consumeRequest.getBeginNum()
                                    && jointRate <= consumeRequest.getEndNum()) {
                                result.add(tp2._1());
                            }
                            // BUGFIX: break was missing (fell through to default,
                            // harmless but fragile).
                            break;
                        default:
                            break;
                    }
                }
                return result.iterator();
            });
    return ecuRdd;
}
/**
 * Batch variant: evaluates every tag-group request against each member's
 * consumption statistics and emits (ecuId, "id1 id2 ...") for every member
 * that matches at least one group. Non-matching members are dropped.
 */
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
    JavaRDD<ConsumeStatisticEntity> statisticRdd = getConsumeEntity(enterpriseId);
    return statisticRdd.mapToPair(statistic -> {
        Set<String> matchedGroupIds = new HashSet<>();
        for (FilterProcessEntity processEntity : processEntityList) {
            TagConsumeDoubleRequest doubleRequest = (TagConsumeDoubleRequest) processEntity.getRequest();
            // Narrow the member's sales rows to those covered by this request.
            List<TrdEcuSalesBeanBase> matchedSales = filterSalesBean(statistic, doubleRequest);
            if (matchedSales.isEmpty()) {
                continue;
            }
            int totalGoods = 0;
            int totalOrders = 0;
            for (TrdEcuSalesBeanBase sales : matchedSales) {
                totalGoods += sales.getSeff_goods_num();
                totalOrders += sales.getSeff_order_cnt();
            }
            // Joint purchase rate = effective goods per effective order.
            double jointRate = totalGoods / (double) totalOrders;
            handleValueCompare(matchedGroupIds, doubleRequest, processEntity.getTagGroupId(), jointRate);
        }
        String joined = matchedGroupIds.isEmpty() ? null : Joiner.on(" ").join(matchedGroupIds);
        return Tuple2.apply(statistic.ecuId, joined);
    }).filter(pair -> pair._2() != null);
}
}
......@@ -2,17 +2,22 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TrdEcuBrandBean;
import com.gic.spark.entity.bean.TrdEcuSalesBeanBase;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.util.CommonUtil;
import com.google.common.base.Joiner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description: 平均折扣率
......@@ -45,14 +50,14 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeDoubleRequest consumeRequest = (TagConsumeDoubleRequest) request;
JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<TrdEcuSalesBeanBase> salesLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesBeanBase.class).javaRDD();
JavaRDD<TrdEcuBrandBean> brandLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean, Optional<Iterable<TrdEcuBrandLabelBean>>>> labelRDD = salesLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
JavaRDD<Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>>> labelRDD = salesLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.leftOuterJoin(brandLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data)).groupByKey())
.map(data -> data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD = statisticsTypeHandle(labelRDD, consumeRequest);
JavaRDD<TrdEcuSalesBeanBase> consumeRDD = statisticsTypeHandle(labelRDD, consumeRequest);
int configStatus = CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long> ecuRdd = consumeRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
......@@ -65,7 +70,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
.mapPartitions(data -> {
List<Long> result = new ArrayList();
while (data.hasNext()) {
Tuple2<Long, TrdEcuSalesLabelBean> tp2 = data.next();
Tuple2<Long, TrdEcuSalesBeanBase> tp2 = data.next();
double avgDiscountRate = 1 == configStatus ? CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt()) / CommonUtil.isEmptyDouble2double(tp2._2().getTotal_amt())
: CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt()) / CommonUtil.isEmptyDouble2double(tp2._2().getTotal_amt());
switch (consumeRequest.getNumberType()) {
......@@ -107,4 +112,39 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
});
return ecuRdd;
}
/**
 * Batch variant: computes each member's average discount rate and tags the
 * member with every tag group whose numeric condition the rate satisfies.
 * Emits (ecuId, space-joined tagGroupIds); members matching no group are
 * removed by the trailing filter.
 */
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
    JavaRDD<ConsumeStatisticEntity> statisticRdd = getConsumeEntity(enterpriseId);
    // configStatus selects the numerator: pay amount vs. receive amount.
    int configStatus = CommonUtil.getConfigStatus(enterpriseId);
    return statisticRdd.mapToPair(statistic -> {
        Set<String> matchedGroupIds = new HashSet<>();
        for (FilterProcessEntity processEntity : processEntityList) {
            TagConsumeDoubleRequest doubleRequest = (TagConsumeDoubleRequest) processEntity.getRequest();
            List<TrdEcuSalesBeanBase> matchedSales = filterSalesBean(statistic, processEntity.getRequest());
            if (matchedSales.isEmpty()) {
                continue;
            }
            double receiveSum = 0;
            double paySum = 0;
            double totalSum = 0;
            for (TrdEcuSalesBeanBase sales : matchedSales) {
                // NOTE(review): assumes the amount getters never return null
                // (autounboxing would NPE) — confirm upstream population.
                receiveSum += sales.getReceive_amt();
                paySum += sales.getPay_amt();
                totalSum += sales.getTotal_amt();
            }
            double avgDiscountRate;
            if (1 == configStatus) {
                avgDiscountRate = paySum / totalSum;
            } else {
                avgDiscountRate = receiveSum / totalSum;
            }
            handleValueCompare(matchedGroupIds, doubleRequest, processEntity.getTagGroupId(), avgDiscountRate);
        }
        String joined = matchedGroupIds.isEmpty() ? null : Joiner.on(" ").join(matchedGroupIds);
        return Tuple2.apply(statistic.ecuId, joined);
    }).filter(pair -> pair._2() != null);
}
}
......@@ -2,6 +2,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TrdVirtualOrderBean;
import com.gic.spark.entity.bean.TrdVirtualOrderItemBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
......@@ -48,10 +49,10 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter {
TagConsumeCommodityRequest commodityRequest = (TagConsumeCommodityRequest) request;
JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
Dataset<Row> OrderItemDS = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
Dataset<Row> orderItemDS = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>> orderAndItemRdd = consumeRecordRDD.mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id", "ent_brand_id").javaRDD()
.leftOuterJoin(orderItemDS.select("virtual_order_id", "ent_brand_id").javaRDD()
.mapToPair(row -> Tuple2.apply(row.getLong(0), row.getString(1)))
.groupByKey())
.map(data -> data._2());
......@@ -61,7 +62,7 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter {
.filter(data -> checkTime(commodityRequest, DateUtil.strToDate(data.getReceipts_time(), DateUtil.FORMAT_DATETIME_19).getTime()))
.mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data.getEcu_id()));
JavaPairRDD<Long, Long> orderItemRDD = MysqlRddManager.getPojoFromDataset(OrderItemDS, TrdVirtualOrderItemBean.class).javaRDD()
JavaPairRDD<Long, Long> orderItemRDD = MysqlRddManager.getPojoFromDataset(orderItemDS, TrdVirtualOrderItemBean.class).javaRDD()
.filter(data -> {
if (StringUtils.isNotEmpty(data.getSku_code())
&& commodityRequest.getSkuCodeList().contains(data.getSku_code())) {
......@@ -78,4 +79,10 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter {
return ecuRdd;
}
/**
 * Batch (multi-tag-group) variant of member filtering.
 * <p>
 * Not implemented for this filter: always returns {@code null} instead of an
 * RDD of (ecuId, space-joined tagGroupIds) pairs.
 * NOTE(review): callers must guard against the null return until this is
 * implemented.
 *
 * @param enterpriseId      enterprise whose data would be filtered
 * @param processEntityList tag-group requests to evaluate in one pass
 * @return always {@code null} (unimplemented)
 */
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
return null;
}
}
......@@ -7,7 +7,6 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeTimeRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Column;
......@@ -22,18 +21,19 @@ import java.util.List;
* @author: wangxk
* @date: 2020/8/10
*/
public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{
public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter {
private static TagConsumeTimeFilter instance;
public static TagConsumeTimeFilter getInstance() {
if(null==instance){
instance=new TagConsumeTimeFilter();
if (null == instance) {
instance = new TagConsumeTimeFilter();
}
return instance;
}
private TagConsumeTimeFilter(){}
private TagConsumeTimeFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
......@@ -45,22 +45,22 @@ public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{
/**
 * Filters members by consume time: keeps the ecu ids of members that have at
 * least one effective order whose receipt time passes
 * {@code checkTime(consumeTimeRequest, ...)}.
 *
 * @param enterpriseId enterprise whose order data is loaded
 * @param request      expected to be a {@link TagConsumeTimeRequest}
 * @return RDD of distinct matching ecu ids
 */
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
    TagConsumeTimeRequest consumeTimeRequest = (TagConsumeTimeRequest) request;
    // Only effective orders (is_eff_order = 1) participate.
    JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId)
            .filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD();

    JavaRDD<Row> virtualOrderItemRdd = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id", "ent_brand_id").javaRDD();

    // Attach each order's brand ids (if any) so statisticsTypeHandle can
    // apply the request's statistics scope.
    JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>> orderRdd = consumeRecordRDD.mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data))
            .leftOuterJoin(virtualOrderItemRdd.mapToPair(row -> Tuple2.apply(row.getLong(0), row.getString(1))).groupByKey())
            .map(data -> data._2());

    consumeRecordRDD = statisticsTypeHandle(orderRdd, consumeTimeRequest);

    JavaRDD<Long> ecuRdd = consumeRecordRDD.filter(data -> StringUtils.isNotEmpty(data.getReceipts_time()))
            .mapToPair(data -> Tuple2.apply(data.getEcu_id(), DateUtil.strToDate(data.getReceipts_time(), DateUtil.FORMAT_DATETIME_19)))
            .filter(data -> checkTime(consumeTimeRequest, data._2().getTime()))
            // Deduplicate: one matching order per member is enough.
            .reduceByKey((x, y) -> x)
            .map(data -> data._1());
    return ecuRdd;
}
}
......@@ -25,13 +25,16 @@ import java.util.List;
public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
private static TagConsumeTotalFilter instance;
/**
 * Returns the lazily-created singleton instance of this filter.
 * <p>
 * NOTE(review): the lazy initialization is not synchronized; safe only when
 * getInstance() is always invoked from a single driver thread.
 */
public static TagConsumeTotalFilter getInstance() {
    if (null == instance) {
        instance = new TagConsumeTotalFilter();
    }
    return instance;
}

/** Singleton: obtain instances via {@link #getInstance()}. */
private TagConsumeTotalFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
......@@ -43,77 +46,79 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
/**
 * Filters members by total consumption amount: sums per-member order amounts
 * inside the request's time range, then keeps ecu ids whose total satisfies
 * the request's numeric condition.
 *
 * @param enterpriseId enterprise whose order data is loaded
 * @param request      expected to be a {@link TagConsumeAmountRequest}
 * @return RDD of matching ecu ids
 */
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
    TagConsumeAmountRequest consumeAmountRequest = (TagConsumeAmountRequest) request;
    JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();

    JavaRDD<Row> virtualOrderItemRdd = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id", "ent_brand_id").javaRDD();

    // Attach each order's brand ids so statisticsTypeHandle can apply the
    // request's statistics scope.
    JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>> orderRdd = consumeRecordRDD.mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data))
            .leftOuterJoin(virtualOrderItemRdd.mapToPair(row -> Tuple2.apply(row.getLong(0), row.getString(1))).groupByKey())
            .map(data -> data._2());

    consumeRecordRDD = statisticsTypeHandle(orderRdd, consumeAmountRequest);

    // configStatus selects which per-order amount is summed (paid vs. pay).
    int configStatus = CommonUtil.getConfigStatus(enterpriseId);
    JavaRDD<Long> ecuRdd = consumeRecordRDD.filter(data -> {
        // Keep only orders whose receipt date falls inside the requested
        // time range (fixed window or "last N days").
        boolean result = false;
        if (StringUtils.isNotEmpty(data.getReceipts_time())) {
            Date receiptsTime = DateUtil.strToDate(data.getReceipts_time(), DateUtil.FORMAT_DATE_10);
            switch (consumeAmountRequest.getTimeRangeType()) {
                case FIXATION:
                    if (receiptsTime.getTime() >= consumeAmountRequest.getBeginTime().getTime()
                            && receiptsTime.getTime() <= consumeAmountRequest.getEndTime().getTime()) {
                        result = true;
                    }
                    break;
                case LATELY:
                    if (receiptsTime.getTime() > DateUtil.addNumForDay(new Date(), -consumeAmountRequest.getTimeNum()).getTime()) {
                        result = true;
                    }
                    break;
                default:
                    break;
            }
        }
        return result;
    }).mapToPair(data -> Tuple2.apply(data.getEcu_id(), configStatus == 1 ? data.getPaid_amt() : data.getPay_amt()))
            .reduceByKey((x, y) -> x + y)
            .filter(data -> {
                // Compare the member's summed amount with the requested bound(s).
                boolean result = false;
                switch (consumeAmountRequest.getNumberType()) {
                    case between:
                        if (data._2() >= consumeAmountRequest.getBeginNum()
                                && data._2() <= consumeAmountRequest.getEndNum()) {
                            result = true;
                        }
                        break;
                    case lt:
                        if (data._2() < consumeAmountRequest.getEndNum()) {
                            result = true;
                        }
                        break;
                    case gt:
                        if (data._2() > consumeAmountRequest.getBeginNum()) {
                            result = true;
                        }
                        break;
                    case eq:
                        // NOTE(review): exact equality on a (likely floating
                        // point) amount — confirm this is intended.
                        if (data._2() == consumeAmountRequest.getEqualNum()) {
                            result = true;
                        }
                        break;
                    case lte:
                        if (data._2() <= consumeAmountRequest.getEndNum()) {
                            result = true;
                        }
                        break;
                    case gte:
                        if (data._2() >= consumeAmountRequest.getBeginNum()) {
                            result = true;
                        }
                        break;
                    default:
                        break;
                }
                return result;
            }).map(data -> data._1());
    return ecuRdd;
}
......
......@@ -2,30 +2,35 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TrdEcuBrandBean;
import com.gic.spark.entity.bean.TrdEcuSalesBeanBase;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.google.common.base.Joiner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description: consumption sleep days (消费休眠天数)
* @author: wangxk
* @date: 2020/5/8
*/
public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter {
private static TagConsumptionSleepDaysFilter instance;
public static TagConsumptionSleepDaysFilter getInstance() {
if(null==instance){
instance=new TagConsumptionSleepDaysFilter();
if (null == instance) {
instance = new TagConsumptionSleepDaysFilter();
}
return instance;
}
......@@ -40,58 +45,87 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
/**
 * Filters members by consumption sleep days: keeps ecu ids having at least
 * one sales row whose sleep-days value satisfies the request's numeric
 * condition.
 *
 * @param enterpriseId enterprise whose sales/brand label datasets are loaded
 * @param request      expected to be a {@link TagConsumeRequest}
 * @return RDD of distinct matching ecu ids
 */
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
    TagConsumeRequest consumeRequest = (TagConsumeRequest) request;
    JavaRDD<TrdEcuSalesBeanBase> salesLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesBeanBase.class).javaRDD();
    JavaRDD<TrdEcuBrandBean> brandLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandBean.class).javaRDD();

    // Attach the member's brand rows (if any) to each of its sales rows.
    JavaRDD<Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>>> labelRDD = salesLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
            .leftOuterJoin(brandLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data)).groupByKey())
            .map(data -> data._2());
    JavaRDD<TrdEcuSalesBeanBase> consumeRDD = statisticsTypeHandle(labelRDD, consumeRequest);

    JavaRDD<Long> ecuRdd = consumeRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data.getSleep_days()))
            .mapPartitionsToPair(data -> {
                List<Tuple2<Long, Integer>> result = new ArrayList<>();
                while (data.hasNext()) {
                    Tuple2<Long, Integer> tp2 = data.next();
                    switch (consumeRequest.getNumberType()) {
                        case gt:
                            if (tp2._2() > consumeRequest.getBeginNum()) {
                                result.add(tp2);
                            }
                            break;
                        case gte:
                            if (tp2._2() >= consumeRequest.getBeginNum()) {
                                result.add(tp2);
                            }
                            break;
                        case lt:
                            if (tp2._2() < consumeRequest.getEndNum()) {
                                result.add(tp2);
                            }
                            break;
                        case lte:
                            if (tp2._2() <= consumeRequest.getEndNum()) {
                                result.add(tp2);
                            }
                            break;
                        case eq:
                            if (tp2._2() == consumeRequest.getEqualNum()) {
                                result.add(tp2);
                            }
                            break;
                        case between:
                            if (tp2._2() >= consumeRequest.getBeginNum()
                                    && tp2._2() <= consumeRequest.getEndNum()) {
                                result.add(tp2);
                            }
                            // BUGFIX: break was missing (fell through to default,
                            // harmless but fragile).
                            break;
                        default:
                            break;
                    }
                }
                return result.iterator();
            })
            // Deduplicate: one matching sales row per member is enough.
            .reduceByKey((x, y) -> x)
            .map(Tuple2::_1);
    return ecuRdd;
}
/**
 * Batch variant: tags each member whose consumption sleep-days value
 * satisfies a tag group's numeric condition. Every matching sales row is
 * compared individually. Emits (ecuId, space-joined tagGroupIds) and drops
 * members that matched no group.
 */
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
    JavaRDD<ConsumeStatisticEntity> statisticRdd = getConsumeEntity(enterpriseId);
    return statisticRdd.mapToPair(statistic -> {
        Set<String> matchedGroupIds = new HashSet<>();
        for (FilterProcessEntity processEntity : processEntityList) {
            // NOTE(review): cast assumes sleep-days requests are built as
            // TagConsumeDoubleRequest — confirm against the request factory.
            TagConsumeDoubleRequest doubleRequest = (TagConsumeDoubleRequest) processEntity.getRequest();
            List<TrdEcuSalesBeanBase> matchedSales = filterSalesBean(statistic, processEntity.getRequest());
            if (matchedSales.isEmpty()) {
                continue;
            }
            String tagGroupId = processEntity.getTagGroupId();
            for (TrdEcuSalesBeanBase sales : matchedSales) {
                handleValueCompare(matchedGroupIds, doubleRequest, tagGroupId, sales.getSleep_days());
            }
        }
        String joined = matchedGroupIds.isEmpty() ? null : Joiner.on(" ").join(matchedGroupIds);
        return Tuple2.apply(statistic.ecuId, joined);
    }).filter(pair -> pair._2() != null);
}
}
......@@ -2,32 +2,41 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TrdEcuBrandBean;
import com.gic.spark.entity.bean.TrdEcuSalesBeanBase;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.google.common.base.Joiner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description:
*消费次数
* @description: 消费次数
* @author: wangxk
* @date: 2020/5/6
*/
public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
public class TagConsumptionTimeFilter extends AbstractTagConsumFilter {
private static TagConsumptionTimeFilter instance;
public static TagConsumptionTimeFilter getInstance(){
if(null==instance){
instance=new TagConsumptionTimeFilter();
public static TagConsumptionTimeFilter getInstance() {
if (null == instance) {
instance = new TagConsumptionTimeFilter();
}
return instance;
}
private TagConsumptionTimeFilter(){}
private TagConsumptionTimeFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
......@@ -38,56 +47,83 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
TagConsumeRequest consumeRequest = (TagConsumeRequest) request;
JavaRDD<TrdEcuSalesBeanBase> salesLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesBeanBase.class).javaRDD();
JavaRDD<TrdEcuBrandBean> brandLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>>> labelRDD = salesLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.leftOuterJoin(brandLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data)).groupByKey())
.map(data -> data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x,y)->x+y)
.mapPartitions(data->{
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Long,Integer>tp2=data.next();
switch (consumeRequest.getNumberType()){
case gt:
if(tp2._2()>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(tp2._2()>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(tp2._2()<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(tp2._2().intValue()==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(tp2._2()>=consumeRequest.getBeginNum()
&&tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
JavaRDD<TrdEcuSalesBeanBase> consumeRDD = statisticsTypeHandle(labelRDD, consumeRequest);
JavaRDD<Long> ecuRdd = consumeRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data.getOrder_times())).reduceByKey(Integer::sum)
.mapPartitions(data -> {
List<Long> result = new ArrayList();
while (data.hasNext()) {
Tuple2<Long, Integer> tp2 = data.next();
switch (consumeRequest.getNumberType()) {
case gt:
if (tp2._2() > consumeRequest.getBeginNum()) {
result.add(tp2._1());
}
break;
case gte:
if (tp2._2() >= consumeRequest.getBeginNum()) {
result.add(tp2._1());
}
break;
case lt:
if (tp2._2() < consumeRequest.getEndNum()) {
result.add(tp2._1());
}
break;
case lte:
if (tp2._2() <= consumeRequest.getEndNum()) {
result.add(tp2._1());
}
break;
case eq:
if (tp2._2().intValue() == consumeRequest.getEqualNum()) {
result.add(tp2._1());
}
break;
case between:
if (tp2._2() >= consumeRequest.getBeginNum()
&& tp2._2() <= consumeRequest.getEndNum()) {
result.add(tp2._1());
}
default:
break;
}
}
return result.iterator();
});
return ecuRdd;
}
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
    // Load per-member consume statistics once, then check every tag group's
    // consumption-count condition against each member.
    JavaRDD<ConsumeStatisticEntity> statisticRdd = getConsumeEntity(enterpriseId);
    return statisticRdd
            .mapToPair(entity -> {
                Set<String> matchedGroupIds = new HashSet<>();
                for (FilterProcessEntity processEntity : processEntityList) {
                    TagConsumeDoubleRequest consumeRequest = (TagConsumeDoubleRequest) processEntity.getRequest();
                    List<TrdEcuSalesBeanBase> salesBeans = filterSalesBean(entity, processEntity.getRequest());
                    if (salesBeans.isEmpty()) {
                        continue;
                    }
                    // Total order count for this member across all matching sales beans.
                    int orderTimeSum = 0;
                    for (TrdEcuSalesBeanBase salesBean : salesBeans) {
                        orderTimeSum += salesBean.getOrder_times();
                    }
                    handleValueCompare(matchedGroupIds, consumeRequest, processEntity.getTagGroupId(), orderTimeSum);
                }
                String joinedIds = matchedGroupIds.isEmpty() ? null : Joiner.on(" ").join(matchedGroupIds);
                return Tuple2.apply(entity.ecuId, joinedIds);
            })
            // Drop members that matched no tag group at all.
            .filter(pair -> pair._2() != null);
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdVirtualOrderBean;
import com.gic.spark.entity.bean.TrdVirtualOrderItemBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeCommodityRequest;
import com.gic.spark.util.ConstantUtil;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
......@@ -25,14 +23,14 @@ import java.util.List;
* @author: wangxk
* @date: 2020/8/12
*/
public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilter {
private static TagFirstConsumeCommodityFilter instance;
public static TagFirstConsumeCommodityFilter getInstance() {
if(null==instance){
instance=new TagFirstConsumeCommodityFilter();
if (null == instance) {
instance = new TagFirstConsumeCommodityFilter();
}
return instance;
}
......@@ -47,45 +45,47 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
Dataset<Row> OrderItemDS= dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderAndItemRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id","ent_brand_id").javaRDD()
.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1)))
.groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderAndItemRdd,commodityRequest);
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()
<DateUtil.strToDate(y.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()){
return x;
}else{
return y;
}
})
.mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1()));
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(OrderItemDS,TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
return true;
}
return false;
}).mapToPair(data->Tuple2.apply(data.getVirtual_order_id(),data.getVirtual_order_id()))
.reduceByKey((x,y)->x);
JavaRDD<Long>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
.filter(data->data._2()._2().isPresent())
.map(data->data._2()._1()).distinct();
return ecuRdd;
TagConsumeCommodityRequest commodityRequest = (TagConsumeCommodityRequest) request;
JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
Dataset<Row> orderItemDS = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>> orderAndItemRdd = consumeRecordRDD.mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data))
.leftOuterJoin(orderItemDS.select("virtual_order_id", "ent_brand_id").javaRDD()
.mapToPair(row -> Tuple2.apply(row.getLong(0), row.getString(1)))
.groupByKey())
.map(data -> data._2());
consumeRecordRDD = statisticsTypeHandle(orderAndItemRdd, commodityRequest);
JavaPairRDD<Long, Long> orderRdd = consumeRecordRDD.filter(data -> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.reduceByKey((x, y) -> {
if (DateUtil.strToDate(x.getReceipts_time(), DateUtil.FORMAT_DATETIME_19).getTime()
< DateUtil.strToDate(y.getReceipts_time(), DateUtil.FORMAT_DATETIME_19).getTime()) {
return x;
} else {
return y;
}
})
.mapToPair(data -> Tuple2.apply(data._2().getVirtual_id(), data._1()));
JavaPairRDD<Long, Long> orderItemRDD = MysqlRddManager.getPojoFromDataset(orderItemDS, TrdVirtualOrderItemBean.class).javaRDD()
.filter(data -> {
if (StringUtils.isNotEmpty(data.getSku_code())
&& commodityRequest.getSkuCodeList().contains(data.getSku_code())) {
return true;
}
return false;
}).mapToPair(data -> Tuple2.apply(data.getVirtual_order_id(), data.getVirtual_order_id()))
.reduceByKey((x, y) -> x);
JavaRDD<Long> ecuRdd = orderRdd.leftOuterJoin(orderItemRDD)
.filter(data -> data._2()._2().isPresent())
.map(data -> data._2()._1()).distinct();
return ecuRdd;
}
}
......@@ -5,7 +5,6 @@ import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdVirtualOrderBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
......@@ -19,18 +18,19 @@ import java.util.List;
* @author: wangxk
* @date: 2020/8/11
*/
public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFilter {
private static TagHistoryOfflineConsumptionStoreFilter instance;
public static TagHistoryOfflineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagHistoryOfflineConsumptionStoreFilter();
if (null == instance) {
instance = new TagHistoryOfflineConsumptionStoreFilter();
}
return instance;
}
private TagHistoryOfflineConsumptionStoreFilter(){}
private TagHistoryOfflineConsumptionStoreFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
......@@ -42,22 +42,22 @@ public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRe
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
TagConsumeStoreRequest storeRequest = (TagConsumeStoreRequest) request;
JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").toJavaRDD();
JavaRDD<Row> virtualOrderItemRdd = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id", "ent_brand_id").toJavaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>> orderRdd = consumeRecordRDD.mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row -> Tuple2.apply(row.getLong(0), row.getString(1))).groupByKey())
.map(data -> data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& null!=data.getStore_info_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getStore_info_id()))
.filter(data->storeRequest.getStoreList().contains(String.valueOf(data._2())))
.reduceByKey((x,y)->x)
.map(data->data._1());
consumeRecordRDD = statisticsTypeHandle(orderRdd, storeRequest);
JavaRDD<Long> ecuRdd = consumeRecordRDD.filter(data -> data.getOrder_channel_code() == 1
&& null != data.getStore_info_id())
.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data.getStore_info_id()))
.filter(data -> storeRequest.getStoreList().contains(String.valueOf(data._2())))
.reduceByKey((x, y) -> x)
.map(data -> data._1());
return ecuRdd;
}
......
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdVirtualOrderBean;
import com.gic.spark.entity.bean.TrdVirtualOrderItemBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeCommodityRequest;
import com.gic.spark.util.ConstantUtil;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
......@@ -25,14 +23,14 @@ import java.util.List;
* @author: wangxk
* @date: 2020/8/12
*/
public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilter {
private static TagLatelyConsumeCommodityFilter instance;
public static TagLatelyConsumeCommodityFilter getInstance() {
if(null==instance){
instance=new TagLatelyConsumeCommodityFilter();
if (null == instance) {
instance = new TagLatelyConsumeCommodityFilter();
}
return instance;
}
......@@ -47,45 +45,45 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
TagConsumeCommodityRequest commodityRequest = (TagConsumeCommodityRequest) request;
JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
Dataset<Row> OrderItemDS= dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
Dataset<Row> OrderItemDS = dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderAndItemRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id","ent_brand_id").javaRDD()
.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1)))
.groupByKey())
.map(data->data._2());
JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>> orderAndItemRdd = consumeRecordRDD.mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id", "ent_brand_id").javaRDD()
.mapToPair(row -> Tuple2.apply(row.getLong(0), row.getString(1)))
.groupByKey())
.map(data -> data._2());
consumeRecordRDD=statisticsTypeHandle(orderAndItemRdd,commodityRequest);
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()
>DateUtil.strToDate(y.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()){
return x;
}else{
return y;
}
})
.mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1()));
consumeRecordRDD = statisticsTypeHandle(orderAndItemRdd, commodityRequest);
JavaPairRDD<Long, Long> orderRdd = consumeRecordRDD.filter(data -> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.reduceByKey((x, y) -> {
if (DateUtil.strToDate(x.getReceipts_time(), DateUtil.FORMAT_DATETIME_19).getTime()
> DateUtil.strToDate(y.getReceipts_time(), DateUtil.FORMAT_DATETIME_19).getTime()) {
return x;
} else {
return y;
}
})
.mapToPair(data -> Tuple2.apply(data._2().getVirtual_id(), data._1()));
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(OrderItemDS,TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
return true;
}
return false;
}).mapToPair(data->Tuple2.apply(data.getVirtual_order_id(),data.getVirtual_order_id()))
.reduceByKey((x,y)->x);
JavaPairRDD<Long, Long> orderItemRDD = MysqlRddManager.getPojoFromDataset(OrderItemDS, TrdVirtualOrderItemBean.class).javaRDD()
.filter(data -> {
if (StringUtils.isNotEmpty(data.getSku_code())
&& commodityRequest.getSkuCodeList().contains(data.getSku_code())) {
return true;
}
return false;
}).mapToPair(data -> Tuple2.apply(data.getVirtual_order_id(), data.getVirtual_order_id()))
.reduceByKey((x, y) -> x);
JavaRDD<Long>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
.filter(data->data._2()._2().isPresent())
.map(data->data._2()._1()).distinct();
JavaRDD<Long> ecuRdd = orderRdd.leftOuterJoin(orderItemRDD)
.filter(data -> data._2()._2().isPresent())
.map(data -> data._2()._1()).distinct();
return ecuRdd;
return ecuRdd;
}
}
......@@ -2,34 +2,43 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TrdEcuBrandBean;
import com.gic.spark.entity.bean.TrdEcuSalesBeanBase;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil;
import com.google.common.base.Joiner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description:
* 客单价
* @description: 客单价
* @author: wangxk
* @date: 2020/5/7
*/
public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter {
private static TagPerCustomerTransactionFilter instance;
public static TagPerCustomerTransactionFilter getInstance(){
if(null==instance){
instance=new TagPerCustomerTransactionFilter();
public static TagPerCustomerTransactionFilter getInstance() {
if (null == instance) {
instance = new TagPerCustomerTransactionFilter();
}
return instance;
}
private TagPerCustomerTransactionFilter(){}
private TagPerCustomerTransactionFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
......@@ -40,64 +49,101 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
x.setReceive_amt(x.getReceive_amt()+y.getReceive_amt());
x.setPay_amt(x.getPay_amt()+y.getPay_amt());
x.setOrder_times(x.getOrder_times()+y.getOrder_times());
return x;
})
.mapPartitions(data->{
List<Long> result=new ArrayList();
while (data.hasNext()){
Tuple2<Long,TrdEcuSalesLabelBean> tp2=data.next();
double CusSinglePiece=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt())
:CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt());
switch (consumeRequest.getNumberType()){
case gt:
if(CusSinglePiece>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(CusSinglePiece>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(CusSinglePiece<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(CusSinglePiece<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(CusSinglePiece==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(CusSinglePiece>=consumeRequest.getBeginNum()
&&CusSinglePiece<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
TagConsumeRequest consumeRequest = (TagConsumeRequest) request;
JavaRDD<TrdEcuSalesBeanBase> salesLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesBeanBase.class).javaRDD();
JavaRDD<TrdEcuBrandBean> brandLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>>> labelRDD = salesLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.leftOuterJoin(brandLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data)).groupByKey())
.map(data -> data._2());
JavaRDD<TrdEcuSalesBeanBase> consumeRDD = statisticsTypeHandle(labelRDD, consumeRequest);
int configStatus = CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long> ecuRdd = consumeRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.reduceByKey((x, y) -> {
x.setReceive_amt(x.getReceive_amt() + y.getReceive_amt());
x.setPay_amt(x.getPay_amt() + y.getPay_amt());
x.setOrder_times(x.getOrder_times() + y.getOrder_times());
return x;
})
.mapPartitions(data -> {
List<Long> result = new ArrayList();
while (data.hasNext()) {
Tuple2<Long, TrdEcuSalesBeanBase> tp2 = data.next();
double CusSinglePiece = 1 == configStatus ? CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt()) / CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt())
: CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt()) / CommonUtil.isEmptyInteger2int(tp2._2().getSeff_order_cnt());
switch (consumeRequest.getNumberType()) {
case gt:
if (CusSinglePiece > consumeRequest.getBeginNum()) {
result.add(tp2._1());
}
break;
case gte:
if (CusSinglePiece >= consumeRequest.getBeginNum()) {
result.add(tp2._1());
}
break;
case lt:
if (CusSinglePiece < consumeRequest.getEndNum()) {
result.add(tp2._1());
}
break;
case lte:
if (CusSinglePiece <= consumeRequest.getEndNum()) {
result.add(tp2._1());
}
break;
case eq:
if (CusSinglePiece == consumeRequest.getEqualNum()) {
result.add(tp2._1());
}
break;
case between:
if (CusSinglePiece >= consumeRequest.getBeginNum()
&& CusSinglePiece <= consumeRequest.getEndNum()) {
result.add(tp2._1());
}
default:
break;
}
}
return result.iterator();
});
return ecuRdd;
}
/**
 * Computes each member's average transaction value (客单价 = amount / order count)
 * and collects the ids of all tag groups whose condition the value satisfies.
 *
 * @param enterpriseId      enterprise whose consume statistics are evaluated
 * @param processEntityList tag groups with their per-group filter requests
 * @return pair RDD of (ecuId, space-joined matched tag group ids); members
 *         matching no group are filtered out
 */
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
    JavaRDD<ConsumeStatisticEntity> entityJavaRDD = getConsumeEntity(enterpriseId);
    // configStatus == 1 -> use pay amount as numerator, otherwise receive amount.
    int configStatus = CommonUtil.getConfigStatus(enterpriseId);
    return entityJavaRDD.mapToPair(data -> {
        Set<String> groupIds = new HashSet<>();
        for (FilterProcessEntity entity : processEntityList) {
            TagConsumeDoubleRequest consumeRequest = (TagConsumeDoubleRequest) entity.getRequest();
            List<TrdEcuSalesBeanBase> salesBeanList = filterSalesBean(data, entity.getRequest());
            if (salesBeanList.isEmpty()) {
                continue;
            }
            String tagGroupId = entity.getTagGroupId();
            double receiveAmount = 0;
            double payAmount = 0;
            int orderTimes = 0;
            for (TrdEcuSalesBeanBase beanBase : salesBeanList) {
                receiveAmount += beanBase.getReceive_amt();
                payAmount += beanBase.getPay_amt();
                orderTimes += beanBase.getOrder_times();
            }
            // Guard the denominator: dividing by 0 yields Infinity/NaN, and
            // +Infinity would spuriously satisfy gt/gte conditions downstream.
            if (orderTimes == 0) {
                continue;
            }
            double cusSinglePiece = 1 == configStatus ? payAmount / orderTimes
                    : receiveAmount / orderTimes;
            handleValueCompare(groupIds, consumeRequest, tagGroupId, cusSinglePiece);
        }
        return Tuple2.apply(data.ecuId, groupIds.isEmpty() ? null : Joiner.on(" ").join(groupIds));
    }).filter(data -> data._2() != null);
}
}
......@@ -2,35 +2,43 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.FilterProcessEntity;
import com.gic.spark.entity.bean.TrdEcuBrandBean;
import com.gic.spark.entity.bean.TrdEcuSalesBeanBase;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeDoubleRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.CommonUtil;
import com.google.common.base.Joiner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description:
* @author: wangxk
* @date: 2020/8/26
*/
public class TagUnitPriceFilter extends AbstractTagConsumFilter{
public class TagUnitPriceFilter extends AbstractTagConsumFilter {
private static TagUnitPriceFilter instance;
public static TagUnitPriceFilter getInstance() {
if(null==instance){
instance=new TagUnitPriceFilter();
if (null == instance) {
instance = new TagUnitPriceFilter();
}
return instance;
}
private TagUnitPriceFilter(){}
private TagUnitPriceFilter() {
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
......@@ -41,67 +49,106 @@ public class TagUnitPriceFilter extends AbstractTagConsumFilter{
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> salesLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> brandLabelRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
TagConsumeRequest consumeRequest = (TagConsumeRequest) request;
JavaRDD<TrdEcuSalesBeanBase> salesLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveSalesLabel.getDatasetByEntId(enterpriseId), TrdEcuSalesBeanBase.class).javaRDD();
JavaRDD<TrdEcuBrandBean> brandLabelRDD = MysqlRddManager.getPojoFromDataset(dataSourceHiveBrandLabel.getDatasetByEntId(enterpriseId), TrdEcuBrandBean.class).javaRDD();
JavaRDD<Tuple2<TrdEcuSalesLabelBean,Optional<Iterable<TrdEcuBrandLabelBean>>>>labelRDD=salesLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data))
.leftOuterJoin(brandLabelRDD.mapToPair(data->Tuple2.apply(data.getEcu_id(),data)).groupByKey())
.map(data->data._2());
JavaRDD<Tuple2<TrdEcuSalesBeanBase, Optional<Iterable<TrdEcuBrandBean>>>> labelRDD = salesLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.leftOuterJoin(brandLabelRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data)).groupByKey())
.map(data -> data._2());
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=statisticsTypeHandle(labelRDD,consumeRequest);
JavaRDD<TrdEcuSalesBeanBase> consumeRDD = statisticsTypeHandle(labelRDD, consumeRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
x.setReceive_amt(x.getReceive_amt()+y.getReceive_amt());
x.setPay_amt(x.getPay_amt()+y.getPay_amt());
x.setOrder_times(x.getOrder_times()+y.getOrder_times());
int configStatus = CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long> ecuRdd = consumeRDD.mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
.reduceByKey((x, y) -> {
x.setReceive_amt(x.getReceive_amt() + y.getReceive_amt());
x.setPay_amt(x.getPay_amt() + y.getPay_amt());
x.setOrder_times(x.getOrder_times() + y.getOrder_times());
return x;
})
.mapPartitions(data->{
List<Long> result=new ArrayList();
while (data.hasNext()){
Tuple2<Long,TrdEcuSalesLabelBean> tp2=data.next();
double CusSinglePiece=1==configStatus?CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num())
:CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt())/CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num());
switch (consumeRequest.getNumberType()){
.mapPartitions(data -> {
List<Long> result = new ArrayList();
while (data.hasNext()) {
Tuple2<Long, TrdEcuSalesBeanBase> tp2 = data.next();
double CusSinglePiece = 1 == configStatus ? CommonUtil.isEmptyDouble2double(tp2._2().getPay_amt()) / CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num())
: CommonUtil.isEmptyDouble2double(tp2._2().getReceive_amt()) / CommonUtil.isEmptyInteger2int(tp2._2().getSeff_goods_num());
switch (consumeRequest.getNumberType()) {
case gt:
if(CusSinglePiece>consumeRequest.getBeginNum()){
if (CusSinglePiece > consumeRequest.getBeginNum()) {
result.add(tp2._1());
}
break;
case gte:
if(CusSinglePiece>=consumeRequest.getBeginNum()){
if (CusSinglePiece >= consumeRequest.getBeginNum()) {
result.add(tp2._1());
}
break;
case lt:
if(CusSinglePiece<consumeRequest.getEndNum()){
if (CusSinglePiece < consumeRequest.getEndNum()) {
result.add(tp2._1());
}
break;
case lte:
if(CusSinglePiece<=consumeRequest.getEndNum()){
if (CusSinglePiece <= consumeRequest.getEndNum()) {
result.add(tp2._1());
}
break;
case eq:
if(CusSinglePiece==consumeRequest.getEqualNum()){
if (CusSinglePiece == consumeRequest.getEqualNum()) {
result.add(tp2._1());
}
break;
case between:
if(CusSinglePiece>=consumeRequest.getBeginNum()
&&CusSinglePiece<=consumeRequest.getEndNum()){
if (CusSinglePiece >= consumeRequest.getBeginNum()
&& CusSinglePiece <= consumeRequest.getEndNum()) {
result.add(tp2._1());
}
default:break;
default:
break;
}
}
return result.iterator();
});
return ecuRdd;
}
@Override
public JavaPairRDD<Long, String> filterValidMember(Integer enterpriseId, List<FilterProcessEntity> processEntityList) {
JavaRDD<ConsumeStatisticEntity> entityJavaRDD = getConsumeEntity(enterpriseId);
int configStatus = CommonUtil.getConfigStatus(enterpriseId);
JavaPairRDD<Long, String> rdd = entityJavaRDD.mapToPair(data -> {
Set<String> groupIds = new HashSet<>();
for (FilterProcessEntity entity : processEntityList) {
TagConsumeDoubleRequest consumeRequest = (TagConsumeDoubleRequest) entity.getRequest();
List<TrdEcuSalesBeanBase> salseBeanList = filterSalesBean(data, entity.getRequest());
if (salseBeanList.size() == 0) {
continue;
}
String tagGroupId = entity.getTagGroupId();
double receiveAmount = 0;
double payAmount = 0;
int orderTimes = 0;
for (TrdEcuSalesBeanBase beanBase : salseBeanList) {
receiveAmount += beanBase.getReceive_amt();
payAmount += beanBase.getPay_amt();
orderTimes += beanBase.getSeff_goods_num();
}
double cusSinglePiece = 1 == configStatus ? payAmount / orderTimes
: receiveAmount / orderTimes;
handleValueCompare(groupIds, consumeRequest, tagGroupId, cusSinglePiece);
}
return Tuple2.apply(data.ecuId, groupIds.size() == 0 ? null : Joiner.on(" ").join(groupIds));
}).filter(data -> data._2() != null);
return rdd;
}
}
......@@ -16,104 +16,106 @@ public class TagFilterFactory {
private static TagFilterFactory instance;
public static TagFilterFactory getInstance() {
if(null==instance){
instance=new TagFilterFactory();
if (null == instance) {
instance = new TagFilterFactory();
}
return instance;
}
private TagFilterFactory(){}
public BaseTagFilter getTagFilter(TagConditionDTO conditionDTO){
BaseTagFilter tagFilter=null;
if(StringUtils.isNotEmpty(conditionDTO.getTagEsFieldName())){
private TagFilterFactory() {
}
public BaseTagFilter getTagFilter(TagConditionDTO conditionDTO) {
BaseTagFilter tagFilter = null;
if (StringUtils.isNotEmpty(conditionDTO.getTagEsFieldName())) {
switch (conditionDTO.getTagEsFieldName()) {
//积分信息
case TagConstant.TAG_CODE_ACCUMULATED_INTEGRAL:
tagFilter= TagAccumulatedIntegralFilter.getInstance();
tagFilter = TagAccumulatedIntegralFilter.getInstance();
break;
case TagConstant.TAG_CODE_ABOUT_EXPIRE_INTEGRAL:
tagFilter= TagAboutExpireIntegralFilter.getInstance();
tagFilter = TagAboutExpireIntegralFilter.getInstance();
break;
//卡卷类
case TagConstant.TAG_CODE_CURRENT_COUPON_NUM:
tagFilter= TagCurrentCouponNumFilter.getInstance();
tagFilter = TagCurrentCouponNumFilter.getInstance();
break;
case TagConstant.TAG_CODE_UNCLAIMED_COUPON:
tagFilter= TagCouponFilter.getInstance();
tagFilter = TagCouponFilter.getInstance();
break;
case TagConstant.TAG_CODE_GET_NO_CANCEL_COUPON:
tagFilter= TagCouponFilter.getInstance();
tagFilter = TagCouponFilter.getInstance();
break;
case TagConstant.TAG_CODE_NO_CANCEL_EXPIRES_COUPON:
tagFilter= TagCouponFilter.getInstance();
tagFilter = TagCouponFilter.getInstance();
break;
case TagConstant.TAG_CODE_CANCEL_COUPON:
tagFilter=TagCouponFilter.getInstance();
tagFilter = TagCouponFilter.getInstance();
break;
case TagConstant.TAG_CODE_DONATION_IN_COUPON:
tagFilter=TagCouponFilter.getInstance();
tagFilter = TagCouponFilter.getInstance();
break;
case TagConstant.TAG_CODE_YET_DONATION_COUPON:
tagFilter=TagCouponFilter.getInstance();
tagFilter = TagCouponFilter.getInstance();
break;
//消费统计
case TagConstant.TAG_CODE_CONSUMPTION_TIME:
tagFilter=TagConsumptionTimeFilter.getInstance();
tagFilter = TagConsumptionTimeFilter.getInstance();
break;
case TagConstant.TAG_CODE_PER_CUSTOMER_TRANSACTION:
tagFilter=TagPerCustomerTransactionFilter.getInstance();
tagFilter = TagPerCustomerTransactionFilter.getInstance();
break;
case TagConstant.TAG_CODE_UNIT_PRICE:
tagFilter=TagUnitPriceFilter.getInstance();
tagFilter = TagUnitPriceFilter.getInstance();
break;
case TagConstant.TAG_CODE_ASSOCIATED_PURCHASE_RATE:
tagFilter=TagAssociatedPurchaseRateFilter.getInstance();
tagFilter = TagAssociatedPurchaseRateFilter.getInstance();
break;
case TagConstant.TAG_CODE_AVERAGE_DISCOUNT_FACTOR:
tagFilter=TagAverageDiscountFactorFilter.getInstance();
tagFilter = TagAverageDiscountFactorFilter.getInstance();
break;
case TagConstant.TAG_CODE_CONSUMPTION_SLEEP_DAYS:
tagFilter=TagConsumptionSleepDaysFilter.getInstance();
tagFilter = TagConsumptionSleepDaysFilter.getInstance();
break;
//消费金额记录
case TagConstant.TAG_CODE_CONSUME_TOTAL:
tagFilter=TagConsumeTotalFilter.getInstance();
tagFilter = TagConsumeTotalFilter.getInstance();
break;
case TagConstant.TAG_CODE_HISTORY_CONSUME_TOTAL:
tagFilter=TagHistoryConsumeTotalFilter.getInstance();
tagFilter = TagHistoryConsumeTotalFilter.getInstance();
break;
case TagConstant.TAG_CODE_FIRST_CONSUMPTION_MONEY:
tagFilter=TagFirstConsumptionMoneyFilter.getInstance();
tagFilter = TagFirstConsumptionMoneyFilter.getInstance();
break;
case TagConstant.TAG_CODE_LATELY_CONSUMPTION_MONEY:
tagFilter=TagLatelyConsumptionMoneyFilter.getInstance();
tagFilter = TagLatelyConsumptionMoneyFilter.getInstance();
break;
case TagConstant.TAG_CODE_TOP_SINGLE_CONSUMPTION_MONEY:
tagFilter=TagTopSingleConsumptionMoneyFilter.getInstance();
tagFilter = TagTopSingleConsumptionMoneyFilter.getInstance();
break;
case TagConstant.TAG_CODE_LOWEST_SINGLE_CONSUMPTION_MONEY:
tagFilter=TagLowestSingleConsumptionMoneyFilter.getInstance();
tagFilter = TagLowestSingleConsumptionMoneyFilter.getInstance();
break;
//消费时间
case TagConstant.TAG_CODE_FIRST_CONSUME_TIME:
tagFilter=TagFirstConsumeTimeFilter.getInstance();
tagFilter = TagFirstConsumeTimeFilter.getInstance();
break;
// case TagConstant.TAG_CODE_HISTORY_CONSUME_TIME:
// tagFilter=
// break;
case TagConstant.TAG_CODE_LATELY_CONSUME_TIME:
tagFilter=TagLatelyConsumeTimeFilter.getInstance();
tagFilter = TagLatelyConsumeTimeFilter.getInstance();
break;
case TagConstant.TAG_CODE_CONSUME_TIME:
tagFilter=TagConsumeTimeFilter.getInstance();
tagFilter = TagConsumeTimeFilter.getInstance();
break;
case TagConstant.TAG_CODE_CONSUME_SLEEP_DAY:
// tagFilter=
......@@ -121,60 +123,60 @@ public class TagFilterFactory {
//渠道
case TagConstant.TAG_CODE_FIRST_CONSUMPTION_CHANNEL:
tagFilter=TagFirstConsumptionChannelFilter.getInstance();
tagFilter = TagFirstConsumptionChannelFilter.getInstance();
break;
case TagConstant.TAG_CODE_LATELY_CONSUMPTION_CHANNEL:
tagFilter=TagLatelyConsumptionChannelFilter.getInstance();
case TagConstant.TAG_CODE_LATELY_CONSUMPTION_CHANNEL:
tagFilter = TagLatelyConsumptionChannelFilter.getInstance();
break;
//线下消费门店
case TagConstant.TAG_CODE_FIRST_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagFirstOfflineConsumptionStoreFilter.getInstance();
tagFilter = TagFirstOfflineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_RECENTLY_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagRecentlyOfflineConsumptionStoreFilter.getInstance();
tagFilter = TagRecentlyOfflineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_HISTORY_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagHistoryOfflineConsumptionStoreFilter.getInstance();
tagFilter = TagHistoryOfflineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagOfflineConsumptionStoreFilter.getInstance();
tagFilter = TagOfflineConsumptionStoreFilter.getInstance();
break;
//线上消费店铺
case TagConstant.TAG_CODE_FIRST_ONLINE_CONSUMPTION_STORE:
tagFilter=TagFirstOnlineConsumptionStoreFilter.getInstance();
tagFilter = TagFirstOnlineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_LATELY_ONLINE_CONSUMPTION_STORE:
tagFilter=TagLatelyOnlineConsumptionStoreFilter.getInstance();
tagFilter = TagLatelyOnlineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_HISTORY_ONLINE_CONSUMPTION_STORE:
tagFilter=TagHistoryOnlineConsumptionStoreFilter.getInstance();
tagFilter = TagHistoryOnlineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_ONLINE_CONSUMPTION_STORE:
tagFilter=TagOnlineConsumptionStoreFilter.getInstance();
tagFilter = TagOnlineConsumptionStoreFilter.getInstance();
break;
//消费商品
case TagConstant.TAG_CODE_FIRST_CONSUME_COMMODITY:
tagFilter=TagFirstConsumeCommodityFilter.getInstance();
tagFilter = TagFirstConsumeCommodityFilter.getInstance();
break;
case TagConstant.TAG_CODE_LATELY_CONSUME_COMMODITY:
tagFilter=TagLatelyConsumeCommodityFilter.getInstance();
tagFilter = TagLatelyConsumeCommodityFilter.getInstance();
break;
case TagConstant.TAG_CODE_HISTORY_CONSUME_COMMODITY:
tagFilter=TagHistoryConsumeCommodityFilter.getInstance();
tagFilter = TagHistoryConsumeCommodityFilter.getInstance();
break;
case TagConstant.TAG_CODE_CONSUME_COMMODITY:
tagFilter=TagConsumeCommodityFilter.getInstance();
tagFilter = TagConsumeCommodityFilter.getInstance();
break;
default:
DingtalkMessageUtil.sendAlertMessage("暂未支持的非实时标签: " + conditionDTO.getTagEsFieldName());
break;
}
}else{
System.out.println("conditionDTO==>"+ JSONObject.toJSONString(conditionDTO));
} else {
System.out.println("conditionDTO==>" + JSONObject.toJSONString(conditionDTO));
}
return tagFilter;
......
......@@ -12,6 +12,9 @@ import java.util.List;
public class TagProcessEntity {
int enterpriseId;
long tagGroupId;
/**
* 1 实时 2 非实时
*/
Integer realTime;
int level;
List<TagConditionDTO> tagList;
......
......@@ -57,7 +57,8 @@ public class TagProcessManager {
private List<SceneCrowdDTO> sceneCrowdDTOList = new ArrayList();
private MysqlRddManager member4RddManager;
private MysqlRddManager enterprise4RddManager;
private DataSourceSharding memberSharding4Datasource;
private DataSourceSharding enterpriseUserDatasource;
private DataSourceSharding enterpriseUserRelationDatasource;
private MysqlDatasource member4Datasource = null;
private MysqlDatasource enterprise4Datasource = null;
private boolean isProduction;
......@@ -98,7 +99,8 @@ public class TagProcessManager {
}
member4RddManager = member4Datasource.buildRddManager();
enterprise4RddManager = enterprise4Datasource.buildRddManager();
memberSharding4Datasource = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_ENTERPRISE_USER);
enterpriseUserDatasource = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_ENTERPRISE_USER);
enterpriseUserRelationDatasource = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_ENTERPRISE_USER_RELATION);
List<TabSceneCrowd> sceneCrowdList = member4RddManager.getPojo("tab_scene_crowd", TabSceneCrowd.class, null)
.filter(new Column("delete_flag").equalTo(0))
......@@ -175,15 +177,13 @@ public class TagProcessManager {
for (TagConditionDTO conditionDTO : processEntity.tagList) {
if (tagIdToFilterMap.containsKey(conditionDTO.getTagId())) {
for (DataSourceEntity sourceEntity : tagIdToFilterMap.get(conditionDTO.getTagId()).necessarySourceList()) {
// System.out.println("enterpriseId==>"+enterpriseTagEntry.getKey());
// System.out.println("SourceKey==>"+sourceEntity.getSourceKey());
// System.out.println("HiveTableName==>"+sourceEntity.getHiveTableName());
DataSourceManager.getInstance().addSourceEntity(sourceEntity, enterpriseTagEntry.getKey().intValue());
DataSourceManager.getInstance().addSourceEntity(sourceEntity, enterpriseTagEntry.getKey());
}
}
}
}
DataSourceManager.getInstance().addSourceEntity(memberSharding4Datasource, enterpriseTagEntry.getKey().intValue());
DataSourceManager.getInstance().addSourceEntity(enterpriseUserDatasource, enterpriseTagEntry.getKey());
DataSourceManager.getInstance().addSourceEntity(enterpriseUserRelationDatasource, enterpriseTagEntry.getKey());
}
if (extractData) {
......@@ -194,8 +194,8 @@ public class TagProcessManager {
//处理标签数据
JavaSparkContext jsc = SparkEnvManager.getInstance().getJsc();
List<Long> sceneCrowdIdList = new ArrayList();
for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
List<Long> sceneCrowdIdList = new ArrayList();
Integer enterpriseId = enterpriseTagEntry.getKey();
String indexName = EsRequestUtil.getESIindexName(enterpriseId, this.isProduction());
......@@ -210,7 +210,6 @@ public class TagProcessManager {
JavaPairRDD<Long, String> filterRdd = tagFilter.filterValidMember(enterpriseId, filterRequest).mapToPair(data -> Tuple2.apply(data, groupId));
System.out.println("filterRdd==>" + filterRdd.count());
if (null == memberGroupRdd) {
memberGroupRdd = filterRdd;
} else {
......@@ -222,7 +221,8 @@ public class TagProcessManager {
}
if (null != memberGroupRdd) {
JavaPairRDD<Long, Long> userRdd = memberSharding4Datasource.getDatasetByEnterpriseId(enterpriseId).select("id").javaRDD()
JavaPairRDD<Long, Long> userRdd = enterpriseUserDatasource.getDatasetByEnterpriseId(enterpriseId).select("id", "delete_flag").javaRDD()
.filter(data -> 0 == (Integer) data.getAs("delete_flag"))
.mapToPair(data -> Tuple2.apply((Long) data.getAs("id"), (Long) data.getAs("id")))
.reduceByKey((x, y) -> x);
......@@ -255,60 +255,62 @@ public class TagProcessManager {
//处理混合标签
JavaPairRDD<Long, String> searchRDD = null;
for (TagProcessEntity mixEntity : enterpriseTagEntry.getValue()) {
if (mixEntity.realTime == 3) {
Long tagGroupId = mixEntity.tagGroupId;
String query = EsRequestUtil.getIndexParam(enterpriseId, tagGroupId, this.isProduction);
if (StringUtils.isNotEmpty(query)) {
Map<String, String> conf = new HashMap();
conf.put("es.nodes", AppEnvUtil.ES_NODES);
conf.put("es.resource", indexName + "/mapper_type");
conf.put("es.query", query);
conf.put("es.scroll.size", "5000");
JavaPairRDD<Long, String> esRdd = JavaEsSpark.esRDD(jsc, conf)
.mapToPair(data -> {
String sceneTagsB = tagGroupId.toString();
if (null != data._2().get("sceneTags_b")) {
sceneTagsB = sceneTagsB + " " + data._2().get("sceneTags_b");
}
return Tuple2.apply((Long) data._2.get("id"), sceneTagsB);
});
if (null == searchRDD) {
searchRDD = esRdd;
} else {
searchRDD = searchRDD.union(esRdd);
}
}
}
}
if (null != searchRDD) {
JavaPairRDD<Long, String> groupRDD = searchRDD.repartition(100).reduceByKey((x, y) -> x + " " + y)
.mapPartitionsToPair(data -> {
List<Tuple2<Long, String>> list = new ArrayList();
while (data.hasNext()) {
Set<String> set = new HashSet();
Tuple2<Long, String> tp2 = data.next();
String[] tagGroups = tp2._2().split(" ");
for (String tagGroup : tagGroups) {
set.add(tagGroup);
}
JSONObject json = new JSONObject();
json.put("id", tp2._1());
json.put("sceneTags_b", Joiner.on(" ").join(set));
list.add(Tuple2.apply(tp2._1(), json.toString()));
}
return list.iterator();
});
updateIndex(groupRDD, indexName);
}
/**
JavaPairRDD<Long, String> searchRDD = null;
for (TagProcessEntity mixEntity : enterpriseTagEntry.getValue()) {
if (mixEntity.realTime == 3) {
Long tagGroupId = mixEntity.tagGroupId;
String query = EsRequestUtil.getIndexParam(enterpriseId, tagGroupId, this.isProduction);
if (StringUtils.isNotEmpty(query)) {
Map<String, String> conf = new HashMap();
conf.put("es.nodes", AppEnvUtil.ES_NODES);
conf.put("es.resource", indexName + "/mapper_type");
conf.put("es.query", query);
conf.put("es.scroll.size", "5000");
JavaPairRDD<Long, String> esRdd = JavaEsSpark.esRDD(jsc, conf)
.mapToPair(data -> {
String sceneTagsB = tagGroupId.toString();
if (null != data._2().get("sceneTags_b")) {
sceneTagsB = sceneTagsB + " " + data._2().get("sceneTags_b");
}
return Tuple2.apply((Long) data._2.get("id"), sceneTagsB);
});
if (null == searchRDD) {
searchRDD = esRdd;
} else {
searchRDD = searchRDD.union(esRdd);
}
}
}
}
if (null != searchRDD) {
JavaPairRDD<Long, String> groupRDD = searchRDD.repartition(100).reduceByKey((x, y) -> x + " " + y)
.mapPartitionsToPair(data -> {
List<Tuple2<Long, String>> list = new ArrayList();
while (data.hasNext()) {
Set<String> set = new HashSet();
Tuple2<Long, String> tp2 = data.next();
String[] tagGroups = tp2._2().split(" ");
for (String tagGroup : tagGroups) {
set.add(tagGroup);
}
JSONObject json = new JSONObject();
json.put("id", tp2._1());
json.put("sceneTags_b", Joiner.on(" ").join(set));
list.add(Tuple2.apply(tp2._1(), json.toString()));
}
return list.iterator();
});
updateIndex(groupRDD, indexName);
}
*/
}
}
......
package com.gic.spark.tag;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.config.SparkConfigManager;
import com.gic.spark.datasource.DataSourceManager;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlDatasource;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.*;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.table.TabDataActuallyPaidConfig;
import com.gic.spark.entity.table.TabSceneCrowd;
import com.gic.spark.filter.BaseTagFilter;
import com.gic.spark.util.*;
import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.PropertiesSettings;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.spark.ESShardPartitioner;
import org.elasticsearch.spark.EsRddFactory;
import org.elasticsearch.spark.cfg.SparkSettings;
import org.elasticsearch.spark.cfg.SparkSettingsManager;
import org.elasticsearch.spark.rdd.EsRDDWriter;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import scala.collection.JavaConversions;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.*;
/**
* @description:
* @author: wangxk
* @date: 2020/4/15
*/
/**
 * Batch (non-real-time) tag-crowd processor: loads scene crowds from MySQL, prepares the
 * per-enterprise data sources each tag filter needs, runs the filters as Spark jobs,
 * writes the resulting member -> tag-group mapping to Elasticsearch and Hive, and stamps
 * the processed crowds' update time back into MySQL.
 * <p>
 * NOTE(review): rewritten variant of TagProcessManager; mixed-tag handling is commented out
 * in {@link #process(boolean)} and {@code mixTagProcess} is currently never invoked.
 */
public class TagProcessManagerNew {

    // Scene crowds (tag groups) loaded at init() that require offline processing.
    private List<SceneCrowdDTO> sceneCrowdDTOList = new ArrayList();
    private MysqlRddManager member4RddManager;
    private MysqlRddManager enterprise4RddManager;
    // Sharded datasources over the enterprise-user tables (member sharding 4.0).
    private DataSourceSharding enterpriseUserDatasource;
    private DataSourceSharding enterpriseUserRelationDatasource;
    private MysqlDatasource member4Datasource = null;
    private MysqlDatasource enterprise4Datasource = null;
    private boolean isProduction;

    private static TagProcessManagerNew instance;

    /**
     * Lazily creates the singleton.
     * NOTE(review): not thread-safe — assumes single-threaded driver-side use; confirm.
     */
    public static TagProcessManagerNew getInstance() {
        if (null == instance) {
            instance = new TagProcessManagerNew();
        }
        return instance;
    }

    private TagProcessManagerNew() {
    }

    /** @return whether this run targets the production environment. */
    public boolean isProduction() {
        return isProduction;
    }

    /**
     * Wires datasources for the chosen environment, loads active scene crowds
     * (delete_flag = 0, valid_flag = 1, real_time in {2, 3}) and the per-enterprise
     * "actually paid" configuration into {@code CommonUtil.dataActuallyPaidConfigMap}.
     *
     * @param isProd true for production connections, false for the dev MySQL instances
     */
    public void init(boolean isProd) {
        this.isProduction = isProd;
        if (isProduction) {
            enterprise4Datasource = SparkConfigManager.getInstance().getDatasource("4.0-enterprise");
            member4Datasource = SparkConfigManager.getInstance().getDatasourceByIdAndSchema("4.0-prodDs", "gic_member_4_0");
        } else {
            // Dev environment: rewrite the shared dev JDBC URL to point at the 4.0 schemas.
            member4Datasource = new MysqlDatasource();
            member4Datasource.setJdbcUrl(MysqlDatasource.devDatasource.getJdbcUrl().replaceAll("gic3_test", "gic-member4.0"));
            member4Datasource.setUser("cdb_outerroot");
            member4Datasource.setPassword("@09ui%sbc09");
            enterprise4Datasource = new MysqlDatasource();
            enterprise4Datasource.setJdbcUrl(MysqlDatasource.devDatasource.getJdbcUrl().replaceAll("gic3_test", "gic-enterprise4.0"));
            enterprise4Datasource.setUser("cdb_outerroot");
            enterprise4Datasource.setPassword("@09ui%sbc09");
        }
        member4RddManager = member4Datasource.buildRddManager();
        enterprise4RddManager = enterprise4Datasource.buildRddManager();
        enterpriseUserDatasource = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_ENTERPRISE_USER);
        enterpriseUserRelationDatasource = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_ENTERPRISE_USER_RELATION);
        // Only real_time values 2 and 3 are processed offline here
        // (presumably 2 = non-real-time, 3 = mixed — TODO confirm against TagProcessEntity docs).
        List<TabSceneCrowd> sceneCrowdList = member4RddManager.getPojo("tab_scene_crowd", TabSceneCrowd.class, null)
                .filter(new Column("delete_flag").equalTo(0))
                .filter(new Column("valid_flag").equalTo(1))
                .javaRDD()
                .filter(data -> {
                    if (null != data.getReal_Time()) {
                        if (data.getReal_Time() == 2) {
                            return true;
                        } else if (data.getReal_Time() == 3) {
                            return true;
                        }
                    }
                    return false;
                })
                .collect();
        // Deserialize each crowd's JSON condition groups into DTOs.
        for (TabSceneCrowd sceneCrowd : sceneCrowdList) {
            LinkedList<TagConditionGroupDTO> conditionGroupDTOList = JSONObject.parseObject(sceneCrowd.getTag_Condition_Group_Info(), new TypeReference<LinkedList<TagConditionGroupDTO>>() {
            });
            sceneCrowdDTOList.add(new SceneCrowdDTO(sceneCrowd, conditionGroupDTOList));
        }
        // Cache enabled (status = 1) "actually paid" configs for amount selection in the filters.
        List<TabDataActuallyPaidConfig> dataActuallyPaidConfigList = enterprise4RddManager.getPojo("tab_data_actually_paid_config", TabDataActuallyPaidConfig.class, null)
                .filter(new Column("status").equalTo(1))
                .collectAsList();
        dataActuallyPaidConfigList.forEach(data -> CommonUtil.dataActuallyPaidConfigMap.put(data.getEnterprise_Id(), data));
    }

    /** Restricts processing to the given enterprises (drops all other loaded crowds). */
    public void setEnterpriseId(Set<Integer> enterpriseIdList) {
        sceneCrowdDTOList.removeIf(sceneCrowdDTO -> !enterpriseIdList.contains(sceneCrowdDTO.getEnterprise_Id()));
    }

    /** Restricts processing to the given tag-group (scene crowd) ids. */
    public void setTagGroupId(Set<Long> tagGroupIdList) {
        sceneCrowdDTOList.removeIf(sceneCrowdDTO -> !tagGroupIdList.contains(sceneCrowdDTO.getId()));
    }

    /**
     * Main driver: groups conditions per enterprise, registers the data sources each
     * filter needs, optionally extracts them to Hive, then runs every filter and writes
     * the member/tag-group results to ES and Hive.
     *
     * @param extractData when true, source data is first extracted to the Hive warehouse
     */
    public void process(boolean extractData) {
        Map<Integer, List<TagProcessEntity>> tagGroupByEnterpriseMap = new HashMap<>();
        Map<Long, BaseTagFilter> tagIdToFilterMap = new HashMap();
        // Flatten each crowd's condition groups into per-level TagProcessEntity records,
        // grouped by enterprise.
        for (SceneCrowdDTO sceneCrowdDTO : sceneCrowdDTOList) {
            LinkedList<TagConditionGroupDTO> conditionGroupDTOS = sceneCrowdDTO.getConditionGroupDTOList();
            for (int i = 0; i < conditionGroupDTOS.size(); i++) {
                TagProcessEntity entity = new TagProcessEntity();
                entity.enterpriseId = sceneCrowdDTO.getEnterprise_Id();
                entity.tagGroupId = sceneCrowdDTO.getId();
                entity.realTime = sceneCrowdDTO.getReal_Time();
                entity.level = i + 1;
                entity.tagList = conditionGroupDTOS.get(i).getConditionInfos();
                for (TagConditionDTO conditionDTO : entity.tagList) {// map each tag to its filter implementation
                    if (conditionDTO.getRealTime() == 0) {
                        BaseTagFilter tagFilter = TagFilterFactory.getInstance().getTagFilter(conditionDTO);
                        if (null != tagFilter) {
                            tagIdToFilterMap.put(conditionDTO.getTagId(), tagFilter);
                        }
                    }
                }
                if (!tagGroupByEnterpriseMap.containsKey(sceneCrowdDTO.getEnterprise_Id())) {
                    tagGroupByEnterpriseMap.put(sceneCrowdDTO.getEnterprise_Id(), new ArrayList());
                }
                tagGroupByEnterpriseMap.get(sceneCrowdDTO.getEnterprise_Id()).add(entity);
            }
        }
        // Register, per enterprise, every data source the mapped filters require,
        // plus the enterprise-user tables needed for the final join.
        for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
            for (TagProcessEntity processEntity : enterpriseTagEntry.getValue()) {
                for (TagConditionDTO conditionDTO : processEntity.tagList) {
                    if (tagIdToFilterMap.containsKey(conditionDTO.getTagId())) {
                        for (DataSourceEntity sourceEntity : tagIdToFilterMap.get(conditionDTO.getTagId()).necessarySourceList()) {
                            DataSourceManager.getInstance().addSourceEntity(sourceEntity, enterpriseTagEntry.getKey());
                        }
                    }
                }
            }
            DataSourceManager.getInstance().addSourceEntity(enterpriseUserDatasource, enterpriseTagEntry.getKey());
            DataSourceManager.getInstance().addSourceEntity(enterpriseUserRelationDatasource, enterpriseTagEntry.getKey());
        }
        if (extractData) {
            DataSourceManager.getInstance().init(isProduction);
            DataSourceManager.getInstance().extractDataToDatabase();
            DingtalkMessageUtil.sendAlertMessage("extractData to hive finish !");
        }
        // Process tag data per enterprise.
        JavaSparkContext jsc = SparkEnvManager.getInstance().getJsc();
        for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
            List<Long> sceneCrowdIdList = new ArrayList();
            Integer enterpriseId = enterpriseTagEntry.getKey();
            String indexName = EsRequestUtil.getESIindexName(enterpriseId, this.isProduction());
            JavaPairRDD<Long, String> memberGroupRdd = null;
            // Batch all condition requests per tag id so each filter runs once per tag
            // instead of once per condition. groupId encodes tagGroupId_tagId_level.
            Map<Long, List<FilterProcessEntity>> tagInfoByTagId = new HashMap<>();
            for (TagProcessEntity entity : enterpriseTagEntry.getValue()) {
                for (TagConditionDTO conditionDTO : entity.tagList) {
                    if (tagIdToFilterMap.containsKey(conditionDTO.getTagId())) {
                        final String groupId = entity.tagGroupId + "_" + conditionDTO.getTagId() + "_" + entity.level;
                        AbstractFilterRequest filterRequest = TagValueParser.parseFilterValue(conditionDTO, enterpriseId);
                        FilterProcessEntity processEntity = new FilterProcessEntity(groupId, filterRequest);
                        if (!tagInfoByTagId.containsKey(conditionDTO.getTagId())) {
                            tagInfoByTagId.put(conditionDTO.getTagId(), new ArrayList<>());
                        }
                        tagInfoByTagId.get(conditionDTO.getTagId()).add(processEntity);
                        // NOTE(review): may add the same tagGroupId multiple times (once per
                        // matching condition); updateSceneCrowd then batches duplicate updates.
                        sceneCrowdIdList.add(entity.tagGroupId);
                    }
                }
            }
            // Run each filter once and union the (memberId, groupId) results.
            for (Map.Entry<Long, List<FilterProcessEntity>> entry : tagInfoByTagId.entrySet()) {
                Long tagId = entry.getKey();
                BaseTagFilter tagFilter = tagIdToFilterMap.get(tagId);
                JavaPairRDD<Long, String> filterRdd = tagFilter.filterValidMember(enterpriseId, entry.getValue());
                if (null == memberGroupRdd) {
                    memberGroupRdd = filterRdd;
                } else {
                    memberGroupRdd = memberGroupRdd.union(filterRdd);
                }
            }
            // Legacy per-condition filtering kept for reference:
            // for (TagProcessEntity entity : enterpriseTagEntry.getValue()) {
            // for (TagConditionDTO conditionDTO : entity.tagList) {
            // if (tagIdToFilterMap.containsKey(conditionDTO.getTagId())) {
            // BaseTagFilter tagFilter = tagIdToFilterMap.get(conditionDTO.getTagId());
            // AbstractFilterRequest filterRequest = TagValueParser.parseFilterValue(conditionDTO, enterpriseId);
            //
            // final String groupId = entity.tagGroupId + "_" + conditionDTO.getTagId() + "_" + entity.level;
            //
            // JavaPairRDD<Long, String> filterRdd = tagFilter.filterValidMember(enterpriseId, filterRequest).mapToPair(data -> Tuple2.apply(data, groupId));
            //
            // if (null == memberGroupRdd) {
            // memberGroupRdd = filterRdd;
            // } else {
            // memberGroupRdd = memberGroupRdd.union(filterRdd);
            // }
            // sceneCrowdIdList.add(entity.tagGroupId);
            // }
            // }
            // }
            if (null != memberGroupRdd) {
                // Left-join against all live (delete_flag = 0) enterprise users so every
                // member gets an ES document, with an empty sceneTags_b when nothing matched.
                JavaPairRDD<Long, Long> userRdd = enterpriseUserDatasource.getDatasetByEnterpriseId(enterpriseId).select("id", "delete_flag").javaRDD()
                        .filter(data -> 0 == (Integer) data.getAs("delete_flag"))
                        .mapToPair(data -> Tuple2.apply((Long) data.getAs("id"), (Long) data.getAs("id")))
                        .reduceByKey((x, y) -> x);
                JavaPairRDD<Long, String> updateMemberGroupRdd = userRdd.leftOuterJoin(memberGroupRdd.reduceByKey((x, y) -> x + " " + y))
                        .mapPartitionsToPair(data -> {
                            List<Tuple2<Long, String>> result = new ArrayList();
                            while (data.hasNext()) {
                                Tuple2<Long, Optional<String>> tp2 = data.next()._2();
                                Long id = tp2._1();
                                JSONObject json = new JSONObject();
                                json.put("id", id);
                                json.put("sceneTags_b", tp2._2().isPresent() ? tp2._2().get() : "");
                                result.add(Tuple2.apply(id, json.toString()));
                            }
                            return result.iterator();
                        });
                // NOTE(review): debug output — foreach + count re-evaluate the lineage before the cache below.
                updateMemberGroupRdd.foreach(data -> System.out.println("id==>" + data._1() + "sceneTags_b==>" + data._2()));
                System.out.println("updateMemberGroupRdd==>" + updateMemberGroupRdd.count());
                JavaPairRDD<Long, String> cacheMemberGroupRdd = updateMemberGroupRdd.cache();
                updateIndex(cacheMemberGroupRdd, indexName);
                saveToHive(cacheMemberGroupRdd, enterpriseId);
                cacheMemberGroupRdd.unpersist();
                updateSceneCrowd(sceneCrowdIdList);
            }
            // Mixed-tag handling (realTime == 3) — disabled; see mixTagProcess below.
            /**
             JavaPairRDD<Long, String> searchRDD = null;
             for (TagProcessEntity mixEntity : enterpriseTagEntry.getValue()) {
             if (mixEntity.realTime == 3) {
             Long tagGroupId = mixEntity.tagGroupId;
             String query = EsRequestUtil.getIndexParam(enterpriseId, tagGroupId, this.isProduction);
             if (StringUtils.isNotEmpty(query)) {
             Map<String, String> conf = new HashMap();
             conf.put("es.nodes", AppEnvUtil.ES_NODES);
             conf.put("es.resource", indexName + "/mapper_type");
             conf.put("es.query", query);
             conf.put("es.scroll.size", "5000");
             JavaPairRDD<Long, String> esRdd = JavaEsSpark.esRDD(jsc, conf)
             .mapToPair(data -> {
             String sceneTagsB = tagGroupId.toString();
             if (null != data._2().get("sceneTags_b")) {
             sceneTagsB = sceneTagsB + " " + data._2().get("sceneTags_b");
             }
             return Tuple2.apply((Long) data._2.get("id"), sceneTagsB);
             });
             if (null == searchRDD) {
             searchRDD = esRdd;
             } else {
             searchRDD = searchRDD.union(esRdd);
             }
             }
             }
             }
             if (null != searchRDD) {
             JavaPairRDD<Long, String> groupRDD = searchRDD.repartition(100).reduceByKey((x, y) -> x + " " + y)
             .mapPartitionsToPair(data -> {
             List<Tuple2<Long, String>> list = new ArrayList();
             while (data.hasNext()) {
             Set<String> set = new HashSet();
             Tuple2<Long, String> tp2 = data.next();
             String[] tagGroups = tp2._2().split(" ");
             for (String tagGroup : tagGroups) {
             set.add(tagGroup);
             }
             JSONObject json = new JSONObject();
             json.put("id", tp2._1());
             json.put("sceneTags_b", Joiner.on(" ").join(set));
             list.add(Tuple2.apply(tp2._1(), json.toString()));
             }
             return list.iterator();
             });
             updateIndex(groupRDD, indexName);
             }
             */
        }
    }

    /**
     * Persists the member -> tag-group mapping to the Hive table
     * {@code tag4_prod|tag4_test.tag_group_info}, overwriting the enterprise partition.
     * Rows with an empty sceneTags_b are skipped.
     *
     * @param updateMemberGroup (memberId, JSON {"id", "sceneTags_b"}) pairs
     * @param enterpriseId      partition key
     */
    private void saveToHive(JavaPairRDD<Long, String> updateMemberGroup, Integer enterpriseId) {
        JavaRDD<TagGroupInfo> tagGroupInfoRDD = updateMemberGroup.mapPartitions(data -> {
            List<TagGroupInfo> tagGroupInfoList = new ArrayList();
            while (data.hasNext()) {
                Tuple2<Long, String> tp2 = data.next();
                JSONObject json = JSONObject.parseObject(tp2._2());
                if (StringUtils.isNotEmpty(json.getString("sceneTags_b"))) {
                    tagGroupInfoList.add(new TagGroupInfo(json.getLong("id"), enterpriseId, json.getString("sceneTags_b")));
                }
            }
            return tagGroupInfoList.iterator();
        });
        System.out.println("tagGroupInfoRDD.count==>" + tagGroupInfoRDD.count());
        SparkSession sparkSession = SparkEnvManager.getInstance().getSparkSession();
        Dataset<Row> tagGroupInfoDataset = sparkSession.createDataFrame(tagGroupInfoRDD, TagGroupInfo.class);
        tagGroupInfoDataset.createOrReplaceTempView("tag_group_info_tmp");
        sparkSession.sql(String.format("INSERT overwrite table %s.tag_group_info PARTITION(enterprise_id = '%s') SELECT id,tag_group_info from tag_group_info_tmp", isProduction ? "tag4_prod" : "tag4_test", enterpriseId));
    }

    /**
     * Upserts the JSON documents into the given ES index via the es-hadoop writer,
     * pre-partitioning by ES shard so each Spark partition writes to one shard.
     *
     * @param updateMemberGroup (docId, JSON source) pairs; the JSON carries the mapping id field
     * @param indexName         target ES index (type "mapper_type")
     */
    private void updateIndex(JavaPairRDD<Long, String> updateMemberGroup, String indexName) {
        Map<String, String> conf = new HashMap();
        System.out.println("indexName===>" + indexName);
        conf.put("es.resource", indexName + "/mapper_type");
        conf.put("es.nodes", AppEnvUtil.ES_NODES);
        conf.put("es.mapping.date.rich", "false");
        conf.put("es.index.auto.create", "true");
        conf.put("es.mapping.id", "id");
        conf.put("es.write.operation", "upsert");
        conf.put("es.batch.size.bytes", "1024k");
        conf.put("es.batch.size.entries", "1500");
        conf.put(ConfigurationOptions.ES_INPUT_JSON, "true");
        // Merge the job-level Spark ES settings with the per-call overrides above.
        SparkSettings sparkCfg = new SparkSettingsManager().load(SparkEnvManager.getInstance().getSparkContext().getConf());
        Settings cfg = new PropertiesSettings().load(sparkCfg.save());
        cfg.merge(conf);
        String settings = cfg.save();
        ESShardPartitioner partitioner = new ESShardPartitioner(settings);
        updateMemberGroup.partitionBy(partitioner).map(data -> data._2()).foreachPartition(data -> {
            // Settings are rebuilt on the executor from the serialized string.
            Settings newSettings = new PropertiesSettings().load(settings);
            EsRDDWriter<String> writer = EsRddFactory.createJsonWriter(newSettings.save(), false);
            writer.write(TaskContext.get(), JavaConversions.asScalaIterator(data));
        });
    }

    /**
     * Stamps update_group_time = now for each processed crowd id via a JDBC batch.
     * SQL errors are logged and swallowed (best effort).
     *
     * @param sceneCrowdIdList ids of tab_scene_crowd rows to touch (duplicates are harmless)
     */
    private void updateSceneCrowd(List<Long> sceneCrowdIdList) {
        Connection connection = member4Datasource.getConnection();
        PreparedStatement ps = null;
        try {
            ps = DaoHelper.getPreparedStatement(connection, "update tab_scene_crowd set update_group_time=? where id=?");
            for (Long id : sceneCrowdIdList) {
                ps.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
                ps.setLong(2, id);
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (null != ps) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Mixed-tag pass: reads existing ES docs per tag group, prepends the group id to each
     * doc's sceneTags_b, de-duplicates the token set and upserts the result.
     * NOTE(review): currently unreferenced and the es.query line is commented out, so every
     * document in the index would be scanned — confirm before enabling.
     *
     * @param tagGroupByEnterpriseMap mixed-tag process entities grouped by enterprise
     */
    private void mixTagProcess(Map<Integer, List<TagProcessEntity>> tagGroupByEnterpriseMap) {
        // Process mixed tags.
        JavaSparkContext jsc = SparkEnvManager.getInstance().getJsc();
        for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseMixTagEntry : tagGroupByEnterpriseMap.entrySet()) {
            Integer enterpriseId = enterpriseMixTagEntry.getKey();
            String indexName = EsRequestUtil.getESIindexName(enterpriseId, this.isProduction());
            JavaPairRDD<Long, String> searchRDD = null;
            for (TagProcessEntity mixEntity : enterpriseMixTagEntry.getValue()) {
                Long tagGroupId = mixEntity.tagGroupId;
                Map<String, String> conf = new HashMap();
                conf.put("es.nodes", AppEnvUtil.ES_NODES);
                conf.put("es.resource", indexName + "/mapper_type");
                // conf.put("es.query", query.toString());
                conf.put("es.scroll.size", "5000");
                JavaPairRDD<Long, String> esRdd = JavaEsSpark.esRDD(jsc, conf)
                        .mapToPair(data -> {
                            String sceneTagsB = tagGroupId.toString();
                            if (null != data._2().get("sceneTags_b")) {
                                sceneTagsB = sceneTagsB + " " + data._2().get("sceneTags_b");
                            }
                            return Tuple2.apply((Long) data._2.get("id"), sceneTagsB);
                        });
                if (null == searchRDD) {
                    searchRDD = esRdd;
                } else {
                    searchRDD = searchRDD.union(esRdd);
                }
            }
            if (null != searchRDD) {
                JavaPairRDD<Long, String> groupRDD = searchRDD.repartition(100).reduceByKey((x, y) -> x + " " + y)
                        .mapPartitionsToPair(data -> {
                            List<Tuple2<Long, String>> list = new ArrayList();
                            while (data.hasNext()) {
                                Set<String> set = new HashSet();
                                Tuple2<Long, String> tp2 = data.next();
                                String[] tagGroups = tp2._2().split(" ");
                                for (String tagGroup : tagGroups) {
                                    set.add(tagGroup);
                                }
                                JSONObject json = new JSONObject();
                                json.put("id", tp2._1());
                                json.put("sceneTags_b", Joiner.on(" ").join(set));
                                list.add(Tuple2.apply(tp2._1(), json.toString()));
                            }
                            return list.iterator();
                        });
                updateIndex(groupRDD, indexName);
            }
        }
    }
}
package com.gic.spark.util;
import com.gic.spark.entity.table.TabDataActuallyPaidConfig;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;
......@@ -13,46 +12,54 @@ import java.util.Map;
*/
public class CommonUtil {
public static Map<Integer,TabDataActuallyPaidConfig> dataActuallyPaidConfigMap=new HashMap();
public static Map<Integer, TabDataActuallyPaidConfig> dataActuallyPaidConfigMap = new HashMap();
/**
* 1:实付
* 0:关闭(应付)
*
* @param enterprise_Id
* @return
*/
public static Integer getConfigStatus(Integer enterprise_Id){
TabDataActuallyPaidConfig dataActuallyPaidConfig=dataActuallyPaidConfigMap.get(enterprise_Id);
if(null==dataActuallyPaidConfig||null==dataActuallyPaidConfig.getConfig_Status()){
public static Integer getConfigStatus(Integer enterprise_Id) {
TabDataActuallyPaidConfig dataActuallyPaidConfig = dataActuallyPaidConfigMap.get(enterprise_Id);
if (null == dataActuallyPaidConfig || null == dataActuallyPaidConfig.getConfig_Status()) {
return 0;
}else{
} else {
return dataActuallyPaidConfig.getConfig_Status();
}
}
public static int isEmptyInteger2int(Integer param,int defaultValue){
return null==param?defaultValue:param;
public static int isEmptyInteger2int(Integer param, int defaultValue) {
return null == param ? defaultValue : param;
}
public static int isEmptyInteger2int(Integer param){
return isEmptyInteger2int(param,0);
public static int isEmptyInteger2int(Integer param) {
return isEmptyInteger2int(param, 0);
}
public static long isEmptyLong2long(Long param,long defaultValue){
return null==param?defaultValue:param;
public static long isEmptyLong2long(Long param, long defaultValue) {
return null == param ? defaultValue : param;
}
public static long isEmptyLong2long(Long param){
return isEmptyLong2long(param,0l);
public static long isEmptyLong2long(Long param) {
return isEmptyLong2long(param, 0l);
}
public static float isEmptyFloat2float(Float param,float defaultValue){
return null==param?defaultValue:param;
public static float isEmptyFloat2float(Float param, float defaultValue) {
return null == param ? defaultValue : param;
}
public static float isEmptyFloat2float(Float param){
return isEmptyFloat2float(param,0f);
public static float isEmptyFloat2float(Float param) {
return isEmptyFloat2float(param, 0f);
}
public static double isEmptyDouble2double(Double param,double defaultValue){
return null==param?defaultValue:param;
public static double isEmptyDouble2double(Double param, double defaultValue) {
return null == param ? defaultValue : param;
}
public static double isEmptyDouble2double(Double param){
return isEmptyDouble2double(param,0d);
public static double isEmptyDouble2double(Double param) {
return isEmptyDouble2double(param, 0d);
}
}
......@@ -8,6 +8,7 @@ package com.gic.spark.util;
public class ConstantUtil {
public static final String TAB_ENTERPRISE_USER="tab_enterprise_user";
public static final String TAB_ENTERPRISE_USER_RELATION="tab_enterprise_user_relation";
public static final String TAB_COUPON_LOG="tab_coupon_log";
public static final String TAB_INTEGRAL_CU_CHANGE_LOG="tab_integral_cu_change_log";
public static final String ADS_GIC_TRD_ECU_SALES_LABEL_D="demoads.ads_gic_trd_ecu_sales_label_d";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment