Commit e5176cf1 by guos

会员标签4.0

parent d9baaa45
package com.gic.spark.filter; package com.gic.spark.filter;
import com.alibaba.fastjson.JSONObject;
import com.gic.spark.datasource.entity.DataSourceEntity; import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager; import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdVirtualOrderBean; import com.gic.spark.entity.bean.TrdVirtualOrderBean;
...@@ -10,6 +11,7 @@ import org.apache.commons.lang.StringUtils; ...@@ -10,6 +11,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.json4s.jackson.Json;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -50,6 +52,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil ...@@ -50,6 +52,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
consumeRecordRDD=statisticsTypeHandle(orderRdd,channelRequest); consumeRecordRDD=statisticsTypeHandle(orderRdd,channelRequest);
System.out.println("channelRequest==>"+ JSONObject.toJSONString(channelRequest));
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time())) JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)) .mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{ .reduceByKey((x,y)->{
...@@ -60,6 +63,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil ...@@ -60,6 +63,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
return y; return y;
} }
}).filter(data->{ }).filter(data->{
System.out.println("data==>"+ JSONObject.toJSONString(data));
if(channelRequest.getChannelVals().contains(data._2().getOrder_channel_code())){ if(channelRequest.getChannelVals().contains(data._2().getOrder_channel_code())){
return true; return true;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment