Commit 0b79fde3 by fudahua

队列处理逻辑变更

队列处理逻辑变更

队列处理逻辑变更

下载逻辑的日志修改
parent b1b58b00
...@@ -2,6 +2,7 @@ package com.gic.cloud.data.hook.service.impl; ...@@ -2,6 +2,7 @@ package com.gic.cloud.data.hook.service.impl;
import cn.medubi.client.utils.LogPak; import cn.medubi.client.utils.LogPak;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService; import com.ctrip.framework.apollo.ConfigService;
import com.gic.cloud.common.api.base.Page; import com.gic.cloud.common.api.base.Page;
...@@ -73,6 +74,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -73,6 +74,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private static final Integer maxFields = 20; private static final Integer maxFields = 20;
private static final Integer SMALL_SIZE = 10;
private static final Map<String, String> bigTaskRunningMap = new ConcurrentHashMap<>(); private static final Map<String, String> bigTaskRunningMap = new ConcurrentHashMap<>();
@Autowired @Autowired
...@@ -85,9 +88,11 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -85,9 +88,11 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
/** 自助指标查询关联的下载条件列表 */ /** 自助指标查询关联的下载条件列表 */
protected List<FlatQueryTaskCondition> taskConditions = Lists.newArrayList(); protected List<FlatQueryTaskCondition> taskConditions = Lists.newArrayList();
protected List<FlatQueryTaskCondition> bigTaskConditions = Lists.newArrayList(); protected List<FlatQueryTaskCondition> bigTaskConditions = Lists.newArrayList();
protected List<FlatQueryTaskCondition> smallConditions = Lists.newArrayList();
private FlatQueryResultServiceImpl() { private FlatQueryResultServiceImpl() {
log.debug("construct", "准备初始化 FlatQuery 查询服务"); log.debug("construct", "准备初始化 FlatQuery 查询服务");
runDistTask(3);
runDownloadTask(3); runDownloadTask(3);
runBalaDownloadTask(3); runBalaDownloadTask(3);
runBigDataDownloadTask(3); runBigDataDownloadTask(3);
...@@ -557,45 +562,87 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -557,45 +562,87 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private static SimpleDateFormat timeFormatter = new SimpleDateFormat("HH:mm:ss"); private static SimpleDateFormat timeFormatter = new SimpleDateFormat("HH:mm:ss");
/** 任务分配 */
/** 下载任务执行计时器 */
//private Timer downloadTaskTimer = new Timer(); //private Timer downloadTaskTimer = new Timer();
ScheduledExecutorService downloadService = new ScheduledThreadPoolExecutor(1, ScheduledExecutorService distService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("downloadTimer-%d").daemon(true).build()); new BasicThreadFactory.Builder().namingPattern("distTimer-%d").daemon(true).build());
/** 启动自助指标查询计划任务 */ /** 启动分配任务任务 */
private void runDownloadTask(Integer interval) { private void runDistTask(Integer interval) {
downloadService.scheduleAtFixedRate(new Runnable() { downloadService.scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
ProviderLocalTag providerLocalTag = ProviderLocalTag.tag.get(); ProviderLocalTag providerLocalTag = ProviderLocalTag.tag.get();
providerLocalTag.traceId = UUID.randomUUID().toString(); providerLocalTag.traceId = UUID.randomUUID().toString();
Connection connection = null;
try{ try{
if (taskConditions != null && taskConditions.size() > 0) { if (taskConditions != null && taskConditions.size() > 0) {
FlatQueryTaskCondition condition = null; FlatQueryTaskCondition condition = null;
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件 //FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
for (int i=0; i<taskConditions.size(); i++ ) { for (int i=0; i<taskConditions.size(); i++ ) {
log.debug("自助指标当前正在执行的任务为:", JSON.toJSONString(taskConditions.get(i))); logger.info("[ 自助指标当前正在执行的任务为:]:{}", JSON.toJSONString(taskConditions.get(i)));
if (taskConditions.get(i).getBuildPermitted().equals(Global.YES) if (taskConditions.get(i).getBuildPermitted().equals(Global.YES)
&& !taskConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) { && !taskConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) {
if(taskConditions.get(i).getAllFields().size() >= maxFields) { if(taskConditions.get(i).getAllFields().size() >= maxFields) {
logger.info("加入相应队列-大数据队列,字段多:{}", JSONObject.toJSONString(taskConditions.get(i)));
bigTaskConditions.add(taskConditions.get(i)); bigTaskConditions.add(taskConditions.get(i));
taskConditions.remove(i); taskConditions.remove(i);
continue; continue;
} else { } else {
if(bigTaskRunningMap.isEmpty()){ if(bigTaskRunningMap.isEmpty()&&CollectionUtils.isEmpty(bigTaskConditions)){
logger.info("加入相应队列-大数据队列:{}", JSONObject.toJSONString(taskConditions.get(i)));
bigTaskConditions.add(taskConditions.get(i)); bigTaskConditions.add(taskConditions.get(i));
taskConditions.remove(i); taskConditions.remove(i);
continue; continue;
} }
} }
//push进小队列
if (CollectionUtils.isEmpty(smallConditions)||smallConditions.size()<SMALL_SIZE) {
logger.info("加入相应队列-小数据队列:{}", JSONObject.toJSONString(taskConditions.get(i)));
smallConditions.add(taskConditions.get(i));
taskConditions.remove(i);
}
} // IF OVER
} // FOR OVER
} // 没有任务则忽略
}catch (Exception e){
logger.info("异常:{}",e);
logger.info("[ 自助指标下载异常 ]: {}", e.getMessage());
e.printStackTrace();
} finally {
}
}
}, interval*1000, interval*1000, TimeUnit.MILLISECONDS);
}
/** 下载任务执行计时器 */
//private Timer downloadTaskTimer = new Timer();
ScheduledExecutorService downloadService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("downloadTimer-%d").daemon(true).build());
/** 启动自助指标查询计划任务 */
private void runDownloadTask(Integer interval) {
downloadService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ProviderLocalTag providerLocalTag = ProviderLocalTag.tag.get();
providerLocalTag.traceId = UUID.randomUUID().toString();
Connection connection = null;
try{
if (smallConditions != null && smallConditions.size() > 0) {
FlatQueryTaskCondition condition = null;
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
for (int i=0; i<smallConditions.size(); i++ ) {
logger.info("[ 自助指标当前正在执行的任务为:]:{}", JSON.toJSONString(smallConditions.get(i)));
if (smallConditions.get(i).getBuildPermitted().equals(Global.YES)
&& !smallConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) {
try { try {
connection = HiveHelper.getDownloadHiveConnection(); connection = HiveHelper.getDownloadHiveConnection();
condition = taskConditions.remove(i); // 移除并获取第一个任务条件 condition = smallConditions.remove(i); // 移除并获取第一个任务条件
break; break;
}catch (Exception e){ }catch (Exception e){
log.debug("获取连接异常:", e.getMessage()); logger.info("[ 获取连接异常: ]:{}", e.getMessage());
e.printStackTrace(); e.printStackTrace();
continue; continue;
} }
...@@ -604,7 +651,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -604,7 +651,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
takeFile(condition, connection); takeFile(condition, connection);
} // 没有任务则忽略 } // 没有任务则忽略
}catch (Exception e){ }catch (Exception e){
log.debug("自助指标下载异常", e.getMessage()); logger.info("异常:{}",e);
logger.info("[ 自助指标下载异常 ]: {}", e.getMessage());
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
if(connection != null){ if(connection != null){
...@@ -637,7 +685,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -637,7 +685,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
FlatQueryTaskCondition condition = null; FlatQueryTaskCondition condition = null;
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件 //FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
for (int i=0; i<bigTaskConditions.size(); i++ ) { for (int i=0; i<bigTaskConditions.size(); i++ ) {
log.debug("自助指标当前正在执行的任务为:", JSON.toJSONString(bigTaskConditions.get(i))); logger.info("[ 自助指标当前正在执行的任务为:]: {}", JSON.toJSONString(bigTaskConditions.get(i)));
if (bigTaskConditions.get(i).getBuildPermitted().equals(Global.YES)) { if (bigTaskConditions.get(i).getBuildPermitted().equals(Global.YES)) {
try{ try{
condition = bigTaskConditions.remove(i); // 移除并获取第一个任务条件 condition = bigTaskConditions.remove(i); // 移除并获取第一个任务条件
...@@ -645,7 +693,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -645,7 +693,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
connection = HiveHelper.getBigDataDownloadHiveConnection(); connection = HiveHelper.getBigDataDownloadHiveConnection();
break; break;
}catch (Exception e){ }catch (Exception e){
log.debug("获取连接异常:", e.getMessage()); logger.info("异常:{}",e);
logger.info("[ 获取连接异常: ]: {}", e.getMessage());
e.printStackTrace(); e.printStackTrace();
continue; continue;
} }
...@@ -656,7 +705,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -656,7 +705,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
} // 没有任务则忽略 } // 没有任务则忽略
}catch (Exception e){ }catch (Exception e){
logger.info("下载异常:{}",e); logger.info("下载异常:{}",e);
log.debug("自助指标下载异常", e.getMessage()); logger.info("[ 自助指标下载异常 ]: {}", e.getMessage());
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
if(connection != null){ if(connection != null){
...@@ -689,7 +738,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -689,7 +738,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
FlatQueryTaskCondition condition = null; FlatQueryTaskCondition condition = null;
//FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件 //FlatQueryTaskCondition condition = taskConditions.remove(0); // 移除并获取第一个任务条件
for (int i=0; i<taskConditions.size(); i++ ) { for (int i=0; i<taskConditions.size(); i++ ) {
log.debug("自助指标当前正在执行的任务为:", JSON.toJSONString(taskConditions.get(i))); logger.info("[ 自助指标当前正在执行的任务为:]: {}", JSON.toJSONString(taskConditions.get(i)));
if (taskConditions.get(i).getBuildPermitted().equals(Global.YES) if (taskConditions.get(i).getBuildPermitted().equals(Global.YES)
&& taskConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) { && taskConditions.get(i).getEnterpriseIds().contains("ff8080816dd0385e016ddca436d01fe1")) {
try{ try{
...@@ -697,7 +746,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -697,7 +746,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
condition = taskConditions.remove(i); // 移除并获取第一个任务条件 condition = taskConditions.remove(i); // 移除并获取第一个任务条件
break; break;
}catch (Exception e){ }catch (Exception e){
log.debug("获取连接异常:", e.getMessage()); logger.info("[ 获取连接异常: ]: {}", e.getMessage());
e.printStackTrace(); e.printStackTrace();
continue; continue;
} }
...@@ -708,7 +757,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -708,7 +757,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
takeFile(condition, connection); takeFile(condition, connection);
} // 没有任务则忽略 } // 没有任务则忽略
}catch (Exception e){ }catch (Exception e){
log.debug("自助指标下载异常", e.getMessage()); logger.info("异常:{}",e);
logger.info("[ 自助指标下载异常 ]: {}", e.getMessage());
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
if(connection != null){ if(connection != null){
...@@ -729,8 +779,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -729,8 +779,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
DownloadTask task = DownloadTaskServiceImpl.getInstance().getDownloadTaskById(condition.getTaskId()); DownloadTask task = DownloadTaskServiceImpl.getInstance().getDownloadTaskById(condition.getTaskId());
task.setStatus(DownloadTaskStatus.BUILDING); task.setStatus(DownloadTaskStatus.BUILDING);
DownloadTaskServiceImpl.getInstance().updateDownloadTask(task); DownloadTaskServiceImpl.getInstance().updateDownloadTask(task);
logger.info("test================="); logger.info("[ runDownloadTask.run ]: {}", "自助指标下载任务执行:" + task.getId());
log.debug("runDownloadTask.run", "自助指标下载任务执行:" + task.getId());
//初始化校验 //初始化校验
FilterFieldUtils.timeFieldCheckInit(condition.getConditions()); FilterFieldUtils.timeFieldCheckInit(condition.getConditions());
...@@ -745,8 +794,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -745,8 +794,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
0, 0,
condition.getAuthStoreIdList()); condition.getAuthStoreIdList());
log.debug("runDownloadTask.run", "获取商户连接:" + task.getId()); logger.info("[ runDownloadTask.run ]: {}", "获取商户连接:" + task.getId());
log.debug("sql-", task.getId() + "-" + fullQuery); logger.info("[ sql- ]: {}", task.getId() + "-" + fullQuery);
if (conn != null) { if (conn != null) {
try { try {
Statement stat = conn.createStatement(); Statement stat = conn.createStatement();
...@@ -759,7 +808,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -759,7 +808,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
// 生成指定格式下载元文件 // 生成指定格式下载元文件
String originalFilePath = ""; String originalFilePath = "";
if (task.getFormat().equals(DownloadFileFormat.CSV)) { // 如果指定为 CSV 格式 if (task.getFormat().equals(DownloadFileFormat.CSV)) { // 如果指定为 CSV 格式
log.debug("runDownloadTask.run", "准备生成自助指标下载文件 " + condition.getTaskId() + ".csv"); logger.info("[ runDownloadTask.run ]: {}", "准备生成自助指标下载文件 " + condition.getTaskId() + ".csv");
originalFilePath = SAVE_FOLDER + "/" + condition.getTaskId() + ".csv"; originalFilePath = SAVE_FOLDER + "/" + condition.getTaskId() + ".csv";
File tmp = new File(originalFilePath); File tmp = new File(originalFilePath);
if (tmp.exists()) { // 删除可能存在的文件 if (tmp.exists()) { // 删除可能存在的文件
...@@ -772,9 +821,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -772,9 +821,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
writer.writeAll(rs, true); writer.writeAll(rs, true);
writer.close(); writer.close();
out.close();//记得关闭资源 out.close();//记得关闭资源
log.debug("runDownloadTask.run", "已生成自助指标下载文件 " + condition.getTaskId() + ".csv"); logger.info("[ runDownloadTask.run ]: {}", "已生成自助指标下载文件 " + condition.getTaskId() + ".csv");
} else { // 如果指定为 XLS 格式 } else { // 如果指定为 XLS 格式
log.debug("runDownloadTask.run", "准备生成自助指标下载文件 " + condition.getTaskId() + ".xlsx"); logger.info("[ runDownloadTask.run ]: {}", "准备生成自助指标下载文件 " + condition.getTaskId() + ".xlsx");
originalFilePath = SAVE_FOLDER + "/" + condition.getTaskId() + ".xlsx"; originalFilePath = SAVE_FOLDER + "/" + condition.getTaskId() + ".xlsx";
SXSSFWorkbook wb = new SXSSFWorkbook(100); // 内存中保留 100 行 SXSSFWorkbook wb = new SXSSFWorkbook(100); // 内存中保留 100 行
Sheet sheet = wb.createSheet(); Sheet sheet = wb.createSheet();
...@@ -801,14 +850,14 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -801,14 +850,14 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
fileOut.close(); fileOut.close();
//wb.close(); //wb.close();
wb.dispose(); // SXSSFWorkbook 没有 close 方法 wb.dispose(); // SXSSFWorkbook 没有 close 方法
log.debug("runDownloadTask.run", "已生成自助指标下载文件 " + condition.getTaskId() + ".xlsx"); logger.info("[ runDownloadTask.run ]: {}", "已生成自助指标下载文件 " + condition.getTaskId() + ".xlsx");
} // IF ELSE OVER } // IF ELSE OVER
String cloudFileUrl = "https://"; String cloudFileUrl = "https://";
// 如果指定压缩,则使用之 // 如果指定压缩,则使用之
//if (task.getFormat().equals("zip")) { //if (task.getFormat().equals("zip")) {
String taskFileExt = task.getUseCompress().equals(Global.YES) ? ".zip" : task.getFormat().equals(DownloadFileFormat.CSV) ? ".csv" : ".xlsx"; String taskFileExt = task.getUseCompress().equals(Global.YES) ? ".zip" : task.getFormat().equals(DownloadFileFormat.CSV) ? ".csv" : ".xlsx";
if (task.getUseCompress().equals(Global.YES)) { if (task.getUseCompress().equals(Global.YES)) {
log.debug("runDownloadTask.run", "准备生成自助指标压缩文件 " + condition.getTaskId() + ".zip"); logger.info("[ runDownloadTask.run ]: {}", "准备生成自助指标压缩文件 " + condition.getTaskId() + ".zip");
String zipFilePath = SAVE_FOLDER + "/" + condition.getTaskId() + ".zip"; String zipFilePath = SAVE_FOLDER + "/" + condition.getTaskId() + ".zip";
File zipFile = new File(zipFilePath); File zipFile = new File(zipFilePath);
ZipOutputStream zos = null; ZipOutputStream zos = null;
...@@ -829,21 +878,21 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -829,21 +878,21 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
is.close(); is.close();
//bos.close(); //bos.close();
//os.close(); //os.close();
log.debug("runDownloadTask.run", "已生成自助指标压缩文件 " + condition.getTaskId() + ".zip"); logger.info("[ runDownloadTask.run ]: {}", "已生成自助指标压缩文件 " + condition.getTaskId() + ".zip");
} catch (Exception ex2) { } catch (Exception ex2) {
throw ex2; throw ex2;
} finally { } finally {
zos.closeEntry(); zos.closeEntry();
zos.close(); zos.close();
} }
log.debug("开始上传压缩文件到腾讯云", task.getId()); logger.info("[ 开始上传压缩文件到腾讯云 ]: {}", task.getId());
cloudFileUrl += FileUploadUtil.simpleUploadFileFromLocal(zipFile, task.getName() + "-" + task.getId()+taskFileExt, BucketNameEnum.COMPRESS_60000.getName()); cloudFileUrl += FileUploadUtil.simpleUploadFileFromLocal(zipFile, task.getName() + "-" + task.getId()+taskFileExt, BucketNameEnum.COMPRESS_60000.getName());
} else { } else {
log.debug("开始上传文件到腾讯云", task.getId()); logger.info("[ 开始上传文件到腾讯云 ]: {}", task.getId());
cloudFileUrl += FileUploadUtil.simpleUploadFileFromLocal(new File(originalFilePath), task.getName() + "-" + task.getId()+taskFileExt, BucketNameEnum.REPORT_50000.getName()); cloudFileUrl += FileUploadUtil.simpleUploadFileFromLocal(new File(originalFilePath), task.getName() + "-" + task.getId()+taskFileExt, BucketNameEnum.REPORT_50000.getName());
} }
log.debug("上传腾讯云", "地址为:"+cloudFileUrl); logger.info("[ 上传腾讯云 ]: {}", "地址为:"+cloudFileUrl);
task.setStatus(DownloadTaskStatus.COMPLISHED); task.setStatus(DownloadTaskStatus.COMPLISHED);
task.setOverTime(new Date()); task.setOverTime(new Date());
task.setFilePath(cloudFileUrl); task.setFilePath(cloudFileUrl);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment