Commit 729f0cf6 by songyinghui

feat: 会员Id非授权方式创建时监听binglog

parent ba494dcd
package com.gic.haoban.manage.service.pojo.bo.content.binlog;
import com.alibaba.fastjson.annotation.JSONField;
import com.gic.haoban.manage.service.pojo.BinlogBasePojo;
/**
* @Author MUSI
* @Date 2023/5/10 9:43 AM
* @Description
* @Version
**/
public class MaterialMemberUnionIdPoJo extends BinlogBasePojo {
/**
* 会员unionId
*
*/
@JSONField(name = "third_unionid")
private String unionId;
/**
* memberId
*/
@JSONField(name = "member_id")
private String memberId;
public String getUnionId() {
return unionId;
}
public void setUnionId(String unionId) {
this.unionId = unionId;
}
public String getMemberId() {
return memberId;
}
public void setMemberId(String memberId) {
this.memberId = memberId;
}
}
...@@ -105,7 +105,7 @@ public class InteractRecordApiServiceImpl implements InteractRecordApiService { ...@@ -105,7 +105,7 @@ public class InteractRecordApiServiceImpl implements InteractRecordApiService {
// 会员授权事件 // 会员授权事件
// 根据unionId 刷新 memberId 并写入销售线索 // 根据unionId 刷新 memberId 并写入销售线索
interactRecordService.memberAuthorized(memberAttrChangeBO.getMemberId(), memberAttrChangeBO.getUnionId()); // interactRecordService.memberAuthorized(memberAttrChangeBO.getMemberId(), memberAttrChangeBO.getUnionId());
} }
if (MemberAttrChangeBO.MemberChangeOpt.MEMBER_MERGE.getCode().equals(memberAttrChangeBO.getOpt())) { if (MemberAttrChangeBO.MemberChangeOpt.MEMBER_MERGE.getCode().equals(memberAttrChangeBO.getOpt())) {
......
...@@ -16,7 +16,9 @@ import com.gic.haoban.manage.service.pojo.GroupSyncPojo; ...@@ -16,7 +16,9 @@ import com.gic.haoban.manage.service.pojo.GroupSyncPojo;
import com.gic.haoban.manage.service.pojo.bo.content.binlog.MaterialMemberEOrderPojo; import com.gic.haoban.manage.service.pojo.bo.content.binlog.MaterialMemberEOrderPojo;
import com.gic.haoban.manage.service.pojo.StoreSyncPojo; import com.gic.haoban.manage.service.pojo.StoreSyncPojo;
import com.gic.haoban.manage.service.pojo.bo.content.binlog.MaterialMemberGicOrderPojo; import com.gic.haoban.manage.service.pojo.bo.content.binlog.MaterialMemberGicOrderPojo;
import com.gic.haoban.manage.service.pojo.bo.content.binlog.MaterialMemberUnionIdPoJo;
import com.gic.haoban.manage.service.service.StoreRangeService; import com.gic.haoban.manage.service.service.StoreRangeService;
import com.gic.haoban.manage.service.service.content.InteractRecordService;
import com.gic.haoban.manage.service.service.content.PotentialCustomerService; import com.gic.haoban.manage.service.service.content.PotentialCustomerService;
import com.gic.haoban.manage.service.service.hm.HmQrcodeService; import com.gic.haoban.manage.service.service.hm.HmQrcodeService;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -49,6 +51,8 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor ...@@ -49,6 +51,8 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
private final String tab_gic_eorder = "tab_gic_eorder"; private final String tab_gic_eorder = "tab_gic_eorder";
private final String tab_gic_order = "tab_gic_order"; private final String tab_gic_order = "tab_gic_order";
private final String tab_gic_member = "tab_gic_member";
@Autowired @Autowired
private StoreRangeService storeRangeService; private StoreRangeService storeRangeService;
...@@ -59,6 +63,8 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor ...@@ -59,6 +63,8 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
private HmQrcodeService hmQrcodeService; private HmQrcodeService hmQrcodeService;
@Autowired @Autowired
PotentialCustomerService potentialCustomerService; PotentialCustomerService potentialCustomerService;
@Autowired
private InteractRecordService interactRecordService;
@Override @Override
public void onMessage(ConsumerRecord<String, GicRecord> record) { public void onMessage(ConsumerRecord<String, GicRecord> record) {
...@@ -95,6 +101,9 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor ...@@ -95,6 +101,9 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
else if (StringUtils.startsWith(record.value().getTableName(), tab_gic_order)) { else if (StringUtils.startsWith(record.value().getTableName(), tab_gic_order)) {
// 处理会员订单新增事件 // 处理会员订单新增事件
dealMemberGicOrder((MaterialMemberGicOrderPojo) pojo); dealMemberGicOrder((MaterialMemberGicOrderPojo) pojo);
}else if (StringUtils.startsWith(record.value().getTableName(), tab_gic_member)) {
// 会员unionId新增事件
dealMemberUnionIdChange((MaterialMemberUnionIdPoJo) pojo);
} }
} }
...@@ -119,6 +128,36 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor ...@@ -119,6 +128,36 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
mid.put(gicField.getName(), gicField.getValue()); mid.put(gicField.getName(), gicField.getValue());
} }
return JSON.parseObject(JSON.toJSONString(mid), MaterialMemberGicOrderPojo.class); return JSON.parseObject(JSON.toJSONString(mid), MaterialMemberGicOrderPojo.class);
} else if (StringUtils.startsWith(record.getTableName(), tab_gic_member)) {
GicRecordType recordType = record.getRecordType();
if (GicRecordType.INSERT.equals(recordType)) {
Map<String, String> mid = new HashMap<>();
for (GicField gicField : record.getFieldList()) {
mid.put(gicField.getName(), gicField.getValue());
}
return JSON.parseObject(JSON.toJSONString(mid), MaterialMemberUnionIdPoJo.class);
}
if (GicRecordType.UPDATE.equals(recordType)) {
Map<String, String> old = new HashMap<>();
Map<String, String> newMap = new HashMap<>();
for (int i = 0; i < record.getFieldList().size(); i++) {
GicField gicField = record.getFieldList().get(i);
if (i % 2 == 0){
old.put(gicField.getName(), gicField.getValue());
}else {
newMap.put(gicField.getName(), gicField.getValue());
}
}
MaterialMemberUnionIdPoJo oldMember = JSON.parseObject(JSON.toJSONString(old), MaterialMemberUnionIdPoJo.class);
MaterialMemberUnionIdPoJo newMemberInfo = JSON.parseObject(JSON.toJSONString(newMap), MaterialMemberUnionIdPoJo.class);
if (oldMember == null || StringUtils.isNotBlank(oldMember.getUnionId())) {
logger.info("会员原本存在unionId, old:{}, new:{}", JSON.toJSONString(oldMember), JSON.toJSONString(newMemberInfo));
return null;
}
return newMemberInfo;
}
return null;
} }
return null; return null;
} }
...@@ -235,5 +274,13 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor ...@@ -235,5 +274,13 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
potentialCustomerService.updateMemberDealFlag(memberOrderPojo.getEnterpriseId(), memberOrderPojo.getMemberId()); potentialCustomerService.updateMemberDealFlag(memberOrderPojo.getEnterpriseId(), memberOrderPojo.getMemberId());
} }
/**
*
* @param materialMemberUnionIdPoJo
*/
private void dealMemberUnionIdChange(MaterialMemberUnionIdPoJo materialMemberUnionIdPoJo) {
logger.info("处理会员unionId新增事件 >> {}", JSON.toJSONString(materialMemberUnionIdPoJo));
interactRecordService.memberAuthorized(materialMemberUnionIdPoJo.getMemberId(), materialMemberUnionIdPoJo.getUnionId());
}
} }
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
<value>haoban_gic_store_group</value> <value>haoban_gic_store_group</value>
<value>tab_haoban_member_order_event_prod</value> <value>tab_haoban_member_order_event_prod</value>
<value>haoban_gic_order</value> <value>haoban_gic_order</value>
<value>content_member_unionId_notice</value>
</list> </list>
</constructor-arg> </constructor-arg>
<property name="messageListener" ref="kafkaConsumerService"/> <property name="messageListener" ref="kafkaConsumerService"/>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment