Commit 3ded251e by 徐高华

删除同步中的重试

parent f13c402e
...@@ -813,7 +813,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -813,7 +813,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
public void taskStatusTimer(String params) { public void taskStatusTimer(String params) {
try { try {
RSet<String> set = RedisUtil.getRedisClient().getSet(PreDealServiceImpl.HAOBAN_TASK_ID_SET_CACHE); RSet<String> set = RedisUtil.getRedisClient().getSet(PreDealServiceImpl.HAOBAN_TASK_ID_SET_CACHE);
logger.info("处理好办任务状态={}",set); logger.info("好办任务状态开始={}",set);
Iterator<String> it = set.iterator() ; Iterator<String> it = set.iterator() ;
while(it.hasNext()) { while(it.hasNext()) {
String taskId = it.next(); String taskId = it.next();
...@@ -836,7 +836,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -836,7 +836,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
} }
} }
} }
logger.info("处理好办任务状态定时完成"); logger.info("好办任务状态完成={}",set);
}catch(Exception e) { }catch(Exception e) {
logger.warn(e.toString(),e); logger.warn(e.toString(),e);
} }
...@@ -846,7 +846,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -846,7 +846,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
public void staffTaskStatusTimer(String params) { public void staffTaskStatusTimer(String params) {
try { try {
RSet<String> set = RedisUtil.getRedisClient().getSet(PreDealServiceImpl.HAOBAN_TASK_STAFF_ID_SET_CACHE); RSet<String> set = RedisUtil.getRedisClient().getSet(PreDealServiceImpl.HAOBAN_TASK_STAFF_ID_SET_CACHE);
logger.info("处理好办任务状态={}",set); logger.info("好办任务状态开始staff={}",set);
Iterator<String> it = set.iterator() ; Iterator<String> it = set.iterator() ;
while(it.hasNext()) { while(it.hasNext()) {
String v = it.next() ; String v = it.next() ;
...@@ -875,7 +875,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -875,7 +875,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
it.remove(); it.remove();
} }
} }
logger.info("处理好办任务状态定时完成"); logger.info("好办任务状态完成staff={}",set);
}catch(Exception e) { }catch(Exception e) {
logger.warn(e.toString(),e); logger.warn(e.toString(),e);
} }
...@@ -884,22 +884,11 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -884,22 +884,11 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
private boolean updateTaskStatus(String taskId) { private boolean updateTaskStatus(String taskId) {
boolean b = preDealService.checkFriendTask(taskId, PreDealTypeEnum.friend_clerk.getVal()); boolean b = preDealService.checkFriendTask(taskId, PreDealTypeEnum.friend_clerk.getVal());
if (!b) { if (!b) {
logger.info("更新task状态,还未完成"); logger.info("更新task状态,还未完成={}",taskId);
return false; return false;
} }
logger.info("好友同步成功:{}", taskId); logger.info("整个task完成={}", taskId);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId); this.syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.compute.getVal());
if (syncTask.getStatusFlag() == SyncTaskStatusEnum.compute.getVal()
|| syncTask.getStatusFlag() == SyncTaskStatusEnum.exception_compute.getVal()) {
logger.info("已经处理成功:{}", taskId);
return true;
}
int errCount = preDealService.countByTaskId(taskId, -1, PreDealStatusEnum.exception.getVal());
if (errCount > 0) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.exception_compute.getVal());
} else {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.compute.getVal());
}
return true ; return true ;
} }
...@@ -907,24 +896,21 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -907,24 +896,21 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
if(type == 7) { if(type == 7) {
boolean b = preDealService.checkFriendTaskByPDataId(taskId, stffId, PreDealTypeEnum.self_friend.getVal()); boolean b = preDealService.checkFriendTaskByPDataId(taskId, stffId, PreDealTypeEnum.self_friend.getVal());
if (!b) { if (!b) {
logger.info("校验直接返回");
return false; return false;
} }
logger.info("同步第三方好友开始:{},{}", taskId, stffId); logger.info("代开发完成,开始第三方好友:{},{}", taskId, stffId);
String key = "haoban_sync_self_friend_task_" + taskId + ":" + stffId; String key = "haoban_sync_self_friend_task_" + taskId + ":" + stffId;
String lockKey = "haoban_sync_self_friend_task_lock_" + taskId + ":" + stffId; String lockKey = "haoban_sync_self_friend_task_lock_" + taskId + ":" + stffId;
RedisUtil.lock(lockKey, 3L); RedisUtil.lock(lockKey, 3L);
Object hasDealCheck = RedisUtil.getCache(key); Object hasDealCheck = RedisUtil.getCache(key);
if (hasDealCheck != null) { if (hasDealCheck != null) {
logger.info("重复提交处理第三方好友:{}", stffId);
RedisUtil.unlock(lockKey); RedisUtil.unlock(lockKey);
return false; return false;
} }
RedisUtil.setCache(key, stffId, 30L, TimeUnit.SECONDS); RedisUtil.setCache(key, stffId, 30L, TimeUnit.SECONDS);
List<String> dataIds = preDealService.listReDataIdByPDataId(taskId, stffId, PreDealTypeEnum.friend.getVal()); List<String> dataIds = preDealService.listReDataIdByPDataId(taskId, stffId, PreDealTypeEnum.friend.getVal());
if (CollectionUtils.isEmpty(dataIds)) { if (CollectionUtils.isEmpty(dataIds)) {
logger.info("第三方的数据为空"); logger.info("无第三方的数据");
//更新父级别
preDealService.updateStatusByDataId(taskId, stffId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功"); preDealService.updateStatusByDataId(taskId, stffId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
} else { } else {
dealDepartmentToMq(taskId, new HashSet<>(dataIds), SyncTaskStatusEnum.friend_sync); dealDepartmentToMq(taskId, new HashSet<>(dataIds), SyncTaskStatusEnum.friend_sync);
...@@ -938,22 +924,17 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -938,22 +924,17 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
} }
String staffLockKey = DealSyncOperationApiServiceImpl.FRIEND_LOCK + stffId; String staffLockKey = DealSyncOperationApiServiceImpl.FRIEND_LOCK + stffId;
RedisUtil.delCache(staffLockKey); RedisUtil.delCache(staffLockKey);
logger.info("同步第三方好友结束:{},{},{}", taskId, stffId,staffLockKey); logger.info("第三方好友结束:{},{},{}", taskId, stffId,staffLockKey);
String key = "haoban_sync_third_friend_task_" + taskId + ":" + stffId; String key = "haoban_sync_third_friend_task_" + taskId + ":" + stffId;
String lockKey = "haoban_sync_third_friend_task_lock_" + taskId + ":" + stffId; String lockKey = "haoban_sync_third_friend_task_lock_" + taskId + ":" + stffId;
RedisUtil.lock(lockKey, 3L); RedisUtil.lock(lockKey, 3L);
Object hasDealCheck = RedisUtil.getCache(key); Object hasDealCheck = RedisUtil.getCache(key);
if (hasDealCheck != null) { if (hasDealCheck != null) {
logger.info("重复提交结束第三方好友:{}", stffId);
RedisUtil.unlock(lockKey); RedisUtil.unlock(lockKey);
return false; return false;
} }
RedisUtil.setCache(key, stffId, 30L, TimeUnit.SECONDS); RedisUtil.setCache(key, stffId, 30L, TimeUnit.SECONDS);
TabHaobanPreDealLog preDealLog = preDealService.getByDataId(taskId, stffId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.dealing.getVal()); this.preDealService.updateStatusByDataId(taskId, stffId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
if (preDealLog != null) {
preDealService.updateStatusByDataId(taskId, stffId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
}
RedisUtil.unlock(lockKey); RedisUtil.unlock(lockKey);
} }
return true ; return true ;
......
...@@ -76,15 +76,8 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation { ...@@ -76,15 +76,8 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation {
List<String> delUserIdList = selfUserIdList ; List<String> delUserIdList = selfUserIdList ;
logger.info("selfUserIdList={}", JSONObject.toJSONString(selfUserIdList)); logger.info("selfUserIdList={}", JSONObject.toJSONString(selfUserIdList));
if (CollectionUtils.isEmpty(selfUserIdList)) { if (CollectionUtils.isEmpty(selfUserIdList)) {
logger.info("自建应用调用第三方接口进入重试");
dealFlag = tryAgainToMq(dataPre);
reason = "重试次数过多";
return;
}
if (CollectionUtils.isEmpty(selfUserIdList)) {
logger.info("该用户无代开外部联系人:{}", JSONObject.toJSONString(dataPre));
dealFlag = false; dealFlag = false;
reason = "该用户没有外部联系人"; reason = "无外部联系人/接口失败";
return; return;
} }
String wxUserId3th = staff.getWxUserId() ; String wxUserId3th = staff.getWxUserId() ;
...@@ -93,19 +86,11 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation { ...@@ -93,19 +86,11 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation {
} }
List<String> userIdList = null ; List<String> userIdList = null ;
if(!qwDTO.isSelf()) { if(!qwDTO.isSelf()) {
//第三方服务商的外部联系人
userIdList = memberUnionidRelatedApiService.listExterialList(wxEnterpriseId, wxUserId3th); userIdList = memberUnionidRelatedApiService.listExterialList(wxEnterpriseId, wxUserId3th);
delUserIdList = userIdList ; delUserIdList = userIdList ;
if (CollectionUtils.isEmpty(selfUserIdList)) {
dealFlag = tryAgainToMq(dataPre);
reason = "重试次数过多";
logger.info("第三方服务商外部联系人调用失败进入重试:{}", dataPre.getDataId());
return;
}
if (CollectionUtils.isEmpty(userIdList)) { if (CollectionUtils.isEmpty(userIdList)) {
logger.info("该用户无第三方外部联系人:{}", JSONObject.toJSONString(dataPre));
dealFlag = false; dealFlag = false;
reason = "该用户没有外部联系人"; reason = "无外部联系人/接口失败";
return; return;
} }
} }
...@@ -157,31 +142,6 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation { ...@@ -157,31 +142,6 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation {
} }
} }
/**
* 重试机制
*
* @param dataPre
* @return
*/
private boolean tryAgainToMq(TabHaobanPreDealLog dataPre) {
logger.info("需要重试:{}", dataPre.getDataId());
String key = TRY_AGAIN + dataPre.getTaskId() + ":" + dataPre.getDataId();
Object cache = RedisUtil.getCache(key);
if (null == cache) {
RedisUtil.setCache(key, 1, 2L, TimeUnit.HOURS);
} else {
int count = Integer.parseInt(cache.toString());
RedisUtil.setCache(key, count + 1);
if (count > 4) {
return false;
}
}
HashSet<String> reTrysIds = new HashSet<>();
reTrysIds.add(dataPre.getDataId());
dealDepartmentToMq(dataPre.getTaskId(), reTrysIds, SyncTaskStatusEnum.friend_clerk_sync);
return true;
}
@Override @Override
public void dealException(String taskId, String dataId, String enterpriseId, String reason) { public void dealException(String taskId, String dataId, String enterpriseId, String reason) {
preDealService.updateStatusByDataId(taskId, dataId, PreDealStatusEnum.exception.getVal(), reason); preDealService.updateStatusByDataId(taskId, dataId, PreDealStatusEnum.exception.getVal(), reason);
......
...@@ -108,7 +108,6 @@ public class FriendSyncNewOperation implements BaseSyncOperation { ...@@ -108,7 +108,6 @@ public class FriendSyncNewOperation implements BaseSyncOperation {
dealSuccess(taskId, dataPre.getDataId(), dataPre.getpDataId(), wxEnterpriseId); dealSuccess(taskId, dataPre.getDataId(), dataPre.getpDataId(), wxEnterpriseId);
} catch (WxApiLimitException e) { } catch (WxApiLimitException e) {
logger.info("接口次数限制:{}", JSONObject.toJSONString(dataPre)); logger.info("接口次数限制:{}", JSONObject.toJSONString(dataPre));
dealFlag = tryAgainToMq(dataPre);
reason = "接口重试超出限制"; reason = "接口重试超出限制";
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -122,31 +121,6 @@ public class FriendSyncNewOperation implements BaseSyncOperation { ...@@ -122,31 +121,6 @@ public class FriendSyncNewOperation implements BaseSyncOperation {
} }
} }
/**
* 重试机制
*
* @param dataPre
* @return
*/
private boolean tryAgainToMq(TabHaobanPreDealLog dataPre) {
logger.info("需要重试friend:{}", dataPre.getDataId());
String key = TRY_AGAIN + dataPre.getTaskId() + ":" + dataPre.getDataId();
Object cache = RedisUtil.getCache(key);
if (null == cache) {
RedisUtil.setCache(key, 1, 2L, TimeUnit.HOURS);
} else {
Integer count = Integer.valueOf(cache.toString());
RedisUtil.setCache(key, count + 1);
if (count > 4) {
return false;
}
}
HashSet<String> reTrysIds = new HashSet<>();
reTrysIds.add(dataPre.getDataId());
dealDepartmentToMq(dataPre.getTaskId(), reTrysIds, SyncTaskStatusEnum.friend_sync);
return true;
}
public void dealException(String taskId, String dataId, String pDataId, String reason, String relationKey) { public void dealException(String taskId, String dataId, String pDataId, String reason, String relationKey) {
preDealService.updateFriendStatusByDataId(taskId, dataId, PreDealStatusEnum.exception.getVal(), reason, null, relationKey, PreDealTypeEnum.friend.getVal()); preDealService.updateFriendStatusByDataId(taskId, dataId, PreDealStatusEnum.exception.getVal(), reason, null, relationKey, PreDealTypeEnum.friend.getVal());
checkDepartmentTask(taskId, pDataId); checkDepartmentTask(taskId, pDataId);
...@@ -176,30 +150,4 @@ public class FriendSyncNewOperation implements BaseSyncOperation { ...@@ -176,30 +150,4 @@ public class FriendSyncNewOperation implements BaseSyncOperation {
@Override @Override
public void checkDepartmentTask(String taskId) { public void checkDepartmentTask(String taskId) {
} }
/**
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
*/
private void dealDepartmentToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
//预处理分组任务
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
List<String> ret = dealList.stream().map(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setData(relationId);
dealParamMqDTO.setTaskId(taskId);
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
return JSONObject.toJSONString(dealParamMqDTO);
}).collect(Collectors.toList());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
clientInstance.sendBatchMessages("departmentSyncDealMq", ret);
} catch (Exception e) {
logger.info("发送失败:{}", taskId, e);
e.printStackTrace();
}
}
} }
...@@ -96,7 +96,6 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation { ...@@ -96,7 +96,6 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
dealSuccess(taskId, dataPre.getDataId(), dataPre.getpDataId(), wxEnterpriseId); dealSuccess(taskId, dataPre.getDataId(), dataPre.getpDataId(), wxEnterpriseId);
} catch (WxApiLimitException e) { } catch (WxApiLimitException e) {
logger.info("接口次数限制:{}", JSONObject.toJSONString(dataPre)); logger.info("接口次数限制:{}", JSONObject.toJSONString(dataPre));
dealFlag = tryAgainToMq(dataPre);
reason = "getCorpSelfExternalUseridInfo重试次数过多"; reason = "getCorpSelfExternalUseridInfo重试次数过多";
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -122,30 +121,6 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation { ...@@ -122,30 +121,6 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
checkDepartmentTask(taskId, pDataId); checkDepartmentTask(taskId, pDataId);
} }
/**
* 重试机制
*
* @param dataPre
* @return
*/
private boolean tryAgainToMq(TabHaobanPreDealLog dataPre) {
logger.info("需要重试self:{}", dataPre.getDataId());
String key = TRY_AGAIN + dataPre.getTaskId() + ":" + dataPre.getDataId();
Object cache = RedisUtil.getCache(key);
if (null == cache) {
RedisUtil.setCache(key, 1, 2L, TimeUnit.HOURS);
} else {
Integer count = Integer.valueOf(cache.toString());
RedisUtil.setCache(key, count + 1);
if (count > 4) {
return false;
}
}
HashSet<String> reTrysIds = new HashSet<>();
reTrysIds.add(dataPre.getDataId());
dealDepartmentToMq(dataPre.getTaskId(), reTrysIds, SyncTaskStatusEnum.self_friend_sync);
return true;
}
/** /**
* 自定义处理流程 * 自定义处理流程
...@@ -160,32 +135,4 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation { ...@@ -160,32 +135,4 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
@Override @Override
public void checkDepartmentTask(String taskId) { public void checkDepartmentTask(String taskId) {
} }
/**
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
*/
private void dealDepartmentToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
//预处理分组任务
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
List<String> ret = dealList.stream().map(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setData(relationId);
dealParamMqDTO.setTaskId(taskId);
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
return JSONObject.toJSONString(dealParamMqDTO);
}).collect(Collectors.toList());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
Log.info("发送队列SelfFriendSyncNewOperation={}",JSON.toJSONString(ret));
clientInstance.sendBatchMessages("departmentSyncDealMq", ret);
} catch (Exception e) {
logger.info("发送失败:{},{}", taskId);
e.printStackTrace();
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment