Commit 97a58367 by guos

会员标签4.0

parent efb54af7
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/26
*/
public class TagUnitPriceFilter extends AbstractTagConsumFilter{
private static TagUnitPriceFilter instance;
public static TagUnitPriceFilter getInstance() {
if(null==instance){
instance=new TagUnitPriceFilter();
}
return instance;
}
private TagUnitPriceFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x, y)->x+y)
.mapPartitions(data->{
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Long,Integer>tp2=data.next();
switch (consumeRequest.getNumberType()){
case gt:
if(tp2._2()>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(tp2._2()>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(tp2._2()<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(tp2._2().intValue()==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(tp2._2()>=consumeRequest.getBeginNum()
&&tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
return ecuRdd;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment