Commit 74e5adda by 徐高华

Merge branch 'feature/订单储值' into 'master'

Feature/订单储值

See merge request !1421
parents a88f761e 3476421d
......@@ -6,8 +6,10 @@ import com.gic.haoban.manage.api.dto.PreDealLogInfoDTO;
import com.gic.haoban.manage.api.dto.SyncCheckDTO;
import com.gic.haoban.manage.api.dto.SyncSingleDealDTO;
import com.gic.haoban.manage.api.dto.SyncTaskDTO;
import com.gic.haoban.manage.api.enums.SyncTaskStatusEnum;
import java.util.List;
import java.util.Set;
/**
* Created 2020/4/9.
......@@ -209,7 +211,11 @@ public interface DealSyncOperationApiService {
* @param taskId
*/
void dealTagTask(String wxEnterpriseId, String enterpriseId, List<String> tagIds, String taskId);
// 同步好友状态定时
void taskStatusTimer(String params) ;
void staffTaskStatusTimer(String params) ;
// 同步通讯录状态定时
void taskSyncStaffTimer(String params) ;
void sendToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum , String mqName) ;
}
......@@ -266,12 +266,14 @@ public class TestController extends WebBaseController {
}
@RequestMapping("/get-cache")
@ResponseBody
public Object getCache(String key) {
Object o = RedisUtil.getCache(key) ;
return o ;
}
@RequestMapping("/del-cache")
@ResponseBody
public Object delCache(String key) {
Object o = RedisUtil.getCache(key) ;
if(null != o) {
......
......@@ -57,16 +57,6 @@ public interface PreDealLogMapper {
, @Param("status") int status, @Param("dataType") int dataType);
/**
* 获取单条数据
*
* @param taskId
* @param pDataId
* @return
*/
List<TabHaobanPreDealLog> listReByPDataId(@Param("taskId") String taskId, @Param("dataId") String pDataId
, @Param("dataType") int dataType);
/**
* 更新状态
*
* @param taskId
......
......@@ -40,15 +40,6 @@ public interface PreDealService {
public List<TabHaobanPreDealLog> listByPDataId(String taskId, String pDataId, int status);
/**
* 根据父数据id 获取子节点
*
* @param pDataId
* @param status
* @return
*/
public List<TabHaobanPreDealLog> listReByPDataId(String taskId, String pDataId);
/**
* 更新单条数据状态
*
* @param dataId
......@@ -201,5 +192,6 @@ public interface PreDealService {
public void addTaskStatusCache(String taskId) ;
public void addTaskStaffStatusCache(String taskId,String staffId , int type) ;
public void addTaskDepartStatusCache(String taskId,int type) ;
}
......@@ -9,6 +9,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.gic.haoban.manage.api.service.DealSyncOperationApiService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
......@@ -74,6 +75,8 @@ public class HmClerkRelationServiceImpl implements HmClerkRelationService {
private SyncTaskService syncTaskService;
@Autowired
private PreDealService preDealService;
@Autowired
private DealSyncOperationApiService dealSyncOperationApiService ;
@Override
......@@ -284,30 +287,12 @@ public class HmClerkRelationServiceImpl implements HmClerkRelationService {
return dealLog;
}).collect(Collectors.toList());
this.preDealService.insert(dealLogList);
this.sendToMq(taskId, dealLogList.stream().map(o->o.getDataId()).collect(Collectors.toSet()), SyncTaskStatusEnum.modify_hm);
this.dealSyncOperationApiService.sendToMq(taskId, dealLogList.stream().map(o->o.getDataId()).collect(Collectors.toSet()), SyncTaskStatusEnum.modify_hm,"departmentSyncDealMq2");
ServiceResponse resp = ServiceResponse.success() ;
resp.setMessage(taskId);
return resp ;
}
private void sendToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
//预处理分组任务
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
List<String> listRet = dealList.stream().map(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setTaskId(taskId);
dealParamMqDTO.setData(relationId);
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
return JSONObject.toJSONString(dealParamMqDTO);
}).collect(Collectors.toList());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
clientInstance.sendBatchMessages("departmentSyncDealMq", listRet, 1);
} catch (Exception e) {
e.printStackTrace();
}
}
public String createModifyHmTask(String wxEnterpriseId, String taskName, String staffId, String staffName, int syncTaskType) {
TabHaobanSyncTask tabHaobanSyncTask = new TabHaobanSyncTask();
......
......@@ -33,6 +33,7 @@ public class PreDealServiceImpl implements PreDealService {
public static final String HAOBAN_TASK_ID_SET_CACHE = "haobanTaskSyncIdSetCache" ;
public static final String HAOBAN_TASK_STAFF_ID_SET_CACHE = "haobanTaskStaffSyncIdSetCache" ;
public static final String HAOBAN_TASK_DEPART_SET_CACHE = "haobanTaskDepartSyncIdSetCache" ;
@Autowired
private PreDealLogMapper preDealLogMapper;
......@@ -53,7 +54,7 @@ public class PreDealServiceImpl implements PreDealService {
}
int i = 0;
while (i < pre) {
logger.info("这是第{}次,进入", i);
logger.info("分批保存第{}次", i);
i++;
int fromIndex = (i - 1) * pageSize;
int toIndex = (count - fromIndex) > pageSize ? (fromIndex + pageSize) : count;
......@@ -78,10 +79,6 @@ public class PreDealServiceImpl implements PreDealService {
return preDealLogMapper.listByPDataId(taskId, pDataId, status, PreDealTypeEnum.dept.getVal());
}
@Override
public List<TabHaobanPreDealLog> listReByPDataId(String taskId, String pDataId) {
return preDealLogMapper.listReByPDataId(taskId, pDataId, PreDealTypeEnum.dept.getVal());
}
@Override
public boolean updateStatusByDataId(String taskId, String dataId, int status, String reason) {
......@@ -200,4 +197,11 @@ public class PreDealServiceImpl implements PreDealService {
RSet<String> set = RedisUtil.getRedisClient().getSet(HAOBAN_TASK_STAFF_ID_SET_CACHE);
set.add(key) ;
}
@Override
public void addTaskDepartStatusCache(String taskId, int type) {
String key = taskId+"#"+type ;
RSet<String> set = RedisUtil.getRedisClient().getSet(HAOBAN_TASK_DEPART_SET_CACHE);
set.add(key) ;
}
}
......@@ -204,7 +204,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
logger.info("处理开始:类型:{}", syncTaskStatusEnum.getVal());
//处理预处理数据
preDealService.updateExceptionToPre(taskId);
dealDepartmentToMq(taskId, midList, syncTaskStatusEnum);
sendToMq(taskId, midList, syncTaskStatusEnum,null);
}
......@@ -213,7 +213,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
RedisUtil.lock(key, 10L);
String taskLock = getTaskLock(wxEnterpriseId);
if (StringUtils.isNotBlank(taskLock)) {
logger.info("【同步锁】taskLock={}", taskLock);
logger.info("在处理中,返回");
RedisUtil.unlock(key);
return null;
}
......@@ -340,10 +340,10 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
private void dealDepartmentInit(String taskId, String wxEnterpriseId, List<SyncSingleDealDTO> addList) {
Set<String> addListMid = addList.stream().filter(dto -> dto.getStoreFlag().equals(0)).map(dto -> dto.getRelatedId()).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(addListMid)) {
dealDepartmentToMq(taskId, addListMid, SyncTaskStatusEnum.group_sync);
sendToMq(taskId, addListMid, SyncTaskStatusEnum.group_sync,null);
} else {
addListMid = addList.stream().filter(dto -> dto.getStoreFlag().equals(1)).map(dto -> dto.getRelatedId()).collect(Collectors.toSet());
dealDepartmentToMq(taskId, addListMid, SyncTaskStatusEnum.store_sync);
sendToMq(taskId, addListMid, SyncTaskStatusEnum.store_sync,null);
}
}
......@@ -351,28 +351,9 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
* @param
*/
private void dealDepartmentToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
//预处理分组任务
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
List<String> listRet = dealList.stream().map(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setTaskId(taskId);
dealParamMqDTO.setData(relationId);
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
return JSONObject.toJSONString(dealParamMqDTO);
}).collect(Collectors.toList());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
logger.info("放入mq={}",JSON.toJSONString(listRet));
clientInstance.sendBatchMessages("departmentSyncDealMq", listRet, 10);
} catch (Exception e) {
logger.info("发送失败:{},{}", taskId, JSONObject.toJSONString(listRet));
e.printStackTrace();
}
}
public String lockTask(String wxEnterpriseId, String taskId) {
......@@ -412,6 +393,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
@Override
public String getTaskLock(String wxEnterpriseId) {
String key = LOCK_KEY + wxEnterpriseId;
logger.info("缓存key={}",key);
return (String) RedisUtil.getCache(key);
}
......@@ -557,11 +539,12 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
}
List<com.gic.wechat.api.dto.qywx.DepartmentDTO> list = qywxDepartmentApiService.listSelfDepartment(qwDTO.getDkCorpid(), secretSetting.getSecretVal(), 1,qwDTO.getUrlHost());
if (CollectionUtils.isEmpty(list)) {
logger.info("没有数据同步");
logger.info("企微部门数量0,退出同步");
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.exception_close.getVal());
unlockTask(wxEnterpriseId);
return "没有数据同步或权限没设置全部";
}
logger.info("企微部门数={}",list.size());
List<TabHaobanPreDealLog> dealLogList = list.stream().map(dto -> {
TabHaobanPreDealLog dealLog = new TabHaobanPreDealLog();
dealLog.setDataId(dto.getId().toString());
......@@ -573,11 +556,11 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
dealLog.setWxEnterpriseId(wxEnterpriseId);
return dealLog;
}).collect(Collectors.toList());
// 保存部门数据
preDealService.insert(dealLogList);
HashSet<String> hashSet = new HashSet<>();
hashSet.add("1");
dealDepartmentToMq(taskId, hashSet, SyncTaskStatusEnum.group_sync);
sendToMq(taskId, hashSet, SyncTaskStatusEnum.group_sync,"departmentSyncDealMq2");
return null;
}
......@@ -647,7 +630,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
preDealService.insert(dealLogList);
HashSet<String> hashSet = (HashSet<String>) dealLogList.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealFriendToMq(taskId, hashSet, SyncTaskStatusEnum.friend_clerk_sync);
sendToMq(taskId, hashSet, SyncTaskStatusEnum.friend_clerk_sync,"friendSyncDealMq");
}
@Override
......@@ -692,7 +675,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
dealLogList.add(dealLog);
preDealService.insert(dealLogList);
HashSet<String> hashSet = (HashSet<String>) dealLogList.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealDepartmentToMq(taskId, hashSet, SyncTaskStatusEnum.friend_clerk_sync);
sendToMq(taskId, hashSet, SyncTaskStatusEnum.friend_clerk_sync,null);
}
@Override
......@@ -715,7 +698,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
}).collect(Collectors.toList());
preDealService.insert(dealLogList);
Set<String> hashSet = dealLogList.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealDepartmentToMq(taskId, hashSet, SyncTaskStatusEnum.friend_clerk_sync);
sendToMq(taskId, hashSet, SyncTaskStatusEnum.friend_clerk_sync,null);
}
@Override
......@@ -727,16 +710,16 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
}
}
}
/**
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
*/
private void dealFriendToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
@Override
public void sendToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum , String mqName) {
//预处理分组任务
if(StringUtils.isEmpty(mqName)) {
mqName = "departmentSyncDealMq" ;
}
// 通讯录同步不要更新状态
if(!(syncTaskStatusEnum.equals(SyncTaskStatusEnum.clerk_sync) || syncTaskStatusEnum.equals(SyncTaskStatusEnum.group_sync))) {
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
}
List<String> listRet = dealList.stream().map(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setTaskId(taskId);
......@@ -744,11 +727,14 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
return JSONObject.toJSONString(dealParamMqDTO);
}).collect(Collectors.toList());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
clientInstance.sendBatchMessages("friendSyncDealMq", listRet);
logger.info("放入mq={},{}",mqName,JSON.toJSONString(listRet));
clientInstance.sendBatchMessages(mqName, listRet, 1);
} catch (Exception e) {
logger.info("发送失败:{},{}", taskId, JSONObject.toJSONString(listRet), e);
logger.info("发送失败:{},{}", taskId, JSONObject.toJSONString(listRet));
e.printStackTrace();
}
}
......@@ -803,7 +789,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
}).collect(Collectors.toList());
preDealService.insert(dealLogList);
Set<String> hashSet = dealLogList.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealDepartmentToMq(taskId, hashSet, SyncTaskStatusEnum.tag);
sendToMq(taskId, hashSet, SyncTaskStatusEnum.tag,null);
}
/**
......@@ -881,6 +867,77 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
}
}
@Override
public void taskSyncStaffTimer(String params) {
try {
RSet<String> set = RedisUtil.getRedisClient().getSet(PreDealServiceImpl.HAOBAN_TASK_DEPART_SET_CACHE);
logger.info("通讯录同步定时开始size={},{}",set.size(),set);
Iterator<String> it = set.iterator() ;
while(it.hasNext()) {
String v = it.next() ;
String[] arr = v.split("#") ;
if(arr.length != 2) {
logger.info("异常{},{}",v );
it.remove();
continue;
}
String taskId = arr[0] ;
int type = Integer.valueOf(arr[1]) ;
TabHaobanSyncTask task = syncTaskService.getSyncTask(taskId);
if(null == task) {
logger.info("任务不存在taskId={}",taskId);
it.remove();
continue;
}
if(task.getStatusFlag() != 4) {
this.syncTaskService.updateTaskStatus(taskId,4) ;
}
if(task.getCreateTime().getTime()+1000*60*60*3< System.currentTimeMillis()) {
logger.info("超过3小时taskId={},{}",taskId);
it.remove();
continue;
}
boolean flag = this.syncDepartmentStaffTask(taskId,type) ;
if(flag) {
it.remove();
}
}
logger.info("通讯录同步定时完成={}",set);
}catch(Exception e) {
logger.warn(e.toString(),e);
}
}
private boolean syncDepartmentStaffTask(String taskId,int type) {
boolean flag = preDealService.checkTask(taskId, type);
if (!flag) {
logger.info("同步通讯录部门还未完成={}",taskId);
return false;
}
if(type==0) {
logger.info("部门成功,开始同步成员:{}", taskId);
String key = "haoban_sync_department_task_" + taskId;
RedisUtil.lock(key, 3L);
// 部门和成员拉取成功后,直接任务完成
List<TabHaobanPreDealLog> dealLogs = preDealService.listByTaskId(taskId, PreDealTypeEnum.clerk.getVal(), PreDealStatusEnum.pre.getVal());
if (CollectionUtils.isNotEmpty(dealLogs)) {
Set<String> dataIds = dealLogs.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
this.sendToMq(taskId, dataIds, SyncTaskStatusEnum.clerk_sync,"departmentSyncDealMq2");
}
RedisUtil.unlock(key);
}else if(type==2) {
logger.info("企微通讯录成员成功:{}", taskId);
String key = "haoban_sync_clerk_task_" + taskId;
RedisUtil.lock(key, 3L);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId);
String wxEnterpriseId = syncTask.getWxEnterpriseId();
this.unlockTask(wxEnterpriseId) ;
logger.info("同步通讯录完成");
RedisUtil.unlock(key);
}
return true ;
}
private boolean updateTaskStatus(String taskId) {
boolean b = preDealService.checkFriendTask(taskId, PreDealTypeEnum.friend_clerk.getVal());
if (!b) {
......@@ -913,7 +970,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
logger.info("无第三方的数据/自建");
preDealService.updateStatusByDataId(taskId, stffId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
} else {
dealDepartmentToMq(taskId, new HashSet<>(dataIds), SyncTaskStatusEnum.friend_sync);
sendToMq(taskId, new HashSet<>(dataIds), SyncTaskStatusEnum.friend_sync,null);
}
RedisUtil.unlock(lockKey);
}
......
......@@ -888,6 +888,13 @@ public class HmQrcodeApiServiceImpl implements HmQrcodeApiService {
hmQrcodeQDTO.setStoreId(staffClerkRelationDTO.getStoreId());
hmQrcodeQDTO.setClerkIdList(Collections.singletonList(staffClerkRelationDTO.getClerkId()));
add(hmQrcodeQDTO);
HmQrcodeBO hmQrcodeBO = hmQrcodeService.queryByClerkId(staffClerkRelationDTO.getClerkId(), staffClerkRelationDTO.getWxEnterpriseId());
if (hmQrcodeBO != null && hmQrcodeBO.getStatusFlag()==3) {
logger.info("导购活码待生效状态变更,clerkId:{}",staffClerkRelationDTO.getClerkId());
hmQrcodeService.updateStatusById(hmQrcodeBO.getHmId(), 1);
}
}
List<StaffClerkRelationDTO> relationDTOS = staffClerkRelationService.listIdsByNotInWxUserIds(wxUserIdsList, wxEnterpriseId, enterpriseId);
......
......@@ -4,7 +4,10 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.gic.haoban.manage.api.service.DealSyncOperationApiService;
import com.gic.wechat.api.service.qywx.QywxDepartmentApiService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -59,11 +62,13 @@ public class QywxDeptSyncOperation implements BaseSyncOperation {
@Autowired
private WxEnterpriseService wxEnterpriseService;
@Autowired
private QywxStaffSyncOperation qywxClerkSyncOperation;
@Autowired
private SecretSettingService secretSettingService;
@Autowired
private Config config;
@Autowired
private QywxDepartmentApiService qywxDepartmentApiService;
@Autowired
private DealSyncOperationApiService dealSyncOperationApiService;
@Override
public void dealSingleByMq(DealParamMqDTO dealParamMqDTO, TabHaobanPreDealLog dataPre) {
......@@ -77,24 +82,33 @@ public class QywxDeptSyncOperation implements BaseSyncOperation {
parent.setChainId("0");
parent.setChainName("");
}
if (parent == null) {
logger.info("不存在该父部门:c:{},p:{}", dataPre.getDataId(), dataPre.getpDataId());
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "父部不存在");
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "父部不存在");
return;
}
TabHaobanDepartment department = departmentService.getByWxId(dealParamMqDTO.getData(), dataPre.getWxEnterpriseId());
DepartmentDTO dto = new DepartmentDTO();
if (department != null) {
logger.info("已经同步过了:{}", dataPre.getDataId());
logger.info("部门记录存在,qwid={},hbid={}", dataPre.getDataId(),department.getDepartmentId());
dto = EntityUtil.changeEntityByJSON(DepartmentDTO.class, department);
}
logger.info("分组是否存在:{}", JSON.toJSONString(department));
String wxEnterpriseId = dataPre.getWxEnterpriseId() ;
String enterpriseId = dataPre.getEnterpriseId() ;
String dataId = dataPre.getDataId() ;
String taskId = dataPre.getTaskId() ;
WxEnterpriseQwDTO qwDTO = this.wxEnterpriseService.getQwInfo(wxEnterpriseId) ;
SecretSettingDTO secretSetting = secretSettingService.getSecretSetting(wxEnterpriseId, SecretTypeEnum.CUSTOMIZED_APP.getVal());
String dataContent = dataPre.getDataContent();
com.gic.wechat.api.dto.qywx.DepartmentDTO qywxDepart = JSONObject.parseObject(dataContent, com.gic.wechat.api.dto.qywx.DepartmentDTO.class);
// com.gic.wechat.api.dto.qywx.DepartmentDTO qywxDepart = this.qywxDepartmentApiService.getSelfDepartmentById(qwDTO.getDkCorpid(), secretSetting.getSecretVal(), Integer.valueOf(dataId),qwDTO.getUrlHost()) ;
if(null == qywxDepart) {
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "查询企微部门异常");
return;
}
boolean resultFlag = true;
try {
dto.setDepartmentName(qywxDepart.getName());
dto.setLevel(parent.getLevel() + 1);
......@@ -116,61 +130,53 @@ public class QywxDeptSyncOperation implements BaseSyncOperation {
} else {
this.departmentService.edit(dto);
}
this.preDealService.addTaskDepartStatusCache(taskId,0);
reason = this.getStaff(taskId,dataId,enterpriseId,wxEnterpriseId,secretSetting,qwDTO);
if(StringUtils.isNotBlank(reason)) {
resultFlag = false ;
}
} catch (Exception e) {
resultFlag = false;
reason = "处理异常:";
e.printStackTrace();
reason = "处理异常";
logger.info("处理异常:{}", e);
} finally {
if (!resultFlag) {
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), reason);
this.dealSyncOperationApiService.unlockTask(wxEnterpriseId) ;
} else {
dealSuccess(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), dataPre.getWxEnterpriseId());
}
}
}
@Override
public void dealException(String taskId, String dataId, String enterpriseId, String reason) {
logger.info("部门同步异常:{},{}", taskId, dataId);
preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.dept.getVal(), PreDealStatusEnum.exception.getVal(), reason);
logger.info("递归调用更新状态start==:{},d:{}", taskId, dataId);
changeChildStatus(taskId, dataId);
logger.info("递归调用更新状态end==:{},d:{}", taskId, dataId);
checkDepartmentTask(taskId);
this.preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.dept.getVal(), PreDealStatusEnum.exception.getVal(), reason);
}
/**
* 改变子状态
*
* @param taskId
* @param dataId
*/
private void changeChildStatus(String taskId, String dataId) {
List<TabHaobanPreDealLog> child = preDealService.listByPDataId(taskId, dataId, PreDealTypeEnum.dept.getVal());
if (CollectionUtils.isEmpty(child)) {
@Override
public void dealSuccess(String taskId, String dataId, String enterpriseId, String wxEnterpriseId) {
boolean b = preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.dept.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
if (!b) {
logger.info("成功修改错误!{}", dataId);
return;
}
child.forEach(tab -> {
preDealService.updateStatusByDataId(taskId, tab.getDataId(), PreDealTypeEnum.dept.getVal(), PreDealStatusEnum.exception.getVal(), "父分组同步异常");
changeChildStatus(taskId, tab.getDataId());
});
List<TabHaobanPreDealLog> preList = preDealService.listByPDataId(taskId, dataId,0);
if (CollectionUtils.isNotEmpty(preList)) {
Set<String> mid = preList.stream().map(tab -> tab.getDataId()).collect(Collectors.toSet());
this.dealSyncOperationApiService.sendToMq(taskId,mid,SyncTaskStatusEnum.group_sync,"departmentSyncDealMq2");
}
}
@Override
public void dealSuccess(String taskId, String dataId, String enterpriseId, String wxEnterpriseId) {
SecretSettingDTO secretSetting = secretSettingService.getSecretSetting(wxEnterpriseId, SecretTypeEnum.CUSTOMIZED_APP.getVal());
if (null == secretSetting || secretSetting.getCheckFlag() == 0) {
logger.info("没有配置secret:{},{}", taskId, wxEnterpriseId);
return;
private void update(String taskId, String dataId, String enterpriseId, String reason) {
}
TabHaobanPreDealLog preData = preDealService.getByDataId(taskId, dataId, PreDealTypeEnum.dept.getVal(), -1);
String pDataId = preData.getpDataId();
if ("0".equals(pDataId)) {
//根级部门
private String getStaff(String taskId, String dataId, String enterpriseId, String wxEnterpriseId,SecretSettingDTO secretSetting,WxEnterpriseQwDTO qwDTO) {
if("1".equals(dataId)) {
List<UserDTO> list = null ;
WxEnterpriseQwDTO qwDTO = this.wxEnterpriseService.getQwInfo(wxEnterpriseId) ;
if(qwDTO.getWxSecurityType()==4) {
list = this.qywxUserApiService.listDepartmentUser(qwDTO.getThirdCorpid(), config.getWxSuiteid(), dataId, 1) ;
logger.info("从(第三方)获取通讯录={}",JSON.toJSONString(list)) ;
......@@ -180,14 +186,11 @@ public class QywxDeptSyncOperation implements BaseSyncOperation {
}
if (null == list) {
logger.info("微信获取部门成员异常:{},{}", taskId, dataId);
return;
dealSyncOperationApiService.unlockTask(wxEnterpriseId) ;
return "获取企微成员异常";
}
logger.info("获取部门列表");
//预处理门店 写入预处理表
if (CollectionUtils.isNotEmpty(list)) {
List<String> dataList = preDealService.listDataIdByTaskIdAndDataType(taskId, PreDealTypeEnum.clerk.getVal());
List<TabHaobanPreDealLog> preDealLogList = list.stream().filter(mid -> !dataList.contains(mid.getUserid())
).map(userDTO -> {
logger.info("拉取成员数={},taskId={}",list.size(),taskId);
List<TabHaobanPreDealLog> preDealLogList = list.stream().map(userDTO -> {
TabHaobanPreDealLog dealLog = new TabHaobanPreDealLog();
dealLog.setDataId(userDTO.getUserid());
dealLog.setpDataId(dataId);
......@@ -199,72 +202,13 @@ public class QywxDeptSyncOperation implements BaseSyncOperation {
dealLog.setEnterpriseId(enterpriseId);
return dealLog;
}).collect(Collectors.toList());
//插入不存在的我们pre表的企业微信数据
preDealService.insert(preDealLogList);
}
}
boolean b = preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.dept.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
if (!b) {
logger.info("成功修改错误!{}", dataId);
return;
}
List<TabHaobanPreDealLog> preList = preDealService.listReByPDataId(taskId, dataId);
if (CollectionUtils.isNotEmpty(preList)) {
Set<String> mid = preList.stream().map(tab -> tab.getDataId()).collect(Collectors.toSet());
dealDepartmentToMq(taskId, mid, SyncTaskStatusEnum.group_sync);
} else {
checkDepartmentTask(taskId);
this.preDealService.insert(preDealLogList);
this.syncTaskService.updateTaskStatus(taskId,4) ;
}
return null ;
}
@Override
public void checkDepartmentTask(String taskId) {
boolean b = preDealService.checkTask(taskId, PreDealTypeEnum.dept.getVal());
if (!b) {
return;
}
logger.info("微信部门继续成功,开始门店同步:{}", taskId);
String key = "haoban_sync_department_task_" + taskId;
RedisUtil.lock(key, 3L);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId);
//同步成功 进入门店处理
if (syncTask.getStatusFlag().equals(SyncTaskStatusEnum.group_sync.getVal())) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.clerk_sync.getVal());
List<TabHaobanPreDealLog> dealLogs = preDealService.listByTaskId(taskId, PreDealTypeEnum.clerk.getVal(), PreDealStatusEnum.pre.getVal());
if (CollectionUtils.isEmpty(dealLogs)) {
qywxClerkSyncOperation.checkDepartmentTask(taskId);
} else {
logger.info("处理微信成员开始:{}", taskId);
Set<String> dataIds = dealLogs.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealDepartmentToMq(taskId, dataIds, SyncTaskStatusEnum.clerk_sync);
}
}
RedisUtil.unlock(key);
}
/**
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
*/
private void dealDepartmentToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
//预处理分组任务
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
dealList.forEach(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setTaskId(taskId);
dealParamMqDTO.setData(relationId);
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
clientInstance.sendMessage("departmentSyncDealMq2", JSONObject.toJSONString(dealParamMqDTO));
} catch (Exception e) {
logger.info("发送失败:{},{}", taskId, relationId);
e.printStackTrace();
}
});
}
}
......@@ -59,11 +59,14 @@ public class QywxStaffSyncOperation implements BaseSyncOperation {
String reason = "";
//处理clerk
try {
ServiceResponse response = staffApiService.getWxSaveNew(dealParamMqDTO.getData(), dataPre.getWxEnterpriseId());
String taskId = dataPre.getTaskId() ;
String wxUserId = dealParamMqDTO.getData() ;
ServiceResponse response = staffApiService.getWxSaveNew(wxUserId, dataPre.getWxEnterpriseId());
if (response.getCode() != 1) {
dealFlag = false;
reason = response.getMessage();
}
this.preDealService.addTaskDepartStatusCache(taskId,2);
} catch (Exception e) {
logger.info("企微通讯录成员同步失败:{}", e.getMessage(), e);
reason = "同步异常:";
......@@ -80,51 +83,17 @@ public class QywxStaffSyncOperation implements BaseSyncOperation {
@Override
public void dealException(String taskId, String dataId, String enterpriseId, String reason) {
logger.info("企微通讯录成员处理失败:t:{},d:{}", taskId, dataId);
boolean b = preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.clerk.getVal(), PreDealStatusEnum.exception.getVal(), reason);
if (!b) {
logger.info("企微通讯录成员处理异常:t:{}, d:{}", taskId, dataId);
return;
}
checkDepartmentTask(taskId);
this.preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.clerk.getVal(), PreDealStatusEnum.exception.getVal(), reason);
}
@Override
public void dealSuccess(String taskId, String dataId, String enterpriseId, String wxEnterpriseId) {
logger.info("企微通讯录成员处理成功:t:{},d:{}", taskId, dataId);
boolean b = preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
if (!b) {
logger.info("企微通讯录成员成功修改错误!{}", dataId);
return;
}
checkDepartmentTask(taskId);
this.preDealService.updateStatusByDataId(taskId, dataId, PreDealTypeEnum.clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
}
@Override
public void checkDepartmentTask(String taskId) {
boolean b = preDealService.checkTask(taskId, PreDealTypeEnum.clerk.getVal());
if (!b) {
return;
}
logger.info("企微通讯录成员成功:{}", taskId);
String key = "haoban_sync_clerk_task_" + taskId;
RedisUtil.lock(key, 3L);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId);
String wxEnterpriseId = syncTask.getWxEnterpriseId();
//同步成功 进入门店处理
if (syncTask.getStatusFlag().equals(SyncTaskStatusEnum.clerk_sync.getVal())) {
int errCount = preDealService.countByTaskId(taskId, -1, PreDealStatusEnum.exception.getVal());
if (errCount > 0) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.exception_compute.getVal());
dealSyncOperationApiService.unlockTask(wxEnterpriseId);
} else {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.compute.getVal());
dealSyncOperationApiService.unlockTask(wxEnterpriseId);
dealSyncOperationApiService.cleanDiffrence(wxEnterpriseId, taskId);
logger.info("同步通讯录完成");
}
}
RedisUtil.unlock(key);
}
}
......@@ -195,14 +195,6 @@
</if>
</select>
<select id="listReByPDataId" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from tab_haoban_pre_deal_log
where task_id = #{taskId} and p_data_id=#{dataId} and data_type=#{dataType}
and status_flag in(0,1,3)
</select>
<update id="updateStatusByDataId">
update tab_haoban_pre_deal_log
<set>
......
......@@ -639,7 +639,7 @@ public class StaffController extends WebBaseController {
if (enterpriseDTO != null) {
String taskId = dealSyncOperationApiService.createQywxTask(wxEnterpriseId, "后门同步企业微信架构", login.getClerkId(), "后门同步企业微信架构");
if (StringUtils.isBlank(taskId)) {
return resultResponse(HaoBanErrCode.ERR_0, false);
return this.fail("通讯录正在同步中") ; //resultResponse(HaoBanErrCode.ERR_0, false);
}
String ret = dealSyncOperationApiService.dealQywxDepartment(taskId, wxEnterpriseId);
if (null != ret) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment