Commit 32863a84 by guos

混合标签处理

parent 53178bd6
...@@ -12,6 +12,7 @@ import java.util.List; ...@@ -12,6 +12,7 @@ import java.util.List;
public class TagProcessEntity { public class TagProcessEntity {
int enterpriseId; int enterpriseId;
long tagGroupId; long tagGroupId;
Integer realTime;
int level; int level;
List<TagConditionDTO> tagList; List<TagConditionDTO> tagList;
} }
...@@ -36,6 +36,7 @@ import org.elasticsearch.spark.EsRddFactory; ...@@ -36,6 +36,7 @@ import org.elasticsearch.spark.EsRddFactory;
import org.elasticsearch.spark.cfg.SparkSettings; import org.elasticsearch.spark.cfg.SparkSettings;
import org.elasticsearch.spark.cfg.SparkSettingsManager; import org.elasticsearch.spark.cfg.SparkSettingsManager;
import org.elasticsearch.spark.rdd.EsRDDWriter; import org.elasticsearch.spark.rdd.EsRDDWriter;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2; import scala.Tuple2;
import scala.collection.JavaConversions; import scala.collection.JavaConversions;
...@@ -100,8 +101,18 @@ public class TagProcessManager { ...@@ -100,8 +101,18 @@ public class TagProcessManager {
List<TabSceneCrowd> sceneCrowdList=member4RddManager.getPojo("tab_scene_crowd", TabSceneCrowd.class,null) List<TabSceneCrowd> sceneCrowdList=member4RddManager.getPojo("tab_scene_crowd", TabSceneCrowd.class,null)
.filter(new Column("delete_flag").equalTo(0)) .filter(new Column("delete_flag").equalTo(0))
.filter(new Column("valid_flag").equalTo(1)) .filter(new Column("valid_flag").equalTo(1))
.filter(new Column("real_time").equalTo(2)) .javaRDD()
.collectAsList(); .filter(data->{
if(null!=data.getReal_Time()){
if(data.getReal_Time()==2){
return true;
}else if(data.getReal_Time()==3){
return true;
}
}
return false;
})
.collect();
for(TabSceneCrowd sceneCrowd:sceneCrowdList){ for(TabSceneCrowd sceneCrowd:sceneCrowdList){
LinkedList<TagConditionGroupDTO>conditionGroupDTOList=JSONObject.parseObject(sceneCrowd.getTag_Condition_Group_Info(), new TypeReference<LinkedList<TagConditionGroupDTO>>(){}); LinkedList<TagConditionGroupDTO>conditionGroupDTOList=JSONObject.parseObject(sceneCrowd.getTag_Condition_Group_Info(), new TypeReference<LinkedList<TagConditionGroupDTO>>(){});
sceneCrowdDTOList.add(new SceneCrowdDTO(sceneCrowd,conditionGroupDTOList)); sceneCrowdDTOList.add(new SceneCrowdDTO(sceneCrowd,conditionGroupDTOList));
...@@ -133,30 +144,22 @@ public class TagProcessManager { ...@@ -133,30 +144,22 @@ public class TagProcessManager {
Map<Long, BaseTagFilter> tagIdToFilterMap = new HashMap(); Map<Long, BaseTagFilter> tagIdToFilterMap = new HashMap();
for(SceneCrowdDTO sceneCrowdDTO:sceneCrowdDTOList){ for(SceneCrowdDTO sceneCrowdDTO:sceneCrowdDTOList){
if(sceneCrowdDTO.getReal_Time()==2){
LinkedList<TagConditionGroupDTO> conditionGroupDTOS=sceneCrowdDTO.getConditionGroupDTOList(); LinkedList<TagConditionGroupDTO> conditionGroupDTOS=sceneCrowdDTO.getConditionGroupDTOList();
for(int i=0;i<conditionGroupDTOS.size();i++){ for(int i=0;i<conditionGroupDTOS.size();i++){
TagProcessEntity entity=new TagProcessEntity(); TagProcessEntity entity=new TagProcessEntity();
entity.enterpriseId=sceneCrowdDTO.getEnterprise_Id(); entity.enterpriseId=sceneCrowdDTO.getEnterprise_Id();
entity.tagGroupId=sceneCrowdDTO.getId(); entity.tagGroupId=sceneCrowdDTO.getId();
entity.realTime=sceneCrowdDTO.getReal_Time();
entity.level=i+1; entity.level=i+1;
entity.tagList=conditionGroupDTOS.get(i).getConditionInfos(); entity.tagList=conditionGroupDTOS.get(i).getConditionInfos();
for(TagConditionDTO conditionDTO:entity.tagList){//将tag同filter进行映射 for(TagConditionDTO conditionDTO:entity.tagList){//将tag同filter进行映射
if(conditionDTO.getRealTime()==0){
BaseTagFilter tagFilter=TagFilterFactory.getInstance().getTagFilter(conditionDTO); BaseTagFilter tagFilter=TagFilterFactory.getInstance().getTagFilter(conditionDTO);
if(null!=tagFilter){ if(null!=tagFilter){
tagIdToFilterMap.put(conditionDTO.getTagId(),tagFilter); tagIdToFilterMap.put(conditionDTO.getTagId(),tagFilter);
/*for(DataSourceEntity sourceEntity:tagFilter.necessarySourceList()){
System.out.println("enterprise_id==>"+entity.enterpriseId);
if(null!=sourceEntity){
System.out.println("tagEsFieldName==>"+conditionDTO.getTagEsFieldName());
// System.out.println("SourceKey==>"+sourceEntity.getSourceKey());
// System.out.println("HiveTableName==>"+sourceEntity.getHiveTableName());
}else{
System.out.println("tagEsFieldName==>"+conditionDTO.getTagEsFieldName()+" ,sourceEntity==>"+sourceEntity);
} }
}*/
} }
} }
...@@ -167,7 +170,6 @@ public class TagProcessManager { ...@@ -167,7 +170,6 @@ public class TagProcessManager {
tagGroupByEnterpriseMap.get(sceneCrowdDTO.getEnterprise_Id()).add(entity); tagGroupByEnterpriseMap.get(sceneCrowdDTO.getEnterprise_Id()).add(entity);
} }
} }
}
//准备标签需要的数据 //准备标签需要的数据
...@@ -194,10 +196,13 @@ public class TagProcessManager { ...@@ -194,10 +196,13 @@ public class TagProcessManager {
} }
//处理标签数据 //处理标签数据
JavaSparkContext jsc = SparkEnvManager.getInstance().getJsc();
List<Long>sceneCrowdIdList=new ArrayList(); List<Long>sceneCrowdIdList=new ArrayList();
for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) { for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
Integer enterpriseId=enterpriseTagEntry.getKey(); Integer enterpriseId=enterpriseTagEntry.getKey();
List<JavaPairRDD<Long, String>> filterRddList = new ArrayList(); String indexName = EsRequestUtil.getESIindexName(enterpriseId,this.isProduction());
JavaPairRDD<Long,String>memberGroupRdd=null;
for(TagProcessEntity entity:enterpriseTagEntry.getValue()){ for(TagProcessEntity entity:enterpriseTagEntry.getValue()){
for(TagConditionDTO conditionDTO:entity.tagList){ for(TagConditionDTO conditionDTO:entity.tagList){
if(tagIdToFilterMap.containsKey(conditionDTO.getTagId())){ if(tagIdToFilterMap.containsKey(conditionDTO.getTagId())){
...@@ -206,36 +211,32 @@ public class TagProcessManager { ...@@ -206,36 +211,32 @@ public class TagProcessManager {
final String groupId = entity.tagGroupId + "_" + conditionDTO.getTagId() + "_" + entity.level; final String groupId = entity.tagGroupId + "_" + conditionDTO.getTagId() + "_" + entity.level;
JavaPairRDD<Long, String> rdd = tagFilter.filterValidMember(enterpriseId,filterRequest).mapToPair(data-> Tuple2.apply(data,groupId)); JavaPairRDD<Long, String> filterRdd = tagFilter.filterValidMember(enterpriseId,filterRequest).mapToPair(data-> Tuple2.apply(data,groupId));
filterRddList.add(rdd);
sceneCrowdIdList.add(entity.tagGroupId);
}
}
}
if(filterRddList.size()>0){
JavaPairRDD<Long,String>memberGroupRdd=null;
for(JavaPairRDD<Long,String>rdd:filterRddList){
if(null==memberGroupRdd){ if(null==memberGroupRdd){
memberGroupRdd=rdd; memberGroupRdd=filterRdd;
}else{ }else{
memberGroupRdd=memberGroupRdd.union(rdd); memberGroupRdd=memberGroupRdd.union(filterRdd);
} }
sceneCrowdIdList.add(entity.tagGroupId);
} }
}
}
if(null!=memberGroupRdd){
JavaPairRDD<Long,Long>userRdd=memberSharding4Datasource.getDatasetByEnterpriseId(enterpriseId).select("id").javaRDD() JavaPairRDD<Long,Long>userRdd=memberSharding4Datasource.getDatasetByEnterpriseId(enterpriseId).select("id").javaRDD()
.mapToPair(data->Tuple2.apply((Long)data.getAs("id"),(Long)data.getAs("id"))) .mapToPair(data->Tuple2.apply((Long)data.getAs("id"),(Long)data.getAs("id")))
.reduceByKey((x,y)->x); .reduceByKey((x,y)->x);
JavaPairRDD<Long,String>updateMemberGroupRdd=userRdd.leftOuterJoin(memberGroupRdd.groupByKey()) JavaPairRDD<Long,String>updateMemberGroupRdd=userRdd.leftOuterJoin(memberGroupRdd.reduceByKey((x,y)->x+" "+y))
.mapPartitionsToPair(data->{ .mapPartitionsToPair(data->{
List<Tuple2<Long,String>> result=new ArrayList(); List<Tuple2<Long,String>> result=new ArrayList();
while (data.hasNext()){ while (data.hasNext()){
Tuple2<Long, Optional<Iterable<String>>> tp2=data.next()._2(); Tuple2<Long, Optional<String>> tp2=data.next()._2();
Long id=tp2._1(); Long id=tp2._1();
String groupId=tp2._2().isPresent()? Joiner.on(" ").join(Lists.newArrayList(tp2._2().get())):"";
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("id",id); json.put("id",id);
json.put("sceneTags_b",groupId); json.put("sceneTags_b",tp2._2().isPresent()?tp2._2().get():"");
result.add(Tuple2.apply(id,json.toString())); result.add(Tuple2.apply(id,json.toString()));
} }
return result.iterator(); return result.iterator();
...@@ -245,7 +246,7 @@ public class TagProcessManager { ...@@ -245,7 +246,7 @@ public class TagProcessManager {
System.out.println("updateMemberGroupRdd==>"+updateMemberGroupRdd.count()); System.out.println("updateMemberGroupRdd==>"+updateMemberGroupRdd.count());
JavaPairRDD<Long,String>cacheMemberGroupRdd=updateMemberGroupRdd.cache(); JavaPairRDD<Long,String>cacheMemberGroupRdd=updateMemberGroupRdd.cache();
updateIndex(cacheMemberGroupRdd,enterpriseId); updateIndex(cacheMemberGroupRdd,indexName);
saveToHive(cacheMemberGroupRdd,enterpriseId); saveToHive(cacheMemberGroupRdd,enterpriseId);
...@@ -253,10 +254,68 @@ public class TagProcessManager { ...@@ -253,10 +254,68 @@ public class TagProcessManager {
updateSceneCrowd(sceneCrowdIdList); updateSceneCrowd(sceneCrowdIdList);
} }
//处理混合标签
JavaPairRDD<Long, String> searchRDD=null;
for(TagProcessEntity mixEntity:enterpriseTagEntry.getValue()){
if(mixEntity.realTime==3){
Long tagGroupId=mixEntity.tagGroupId;
String query=EsRequestUtil.getIndexParam(enterpriseId,tagGroupId,this.isProduction);
if(StringUtils.isNotEmpty(query)){
Map<String, String> conf = new HashMap();
conf.put("es.nodes", AppEnvUtil.ES_NODES);
conf.put("es.resource", indexName + "/mapper_type");
conf.put("es.query", query);
conf.put("es.scroll.size", "5000");
JavaPairRDD<Long, String> esRdd = JavaEsSpark.esRDD(jsc, conf)
.mapToPair(data -> {
String sceneTagsB=tagGroupId.toString();
if(null!=data._2().get("sceneTags_b")){
sceneTagsB=sceneTagsB+" "+data._2().get("sceneTags_b");
} }
return Tuple2.apply((Long) data._2.get("id"),sceneTagsB);
});
if(null==searchRDD){
searchRDD=esRdd;
}else{
searchRDD=searchRDD.union(esRdd);
} }
}
}
}
if(null!=searchRDD){
JavaPairRDD<Long, String> groupRDD = searchRDD.repartition(100).reduceByKey((x,y)->x+" "+y)
.mapPartitionsToPair(data -> {
List<Tuple2<Long, String>> list = new ArrayList();
while (data.hasNext()) {
Set<String> set = new HashSet();
Tuple2<Long, String> tp2 = data.next();
String[] tagGroups = tp2._2().split(" ");
for (String tagGroup : tagGroups) {
set.add(tagGroup);
}
JSONObject json = new JSONObject();
json.put("id", tp2._1());
json.put("sceneTags_b", Joiner.on(" ").join(set));
list.add(Tuple2.apply(tp2._1(), json.toString()));
}
return list.iterator();
});
updateIndex(groupRDD,indexName);
}
}
}
private void saveToHive(JavaPairRDD<Long, String> updateMemberGroup,Integer enterpriseId){ private void saveToHive(JavaPairRDD<Long, String> updateMemberGroup,Integer enterpriseId){
JavaRDD<TagGroupInfo> tagGroupInfoRDD=updateMemberGroup.mapPartitions(data->{ JavaRDD<TagGroupInfo> tagGroupInfoRDD=updateMemberGroup.mapPartitions(data->{
...@@ -278,10 +337,9 @@ public class TagProcessManager { ...@@ -278,10 +337,9 @@ public class TagProcessManager {
} }
private void updateIndex(JavaPairRDD<Long, String> updateMemberGroup, Integer enterpriseId) { private void updateIndex(JavaPairRDD<Long, String> updateMemberGroup, String indexName) {
Map<String, String> conf = new HashMap(); Map<String, String> conf = new HashMap();
String indexName = EsRequestUtil.getESIindexName(enterpriseId,this.isProduction());
System.out.println("indexName===>" + indexName); System.out.println("indexName===>" + indexName);
conf.put("es.resource", indexName + "/mapper_type"); conf.put("es.resource", indexName + "/mapper_type");
conf.put("es.nodes", AppEnvUtil.ES_NODES); conf.put("es.nodes", AppEnvUtil.ES_NODES);
...@@ -338,4 +396,64 @@ public class TagProcessManager { ...@@ -338,4 +396,64 @@ public class TagProcessManager {
} }
} }
} }
private void mixTagProcess(Map<Integer, List<TagProcessEntity>> tagGroupByEnterpriseMap){
//处理混合标签
JavaSparkContext jsc = SparkEnvManager.getInstance().getJsc();
for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseMixTagEntry : tagGroupByEnterpriseMap.entrySet()) {
Integer enterpriseId = enterpriseMixTagEntry.getKey();
String indexName = EsRequestUtil.getESIindexName(enterpriseId,this.isProduction());
JavaPairRDD<Long, String> searchRDD=null;
for(TagProcessEntity mixEntity:enterpriseMixTagEntry.getValue()){
Long tagGroupId=mixEntity.tagGroupId;
Map<String, String> conf = new HashMap();
conf.put("es.nodes", AppEnvUtil.ES_NODES);
conf.put("es.resource", indexName + "/mapper_type");
// conf.put("es.query", query.toString());
conf.put("es.scroll.size", "5000");
JavaPairRDD<Long, String> esRdd = JavaEsSpark.esRDD(jsc, conf)
.mapToPair(data -> {
String sceneTagsB=tagGroupId.toString();
if(null!=data._2().get("sceneTags_b")){
sceneTagsB=sceneTagsB+" "+data._2().get("sceneTags_b");
}
return Tuple2.apply((Long) data._2.get("id"),sceneTagsB);
});
if(null==searchRDD){
searchRDD=esRdd;
}else{
searchRDD=searchRDD.union(esRdd);
}
}
if(null!=searchRDD){
JavaPairRDD<Long, String> groupRDD = searchRDD.repartition(100).reduceByKey((x,y)->x+" "+y)
.mapPartitionsToPair(data -> {
List<Tuple2<Long, String>> list = new ArrayList();
while (data.hasNext()) {
Set<String> set = new HashSet();
Tuple2<Long, String> tp2 = data.next();
String[] tagGroups = tp2._2().split(" ");
for (String tagGroup : tagGroups) {
set.add(tagGroup);
}
JSONObject json = new JSONObject();
json.put("id", tp2._1());
json.put("sceneTags_b", Joiner.on(" ").join(set));
list.add(Tuple2.apply(tp2._1(), json.toString()));
}
return list.iterator();
});
updateIndex(groupRDD,indexName);
}
}
}
} }
...@@ -2,7 +2,11 @@ package com.gic.spark.util; ...@@ -2,7 +2,11 @@ package com.gic.spark.util;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import java.io.IOException;
import java.util.Map; import java.util.Map;
/** /**
...@@ -15,7 +19,7 @@ public class EsRequestUtil { ...@@ -15,7 +19,7 @@ public class EsRequestUtil {
public static String getESIindexName(Integer enterpriseId,boolean isProduction) { public static String getESIindexName(Integer enterpriseId,boolean isProduction) {
String url = isProduction?"https://ideal.demogic.com/member-config/member/index-version/" + enterpriseId String url = isProduction?"https://ideal.demogic.com/member-config/member/index-version/" + enterpriseId
:"https://four.gicdev.com/member-config/member/index-version/" + enterpriseId; :"https://four.gicdev.com/member-config/member/index-version/" + enterpriseId;
HttpResponse response = HttpClient.getHttpResponseByGet(url); HttpResponse response = getHttpResponseByGet(url,enterpriseId);
int responseCode = response.getStatusLine().getStatusCode(); int responseCode = response.getStatusLine().getStatusCode();
if (responseCode == 200) { if (responseCode == 200) {
return HttpClient.getResponseString(response); return HttpClient.getResponseString(response);
...@@ -23,4 +27,50 @@ public class EsRequestUtil { ...@@ -23,4 +27,50 @@ public class EsRequestUtil {
return null; return null;
} }
public static HttpResponse getHttpResponseByGet(String url,Integer enterpriseId) {
org.apache.http.client.HttpClient httpClient = HttpClient.getHttpClient();
HttpGet get = new HttpGet(url);
get.setHeader("sign",""+enterpriseId);
get.setHeader("route","/spark");
get.setHeader("project","memberfour");
get.setHeader("isControl","true");
get.setHeader("Refer",url);
try {
HttpResponse response = httpClient.execute(get);
return response;
} catch (ClientProtocolException var4) {
var4.printStackTrace();
} catch (IOException var5) {
var5.printStackTrace();
}
return null;
}
public static String getIndexParam( Integer enterpriseId,Long id , boolean isProd) {
String url = isProd?"https://four.gicdev.com/member-label/get-member-size-by-condition?sceneCrowdId="+id+"&enterpriseId="+enterpriseId
:"https://four.gicdev.com/member-label/get-member-size-by-condition?sceneCrowdId="+id+"&enterpriseId="+enterpriseId;
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("Content-Type", "application/json;charset=UTF-8");
httpPost.addHeader("project","member-tag");
httpPost.addHeader("sign","1129");
httpPost.addHeader("route","get-member-size-by-condition");
HttpResponse response = null;
try {
response = HttpClient.getHttpClient().execute(httpPost);
JSONObject json = JSONObject.parseObject(HttpClient.getResponseString(response));
return json.getJSONObject("result").getJSONObject("result").toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) throws IOException {
System.out.println(getIndexParam(1129,183676455182233606L,true));
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment