Commit a2ccc65c by fudahua

feat: 正在处理的

parent 82add547
...@@ -76,4 +76,9 @@ public interface DownloadTaskDao { ...@@ -76,4 +76,9 @@ public interface DownloadTaskDao {
*/ */
public int getCountDownloadTaskOfBuilding(@Param("queryDataSource") String queryDataSource); public int getCountDownloadTaskOfBuilding(@Param("queryDataSource") String queryDataSource);
/** 获取等待申请通过状态的任务
* @return
*/
public List<DownloadTask> getDownloadTaskOfBuilding(@Param("queryDataSource") String queryDataSource);
} }
...@@ -242,6 +242,13 @@ public class DownloadTaskServiceImpl implements IDownloadTaskService { ...@@ -242,6 +242,13 @@ public class DownloadTaskServiceImpl implements IDownloadTaskService {
return downloadTaskDao.getCountDownloadTaskOfBuilding(queryDataSource); return downloadTaskDao.getCountDownloadTaskOfBuilding(queryDataSource);
} }
/** 获取在审核申请等待状态中的任务
* @return
*/
public List<DownloadTask> getDownloadTaskOfBuilding(String queryDataSource) {
return downloadTaskDao.getDownloadTaskOfBuilding(queryDataSource);
}
/** 获取指定申请编号的风险模式记录 /** 获取指定申请编号的风险模式记录
* @param applyId * @param applyId
......
...@@ -75,6 +75,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -75,6 +75,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
public static final String HDFS_URL = "/data/hook"; public static final String HDFS_URL = "/data/hook";
public static final String LOCK_KEY="data:hook:hive";
public static final List<String> PHONE = Arrays.asList("mobile", "phone", "phone_number", "receive_phone_number"); public static final List<String> PHONE = Arrays.asList("mobile", "phone", "phone_number", "receive_phone_number");
...@@ -111,9 +113,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -111,9 +113,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
log.debug("construct", "准备初始化 FlatQuery 查询服务"); log.debug("construct", "准备初始化 FlatQuery 查询服务");
runDealHiveFile(3); runDealHiveFile(3);
// runDistTask(3); // runDistTask(3);
runDownloadTask(3); // runDownloadTask(3);
runBalaDownloadTask(3); // runBalaDownloadTask(3);
runBigDataDownloadTask(3); // runBigDataDownloadTask(3);
runApplyTask(5); // 每5秒钟进行任务状态检测 runApplyTask(5); // 每5秒钟进行任务状态检测
} }
...@@ -128,6 +130,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -128,6 +130,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
public void dealFileMq(String params) { public void dealFileMq(String params) {
logger.info("处理文件:{}",params); logger.info("处理文件:{}",params);
DownloadTask downloadTask = JSON.parseObject(params, DownloadTask.class); DownloadTask downloadTask = JSON.parseObject(params, DownloadTask.class);
String key=LOCK_KEY+":"+downloadTask.getId();
RedisUtil.delCache(key);
takeFileNew(downloadTask); takeFileNew(downloadTask);
} }
...@@ -582,28 +586,48 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -582,28 +586,48 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
@Override @Override
@PostConstruct @PostConstruct
public void initTask(){ public void initTask(){
RedisUtil.lock(LOCK_KEY,3L);
try { try {
List<DownloadRecord> list = this.downloadTaskService.listUnDownloadTask(QueryDataSource.FLAT_QUERY); // List<DownloadRecord> list = this.downloadTaskService.listUnDownloadTask(QueryDataSource.FLAT_QUERY);
list = DataInitUtils.listByHost(list); // list = DataInitUtils.listByHost(list);
log.debug("flatInitList", JSON.toJSONString(list)); // log.debug("flatInitList", JSON.toJSONString(list));
if(CollectionUtils.isNotEmpty(list)){ // if(CollectionUtils.isNotEmpty(list)){
for(DownloadRecord record : list){ // for(DownloadRecord record : list){
if(StringUtils.isNotBlank(record.getDownloadCondition())){ // if(StringUtils.isNotBlank(record.getDownloadCondition())){
FlatQueryTaskCondition condition = JSON.parseObject(record.getDownloadCondition(), FlatQueryTaskCondition.class); // FlatQueryTaskCondition condition = JSON.parseObject(record.getDownloadCondition(), FlatQueryTaskCondition.class);
if(record.getApplyStatus().equals(DownloadApplyStatus.TIMEOUT)){ // if(record.getApplyStatus().equals(DownloadApplyStatus.TIMEOUT)){
continue; // continue;
} // }
if(condition.getBuildPermitted().equals(Global.NO)){ // if(condition.getBuildPermitted().equals(Global.NO)){
if(record.getApplyPermitted().equals(Global.YES)){ // if(record.getApplyPermitted().equals(Global.YES)){
condition.setBuildPermitted(Global.YES); // condition.setBuildPermitted(Global.YES);
} // }
} // }
this.taskConditions.add(condition); // this.taskConditions.add(condition);
} //
// }
// }
// }
//之前正在处理的队列需要重新处理
List<DownloadTask> tasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfBuilding(QueryDataSource.FLAT_QUERY);
logger.info("初始化任务:{}",JSONObject.toJSONString(tasks));
for (DownloadTask task : tasks) {
String key=LOCK_KEY+":"+task.getId();
Object cache = RedisUtil.getCache(key);
if (cache!=null) {
continue;
} }
delFileOrDirByTaskId(task.getId());
//重新处理
task.setStatus(DownloadTaskStatus.WAITING);
task.setDownloadTime(new Date());
DownloadTaskServiceImpl.getInstance().updateDownloadTask(task);
} }
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
}finally {
RedisUtil.unlock(LOCK_KEY);
} }
} }
...@@ -683,17 +707,17 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -683,17 +707,17 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
logger.info("执行hivefile"); logger.info("执行hivefile");
try{ try{
String lockKey="data:hook:hive"; RedisUtil.lock(LOCK_KEY,3L);
RedisUtil.lock(lockKey,3L);
int curBuildingCount = DownloadTaskServiceImpl.getInstance().getCountDownloadTaskOfBuilding(QueryDataSource.FLAT_QUERY); int curBuildingCount = DownloadTaskServiceImpl.getInstance().getCountDownloadTaskOfBuilding(QueryDataSource.FLAT_QUERY);
Config appConfig = ConfigService.getAppConfig(); Config appConfig = ConfigService.getAppConfig();
Integer maxBuildingCount = appConfig.getIntProperty("buildingCount", 5); Integer maxBuildingCount = appConfig.getIntProperty("buildingCount", 5);
if (maxBuildingCount<=curBuildingCount) { if (maxBuildingCount<=curBuildingCount) {
RedisUtil.unlock(lockKey); logger.info("超出限制:{}>{}",curBuildingCount,maxBuildingCount);
RedisUtil.unlock(LOCK_KEY);
return; return;
} }
List<DownloadTask> downloadTasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfHasDownload(QueryDataSource.FLAT_QUERY,10); List<DownloadTask> downloadTasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfHasDownload(QueryDataSource.FLAT_QUERY,5);
if (CollectionUtils.isNotEmpty(downloadTasks)) { if (CollectionUtils.isNotEmpty(downloadTasks)) {
for (DownloadTask downloadTask : downloadTasks) { for (DownloadTask downloadTask : downloadTasks) {
downloadTask.setStatus(DownloadTaskStatus.BUILDING); downloadTask.setStatus(DownloadTaskStatus.BUILDING);
...@@ -701,15 +725,11 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -701,15 +725,11 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
DownloadTaskServiceImpl.getInstance().updateDownloadTask(downloadTask); DownloadTaskServiceImpl.getInstance().updateDownloadTask(downloadTask);
GicMQClient clientInstance = GICMQClientUtil.getClientInstance(); GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
clientInstance.sendMessage("dataHookFileDeal",JSONObject.toJSONString(downloadTask)); clientInstance.sendMessage("dataHookFileDeal",JSONObject.toJSONString(downloadTask));
String key=LOCK_KEY+":"+downloadTask.getId();
RedisUtil.setCache(key,1,12*60*60L);
} }
} }
RedisUtil.unlock(lockKey); RedisUtil.unlock(LOCK_KEY);
// downloadTasks.parallelStream().forEach(mid->{
// ProviderLocalTag localTag = ProviderLocalTag.tag.get();
// localTag.traceId = traceId;
// //下载处理
// takeFileNew(mid);
// });
}catch (Exception e){ }catch (Exception e){
logger.info("异常:{}",e); logger.info("异常:{}",e);
logger.info("[ 自助指标下载异常 ]: {}", e.getMessage()); logger.info("[ 自助指标下载异常 ]: {}", e.getMessage());
...@@ -905,7 +925,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -905,7 +925,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
boolean downloadFlag = HDFSUtil.getInstance().downloadFile(path, SAVE_FOLDER); boolean downloadFlag = HDFSUtil.getInstance().downloadFile(path, SAVE_FOLDER);
if (!downloadFlag) { if (!downloadFlag) {
logger.info("下载失败:{}-{}",path,JSONObject.toJSONString(task)); logger.info("下载失败:{}-{}",path,JSONObject.toJSONString(task));
return; throw new RuntimeException("下载失败:"+condition.getTaskId());
} }
stopWatch.stop(); stopWatch.stop();
logger.info("下载耗时:{}",stopWatch.getLastTaskTimeMillis()); logger.info("下载耗时:{}",stopWatch.getLastTaskTimeMillis());
...@@ -1312,6 +1332,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService { ...@@ -1312,6 +1332,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
} }
}catch (Exception e) { }catch (Exception e) {
logger.info("异常:{}",e); logger.info("异常:{}",e);
throw new RuntimeException(e);
} }
} }
......
...@@ -196,6 +196,7 @@ ...@@ -196,6 +196,7 @@
WHERE WHERE
q.query_data_source = #{queryDataSource} q.query_data_source = #{queryDataSource}
AND q.status = "downloading" AND q.status = "downloading"
AND q.apply_permitted = 1
AND q.del_flag = '0' AND q.del_flag = '0'
order by start_time asc limit ${num} order by start_time asc limit ${num}
</select> </select>
...@@ -231,4 +232,16 @@ ...@@ -231,4 +232,16 @@
AND q.del_flag = '0' AND q.del_flag = '0'
</select> </select>
<select id="getDownloadTaskOfBuilding" resultType="DownloadTask">
SELECT *
FROM
<include refid="queryTables"/>
<include refid="queryJoins"/>
WHERE
q.query_data_source = #{queryDataSource}
AND q.status = "building"
AND q.del_flag = '0'
order by start_time asc
</select>
</mapper> </mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment