Commit 1800bc4c by guos

会员标签4.0

parent 4f24f056
...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequestTime; ...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequestTime;
import com.gic.spark.util.ConstantUtil; import com.gic.spark.util.ConstantUtil;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
...@@ -21,34 +23,42 @@ import java.util.List; ...@@ -21,34 +23,42 @@ import java.util.List;
*/ */
public abstract class AbstractTagConsumRecordFilter implements BaseTagFilter { public abstract class AbstractTagConsumRecordFilter implements BaseTagFilter {
DataSourceHive dataSourceHive = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_WDORDER_D); DataSourceHive dataSourceHiveOrder = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_WDORDER_D);
DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);
protected static JavaRDD<TrdVirtualOrderBean> statisticsTypeHandle(JavaRDD<TrdVirtualOrderBean> consumeRecordRDD, AbstractFilterRequest request){ protected static JavaRDD<TrdVirtualOrderBean> statisticsTypeHandle(JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd, AbstractFilterRequest request){
consumeRecordRDD=consumeRecordRDD.mapPartitions(data->{ JavaRDD<TrdVirtualOrderBean>consumeRecordRDD=orderRdd.mapPartitions(data->{
List<TrdVirtualOrderBean> result=new ArrayList(); List<TrdVirtualOrderBean> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
TrdVirtualOrderBean consumeRecordBean=data.next(); Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>> tp2=data.next();
switch (request.getStatisticsType()){ TrdVirtualOrderBean consumeRecordBean=tp2._1();
case COMMODITYBRAND: switch (request.getStatisticsType()){
if(request.getStatisticsValList().contains(String.valueOf(consumeRecordBean.getEnt_id()))){ case COMMODITYBRAND:
result.add(consumeRecordBean); if(tp2._2().isPresent()){
} for(String ent_brand_id:tp2._2().get()){
break; if(request.getStatisticsValList().contains(ent_brand_id)){
case CHANNEL: result.add(consumeRecordBean);
if(request.getStatisticsValList().contains(String.valueOf(consumeRecordBean.getStore_info_id()))){ break;
result.add(consumeRecordBean); }
} }
break; }
case MCUINFO:
if(request.getStatisticsValList().contains(String.valueOf(consumeRecordBean.getArea_id()))){ break;
result.add(consumeRecordBean); case CHANNEL:
} if(request.getStatisticsValList().contains(String.valueOf(consumeRecordBean.getStore_info_id()))){
break; result.add(consumeRecordBean);
default:break; }
} break;
} case MCUINFO:
return result.iterator(); if(request.getStatisticsValList().contains(String.valueOf(consumeRecordBean.getArea_id()))){
}); result.add(consumeRecordBean);
}
break;
default:break;
}
}
return result.iterator();
});
return consumeRecordRDD; return consumeRecordRDD;
} }
......
...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil; ...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -24,7 +27,6 @@ import java.util.List; ...@@ -24,7 +27,6 @@ import java.util.List;
*/ */
public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);
private static TagConsumeCommodityFilter instance; private static TagConsumeCommodityFilter instance;
...@@ -38,7 +40,7 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{ ...@@ -38,7 +40,7 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem); result.add(dataSourceHiveOrderItem);
return result; return result;
} }
...@@ -46,14 +48,22 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{ ...@@ -46,14 +48,22 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request; TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest); Dataset<Row>OrderItemDS= dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderAndItemRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id","ent_brand_id").javaRDD()
.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1)))
.groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderAndItemRdd,commodityRequest);
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.filter(data->checkTime(commodityRequest,DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime())) .filter(data->checkTime(commodityRequest,DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.mapToPair(data-> Tuple2.apply(data.getVirtual_id(),data.getEcu_id())); .mapToPair(data-> Tuple2.apply(data.getVirtual_id(),data.getEcu_id()));
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD() JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(OrderItemDS,TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{ .filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code()) if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){ &&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
......
...@@ -7,8 +7,11 @@ import com.gic.spark.entity.request.AbstractFilterRequest; ...@@ -7,8 +7,11 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeTimeRequest; import com.gic.spark.entity.request.TagConsumeTimeRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Column; import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -35,16 +38,24 @@ public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{ ...@@ -35,16 +38,24 @@ public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request; TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId) JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId)
.filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD(); .filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeTimeRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeTimeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(), DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19))) .mapToPair(data-> Tuple2.apply(data.getEcu_id(), DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19)))
.filter(data->checkTime(consumeTimeRequest,data._2().getTime())) .filter(data->checkTime(consumeTimeRequest,data._2().getTime()))
......
...@@ -9,6 +9,8 @@ import com.gic.spark.util.CommonUtil; ...@@ -9,6 +9,8 @@ import com.gic.spark.util.CommonUtil;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -34,15 +36,23 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter { ...@@ -34,15 +36,23 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request; TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD=MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{ JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{
......
...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil; ...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -24,7 +27,6 @@ import java.util.List; ...@@ -24,7 +27,6 @@ import java.util.List;
*/ */
public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilter{ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);
private static TagFirstConsumeCommodityFilter instance; private static TagFirstConsumeCommodityFilter instance;
...@@ -38,7 +40,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte ...@@ -38,7 +40,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem); result.add(dataSourceHiveOrderItem);
return result; return result;
} }
...@@ -46,9 +48,18 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte ...@@ -46,9 +48,18 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request; TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest);
Dataset<Row> OrderItemDS= dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderAndItemRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id","ent_brand_id").javaRDD()
.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1)))
.groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderAndItemRdd,commodityRequest);
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{ .reduceByKey((x,y)->{
...@@ -60,7 +71,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte ...@@ -60,7 +71,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
} }
}) })
.mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1())); .mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1()));
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD() JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(OrderItemDS,TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{ .filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code()) if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){ &&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
......
...@@ -8,7 +8,9 @@ import com.gic.spark.entity.request.TagConsumeTimeRequest; ...@@ -8,7 +8,9 @@ import com.gic.spark.entity.request.TagConsumeTimeRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Column; import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -34,16 +36,23 @@ public class TagFirstConsumeTimeFilter extends AbstractTagConsumRecordFilter { ...@@ -34,16 +36,23 @@ public class TagFirstConsumeTimeFilter extends AbstractTagConsumRecordFilter {
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request; TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId) JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId)
.filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD(); .filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeTimeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeTimeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->StringUtils.isNotEmpty(data.getReceipts_time())) JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime())) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
......
...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeChannelRequest; ...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeChannelRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -30,16 +32,23 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil ...@@ -30,16 +32,23 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request; TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,channelRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,channelRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
......
...@@ -7,7 +7,9 @@ import com.gic.spark.entity.request.AbstractFilterRequest; ...@@ -7,7 +7,9 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeAmountRequest; import com.gic.spark.entity.request.TagConsumeAmountRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.*; import java.util.*;
...@@ -32,15 +34,23 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte ...@@ -32,15 +34,23 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request; TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
......
...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest; ...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -35,16 +37,23 @@ public class TagFirstOfflineConsumptionStoreFilter extends AbstractTagConsumReco ...@@ -35,16 +37,23 @@ public class TagFirstOfflineConsumptionStoreFilter extends AbstractTagConsumReco
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&&StringUtils.isNotEmpty(data.getReceipts_time()) &&StringUtils.isNotEmpty(data.getReceipts_time())
&&null!=data.getStore_info_id()) &&null!=data.getStore_info_id())
......
...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest; ...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -34,16 +36,24 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor ...@@ -34,16 +36,24 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time()) && StringUtils.isNotEmpty(data.getReceipts_time())
&& null!=data.getShop_id()) && null!=data.getShop_id())
......
...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil; ...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -24,8 +27,6 @@ import java.util.List; ...@@ -24,8 +27,6 @@ import java.util.List;
*/ */
public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFilter{ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);
private static TagHistoryConsumeCommodityFilter instance; private static TagHistoryConsumeCommodityFilter instance;
public static TagHistoryConsumeCommodityFilter getInstance() { public static TagHistoryConsumeCommodityFilter getInstance() {
...@@ -38,7 +39,7 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil ...@@ -38,7 +39,7 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem); result.add(dataSourceHiveOrderItem);
return result; return result;
} }
...@@ -46,10 +47,18 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil ...@@ -46,10 +47,18 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request; TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
Dataset<Row> OrderItemDS= dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderAndItemRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id","ent_brand_id").javaRDD()
.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1)))
.groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest); consumeRecordRDD=statisticsTypeHandle(orderAndItemRdd,commodityRequest);
JavaRDD<TrdVirtualOrderItemBean> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD(); JavaRDD<TrdVirtualOrderItemBean> orderItemRDD=MysqlRddManager.getPojoFromDataset(OrderItemDS,TrdVirtualOrderItemBean.class).javaRDD();
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data.getEcu_id())) JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data.getEcu_id()))
.leftOuterJoin(orderItemRDD.mapToPair(data->Tuple2.apply(data.getVirtual_order_id(),data.getSku_code())) .leftOuterJoin(orderItemRDD.mapToPair(data->Tuple2.apply(data.getVirtual_order_id(),data.getSku_code()))
.filter(data->StringUtils.isNotEmpty(data._2()) .filter(data->StringUtils.isNotEmpty(data._2())
......
...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequest; ...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeAmountRequest; import com.gic.spark.entity.request.TagConsumeAmountRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -33,15 +35,23 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{ ...@@ -33,15 +35,23 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request; TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),configStatus==1?data.getPaid_amt():data.getPay_amt())) JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),configStatus==1?data.getPaid_amt():data.getPay_amt()))
......
...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequest; ...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest; import com.gic.spark.entity.request.TagConsumeStoreRequest;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -33,16 +35,23 @@ public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRe ...@@ -33,16 +35,23 @@ public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRe
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& null!=data.getStore_info_id()) && null!=data.getStore_info_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getStore_info_id())) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getStore_info_id()))
......
...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequest; ...@@ -7,6 +7,8 @@ import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest; import com.gic.spark.entity.request.TagConsumeStoreRequest;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -33,16 +35,23 @@ public class TagHistoryOnlineConsumptionStoreFilter extends AbstractTagConsumRec ...@@ -33,16 +35,23 @@ public class TagHistoryOnlineConsumptionStoreFilter extends AbstractTagConsumRec
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& null!=data.getShop_id()) && null!=data.getShop_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getShop_id())) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getShop_id()))
......
...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil; ...@@ -12,6 +12,9 @@ import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -24,7 +27,6 @@ import java.util.List; ...@@ -24,7 +27,6 @@ import java.util.List;
*/ */
public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilter{ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);
private static TagLatelyConsumeCommodityFilter instance; private static TagLatelyConsumeCommodityFilter instance;
...@@ -38,7 +40,7 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt ...@@ -38,7 +40,7 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem); result.add(dataSourceHiveOrderItem);
return result; return result;
} }
...@@ -46,9 +48,17 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt ...@@ -46,9 +48,17 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request; TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest); Dataset<Row> OrderItemDS= dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId);
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderAndItemRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(OrderItemDS.select("virtual_order_id","ent_brand_id").javaRDD()
.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1)))
.groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderAndItemRdd,commodityRequest);
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{ .reduceByKey((x,y)->{
...@@ -61,7 +71,7 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt ...@@ -61,7 +71,7 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
}) })
.mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1())); .mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1()));
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD() JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(OrderItemDS,TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{ .filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code()) if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){ &&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
......
...@@ -8,7 +8,9 @@ import com.gic.spark.entity.request.TagConsumeTimeRequest; ...@@ -8,7 +8,9 @@ import com.gic.spark.entity.request.TagConsumeTimeRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Column; import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -33,16 +35,24 @@ public class TagLatelyConsumeTimeFilter extends AbstractTagConsumRecordFilter{ ...@@ -33,16 +35,24 @@ public class TagLatelyConsumeTimeFilter extends AbstractTagConsumRecordFilter{
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request; TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId) JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId)
.filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD(); .filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeTimeRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeTimeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(), DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime())) .mapToPair(data-> Tuple2.apply(data.getEcu_id(), DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
......
...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeChannelRequest; ...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeChannelRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -33,16 +35,24 @@ public class TagLatelyConsumptionChannelFilter extends AbstractTagConsumRecordFi ...@@ -33,16 +35,24 @@ public class TagLatelyConsumptionChannelFilter extends AbstractTagConsumRecordFi
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request; TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,channelRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,channelRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
......
...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeAmountRequest; ...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeAmountRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -35,15 +37,23 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil ...@@ -35,15 +37,23 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request; TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
......
...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest; ...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -34,16 +36,24 @@ public class TagLatelyOnlineConsumptionStoreFilter extends AbstractTagConsumReco ...@@ -34,16 +36,24 @@ public class TagLatelyOnlineConsumptionStoreFilter extends AbstractTagConsumReco
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time()) && StringUtils.isNotEmpty(data.getReceipts_time())
&& null!=data.getShop_id()) && null!=data.getShop_id())
......
...@@ -9,7 +9,9 @@ import com.gic.spark.entity.request.TagConsumeAmountRequest; ...@@ -9,7 +9,9 @@ import com.gic.spark.entity.request.TagConsumeAmountRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.*; import java.util.*;
...@@ -34,15 +36,23 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco ...@@ -34,15 +36,23 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request; TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{ JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{
boolean result=false; boolean result=false;
......
...@@ -9,6 +9,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest; ...@@ -9,6 +9,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -35,7 +37,8 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil ...@@ -35,7 +37,8 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
...@@ -43,9 +46,15 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil ...@@ -43,9 +46,15 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
storeRequest.setYearMonthDayType(YearMonthDayType.DAY); storeRequest.setYearMonthDayType(YearMonthDayType.DAY);
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& null!=data.getStore_info_id() && null!=data.getStore_info_id()
&& StringUtils.isNotEmpty(data.getReceipts_time())) && StringUtils.isNotEmpty(data.getReceipts_time()))
......
...@@ -9,6 +9,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest; ...@@ -9,6 +9,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -30,7 +32,8 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt ...@@ -30,7 +32,8 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
...@@ -38,9 +41,15 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt ...@@ -38,9 +41,15 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
storeRequest.setYearMonthDayType(YearMonthDayType.DAY); storeRequest.setYearMonthDayType(YearMonthDayType.DAY);
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time()) && StringUtils.isNotEmpty(data.getReceipts_time())
&& null!=data.getShop_id()) && null!=data.getShop_id())
......
...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest; ...@@ -8,6 +8,8 @@ import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -32,7 +34,8 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR ...@@ -32,7 +34,8 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
...@@ -40,9 +43,15 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR ...@@ -40,9 +43,15 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request; TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest); JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean,Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,storeRequest);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> data.getOrder_channel_code()==1 JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> data.getOrder_channel_code()==1
&&StringUtils.isNotEmpty(data.getReceipts_time()) &&StringUtils.isNotEmpty(data.getReceipts_time())
&&null!=data.getStore_info_id()) &&null!=data.getStore_info_id())
......
...@@ -8,7 +8,9 @@ import com.gic.spark.entity.request.TagConsumeAmountRequest; ...@@ -8,7 +8,9 @@ import com.gic.spark.entity.request.TagConsumeAmountRequest;
import com.gic.spark.util.CommonUtil; import com.gic.spark.util.CommonUtil;
import com.gic.spark.util.DateUtil; import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row;
import scala.Tuple2; import scala.Tuple2;
import java.util.*; import java.util.*;
...@@ -32,15 +34,23 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord ...@@ -32,15 +34,23 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
@Override @Override
public List<DataSourceEntity> necessarySourceList() { public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList(); List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive); result.add(dataSourceHiveOrder);
result.add(dataSourceHiveOrderItem);
return result; return result;
} }
@Override @Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) { public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request; TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD(); JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHiveOrder.getDatasetByEntId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
JavaRDD<Row>virtualOrderItemRdd=dataSourceHiveOrderItem.getDatasetByEntId(enterpriseId).select("virtual_order_id","ent_brand_id").javaRDD();
JavaRDD<Tuple2<TrdVirtualOrderBean, Optional<Iterable<String>>>>orderRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data))
.leftOuterJoin(virtualOrderItemRdd.mapToPair(row->Tuple2.apply(row.getLong(0),row.getString(1))).groupByKey())
.map(data->data._2());
consumeRecordRDD=statisticsTypeHandle(orderRdd,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId); int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{ JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment