Commit 8997812e by songyinghui

feature:数据组队列切换

parent 38233505
package com.gic.haoban.manage.service.service.content.adaptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gic.haoban.manage.service.pojo.bo.content.message.InteractRecordMessageBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
/**
* @Author MUSI
* @Date 2023/9/4 3:14 PM
* @Description
* @Version
**/
@Slf4j
@Component
public class MessageConvertHelper {
/**
* 转换消息
* @param text
* @return
*/
public InteractRecordMessageBO convertMessageBo(String text) {
JSONObject jsonObject = JSON.parseObject(text);
if (MapUtils.isEmpty(jsonObject)) {
return null;
}
InteractRecordMessageBO interactRecordMessageBO = new InteractRecordMessageBO();
String enterpriseId = jsonObject.getString("merch_id");
interactRecordMessageBO.setEnterpriseId(enterpriseId);
String memberId = jsonObject.getString("member_id");
interactRecordMessageBO.setMemberId(memberId);
String unionId = jsonObject.getString("union_id");
interactRecordMessageBO.setUnionId(unionId);
String clerkId = jsonObject.getString("share_id");
interactRecordMessageBO.setClerkId(clerkId);
String storeId = jsonObject.getString("share_shop_id");
interactRecordMessageBO.setStoreId(storeId);
String materialId = jsonObject.getString("material_id");
interactRecordMessageBO.setMaterialId(materialId);
String shareMaterialChannel = jsonObject.getString("share_material_channel");
interactRecordMessageBO.setChannelSource(this.convertChannelSource(shareMaterialChannel));
String eventCode = jsonObject.getString("event_code");
interactRecordMessageBO.setEventType(this.convertEventType(eventCode));
Integer duration = jsonObject.getInteger("duration");
interactRecordMessageBO.setDurationTime(duration);
String orderNumber = jsonObject.getString("order_number");
interactRecordMessageBO.setOrderNumber(orderNumber);
String goodsId = jsonObject.getString("goods_id");
interactRecordMessageBO.setGoodsId(goodsId);
String runningUuid = jsonObject.getString("runningUuid");
interactRecordMessageBO.setBusinessUUId(runningUuid);
Long lastAccessTime = jsonObject.getLong("server_timestamp");
interactRecordMessageBO.setLastAccessTime(lastAccessTime);
String refUrl = jsonObject.getString("ref_url");
interactRecordMessageBO.setRefUrl(refUrl);
return interactRecordMessageBO;
}
private Integer convertChannelSource(String shareMaterialChannel) {
if (StringUtils.isBlank(shareMaterialChannel)) {
return null;
}
switch (shareMaterialChannel) {
case "null" :
case "NULL":
return null;
case "朋友圈":
return 1;
case "客户群":
return 2;
case "对话框":
return 3;
default:
return 0;
}
}
private Integer convertEventType(String eventCode) {
if (StringUtils.isBlank(eventCode)) {
return null;
}
switch (eventCode) {
case "material_page":
case "material_page_duration":
return 1;
case "store_goods_detail":
return 2;
case "store_buy_sucess":
return 3;
case "store_add_cart":
return 4;
default:
return null;
}
}
}
...@@ -3,6 +3,7 @@ package com.gic.haoban.manage.service.service.content.message; ...@@ -3,6 +3,7 @@ package com.gic.haoban.manage.service.service.content.message;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.gic.haoban.manage.api.enums.content.MaterialInteractRecordEventType; import com.gic.haoban.manage.api.enums.content.MaterialInteractRecordEventType;
import com.gic.haoban.manage.service.pojo.bo.content.message.InteractRecordMessageBO; import com.gic.haoban.manage.service.pojo.bo.content.message.InteractRecordMessageBO;
import com.gic.haoban.manage.service.service.content.adaptor.MessageConvertHelper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
...@@ -23,6 +24,8 @@ public class InteractRecordMessageListener implements MessageListener<String, St ...@@ -23,6 +24,8 @@ public class InteractRecordMessageListener implements MessageListener<String, St
@Autowired @Autowired
InteractRecordMessageService interactRecordMessageService; InteractRecordMessageService interactRecordMessageService;
@Autowired
MessageConvertHelper messageConvertHelper;
private static final List<Integer> dealEventType = new ArrayList<>(); private static final List<Integer> dealEventType = new ArrayList<>();
...@@ -36,7 +39,8 @@ public class InteractRecordMessageListener implements MessageListener<String, St ...@@ -36,7 +39,8 @@ public class InteractRecordMessageListener implements MessageListener<String, St
String value = consumerRecord.value(); String value = consumerRecord.value();
log.info("【InteractRecordMessageListener】处理埋点事件 {}", value); log.info("【InteractRecordMessageListener】处理埋点事件 {}", value);
InteractRecordMessageBO recordMessageBO = JSON.parseObject(value, InteractRecordMessageBO.class);
InteractRecordMessageBO recordMessageBO = messageConvertHelper.convertMessageBo(value);
if (recordMessageBO == null) { if (recordMessageBO == null) {
log.info("互动记录解析异常 {}", value); log.info("互动记录解析异常 {}", value);
return; return;
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<constructor-arg> <constructor-arg>
<map> <map>
<entry key="bootstrap.servers" value="${kafka.data.service}"/> <entry key="bootstrap.servers" value="${kafka.data.service}"/>
<entry key="group.id" value="haoban-3-data"/> <entry key="group.id" value="${kafka.data.group}"/>
<entry key="key.deserializer" <entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer"/> value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" <entry key="value.deserializer"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment