Commit 3697474e by fudahua

feat: 队列处理

parent 9f99f981
...@@ -11,6 +11,12 @@ import java.util.List; ...@@ -11,6 +11,12 @@ import java.util.List;
*/ */
public interface IFlatQueryResultService { public interface IFlatQueryResultService {
/**
* mq处理
* @param params
*/
public void dealFileMq(String params) ;
/** 自助指标查询 /** 自助指标查询
* @param tableId 指定的表明 * @param tableId 指定的表明
* @param enterpriseIds 要查询的企业编号集合 * @param enterpriseIds 要查询的企业编号集合
......
...@@ -17,9 +17,11 @@ import com.gic.cloud.data.hook.service.dao.FlatQueryTableDao; ...@@ -17,9 +17,11 @@ import com.gic.cloud.data.hook.service.dao.FlatQueryTableDao;
import com.gic.cloud.data.hook.service.entity.ColumnInfo; import com.gic.cloud.data.hook.service.entity.ColumnInfo;
import com.gic.cloud.data.hook.service.entity.CsvDataFilterMode; import com.gic.cloud.data.hook.service.entity.CsvDataFilterMode;
import com.gic.cloud.data.hook.service.entity.CsvResultSetHelper; import com.gic.cloud.data.hook.service.entity.CsvResultSetHelper;
import com.gic.commons.util.GICMQClientUtil;
import com.gic.dubbo.entity.ProviderLocalTag; import com.gic.dubbo.entity.ProviderLocalTag;
import com.gic.enterprise.api.dto.EnterpriseDTO; import com.gic.enterprise.api.dto.EnterpriseDTO;
import com.gic.enterprise.api.service.EnterpriseService; import com.gic.enterprise.api.service.EnterpriseService;
import com.gic.mq.sdk.GicMQClient;
import com.gic.qcloud.FileRecordLogUtil; import com.gic.qcloud.FileRecordLogUtil;
import com.gic.redis.data.util.RedisUtil; import com.gic.redis.data.util.RedisUtil;
import com.gic.thirdparty.cloudfile.CloudFileUtil; import com.gic.thirdparty.cloudfile.CloudFileUtil;
...@@ -122,6 +124,13 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -122,6 +124,13 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
return zip.getOrgFileUrl(); return zip.getOrgFileUrl();
} }
@Override
public void dealFileMq(String params) {
logger.info("处理文件:{}",params);
DownloadTask downloadTask = JSON.parseObject(params, DownloadTask.class);
takeFileNew(downloadTask);
}
/** 生成自助指标查询最终 SQL /** 生成自助指标查询最终 SQL
* @param queryOrDownload 查询或下载用途 * @param queryOrDownload 查询或下载用途
* @param tableId * @param tableId
...@@ -675,21 +684,23 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -675,21 +684,23 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
try{ try{
String lockKey="data:hook:hive"; String lockKey="data:hook:hive";
RedisUtil.lock(lockKey,2L); RedisUtil.lock(lockKey,2L);
List<DownloadTask> downloadTasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfHasDownload(QueryDataSource.FLAT_QUERY,3); List<DownloadTask> downloadTasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfHasDownload(QueryDataSource.FLAT_QUERY,10);
if (CollectionUtils.isNotEmpty(downloadTasks)) { if (CollectionUtils.isNotEmpty(downloadTasks)) {
for (DownloadTask downloadTask : downloadTasks) { for (DownloadTask downloadTask : downloadTasks) {
downloadTask.setStatus(DownloadTaskStatus.BUILDING); downloadTask.setStatus(DownloadTaskStatus.BUILDING);
downloadTask.setDownloadTime(new Date()); downloadTask.setDownloadTime(new Date());
DownloadTaskServiceImpl.getInstance().updateDownloadTask(downloadTask); DownloadTaskServiceImpl.getInstance().updateDownloadTask(downloadTask);
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
clientInstance.sendMessage("dataHookFileDeal",JSONObject.toJSONString(downloadTask));
} }
} }
RedisUtil.unlock(lockKey); RedisUtil.unlock(lockKey);
downloadTasks.parallelStream().forEach(mid->{ // downloadTasks.parallelStream().forEach(mid->{
ProviderLocalTag localTag = ProviderLocalTag.tag.get(); // ProviderLocalTag localTag = ProviderLocalTag.tag.get();
localTag.traceId = traceId; // localTag.traceId = traceId;
//下载处理 // //下载处理
takeFileNew(mid); // takeFileNew(mid);
}); // });
}catch (Exception e){ }catch (Exception e){
logger.info("异常:{}",e); logger.info("异常:{}",e);
logger.info("[ 自助指标下载异常 ]: {}", e.getMessage()); logger.info("[ 自助指标下载异常 ]: {}", e.getMessage());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment