Commit ec4bf1e3 by fudahua

队列处理逻辑变更

parent 243458e1
...@@ -73,6 +73,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -73,6 +73,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private static final Integer maxFields = 20; private static final Integer maxFields = 20;
private static final Integer SMALL_SIZE = 10;
private static final Map<String, String> bigTaskRunningMap = new ConcurrentHashMap<>(); private static final Map<String, String> bigTaskRunningMap = new ConcurrentHashMap<>();
@Autowired @Autowired
...@@ -89,6 +91,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -89,6 +91,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private FlatQueryResultServiceImpl() { private FlatQueryResultServiceImpl() {
log.debug("construct", "准备初始化 FlatQuery 查询服务"); log.debug("construct", "准备初始化 FlatQuery 查询服务");
runDistTask(3);
runDownloadTask(3); runDownloadTask(3);
runBalaDownloadTask(3); runBalaDownloadTask(3);
runBigDataDownloadTask(3); runBigDataDownloadTask(3);
...@@ -589,8 +592,10 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -589,8 +592,10 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
continue; continue;
} }
} }
if (CollectionUtils.isEmpty(smallConditions)||smallConditions.size()<10) { //push进小队列
if (CollectionUtils.isEmpty(smallConditions)||smallConditions.size()<SMALL_SIZE) {
smallConditions.add(taskConditions.get(i)); smallConditions.add(taskConditions.get(i));
taskConditions.remove(i);
} }
} // IF OVER } // IF OVER
} // FOR OVER } // FOR OVER
...@@ -621,27 +626,16 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -621,27 +626,16 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
providerLocalTag.traceId = UUID.randomUUID().toString(); providerLocalTag.traceId = UUID.randomUUID().toString();
Connection connection = null; Connection connection = null;
try{ try{
if (taskConditions != null && taskConditions.size() > 0) { if (smallConditions != null && smallConditions.size() > 0) {
FlatQueryTaskCondition condition = null; FlatQueryTaskCondition condition = null;
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件 //FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
for (int i=0; i<taskConditions.size(); i++ ) { for (int i=0; i<smallConditions.size(); i++ ) {
logger.info("[ 自助指标当前正在执行的任务为:]:{}", JSON.toJSONString(taskConditions.get(i))); logger.info("[ 自助指标当前正在执行的任务为:]:{}", JSON.toJSONString(smallConditions.get(i)));
if (taskConditions.get(i).getBuildPermitted().equals(Global.YES) if (smallConditions.get(i).getBuildPermitted().equals(Global.YES)
&& !taskConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) { && !smallConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) {
if(taskConditions.get(i).getAllFields().size() >= maxFields) {
bigTaskConditions.add(taskConditions.get(i));
taskConditions.remove(i);
continue;
} else {
if(bigTaskRunningMap.isEmpty()){
bigTaskConditions.add(taskConditions.get(i));
taskConditions.remove(i);
continue;
}
}
try { try {
connection = HiveHelper.getDownloadHiveConnection(); connection = HiveHelper.getDownloadHiveConnection();
condition = taskConditions.remove(i); // 移除并获取第一个任务条件 condition = smallConditions.remove(i); // 移除并获取第一个任务条件
break; break;
}catch (Exception e){ }catch (Exception e){
logger.info("[ 获取连接异常: ]:{}", e.getMessage()); logger.info("[ 获取连接异常: ]:{}", e.getMessage());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment