Commit ae6b9b6c by fudahua

Merge branch 'developer-hua-5-21' into developer

# Conflicts:
#	haoban-manage3-service/src/main/resources/mapper/WxEnterpriseRelatedMapper.xml
#	haoban-manage3-service/src/test/java/ServiceTest.java
parents 651080b2 df6107b5
...@@ -127,6 +127,17 @@ ...@@ -127,6 +127,17 @@
<artifactId>haoban-app-customer-api</artifactId> <artifactId>haoban-app-customer-api</artifactId>
<version>${haoban-app-customer-api}</version> <version>${haoban-app-customer-api}</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>com.gic</groupId>
<artifactId>gic-binlog-base</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
package com.gic.haoban.manage.service.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.gic.binlog.base.entity.GicRecord;
import com.gic.clerk.api.dto.ClerkListDTO;
import com.gic.clerk.api.service.ClerkService;
import com.gic.haoban.common.utils.StringUtil;
import com.gic.haoban.communicate.api.dto.SyncJsonColumnDTO;
import com.gic.haoban.communicate.api.dto.SyncJsonDTO;
import com.gic.haoban.contacts.api.dto.EmployeeClerkAddDTO;
import com.gic.haoban.contacts.api.service.EmployeeClerkService;
import com.gic.haoban.contacts.manage.api.dto.DepartmentStoreGroupDTO;
import com.gic.haoban.contacts.manage.api.service.DepartmentStoreGroupService;
import com.gic.haoban.contacts.manage.api.service.StoreService;
import com.gic.haoban.manage.api.service.StaffApiService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListener;
import java.util.ArrayList;
import java.util.List;
/**
* Created 2019/4/10.
*
* @author hua
*/
public class KafkaMessageServiceImpl implements MessageListener<String, GicRecord> {
private final static Logger logger = LoggerFactory.getLogger(KafkaMessageServiceImpl.class);
private final String STORE = "tab_gic_store";
private final String CLERK = "tab_gic_clerk";
private final String DEPARTMENT = "tab_gic_store_group";
private final String STORE_IMAGE = "tab_gic_store_photo";
@Autowired
private StoreService storeService;
@Autowired
private ClerkService clerkService;
@Autowired
private StaffApiService staffApiService;
@Override
public void onMessage(ConsumerRecord<String, GicRecord> record) {
logger.info("message:{}", JSONObject.toJSONString(record.value()));
String message = JSONObject.toJSONString(record.value());
if(StringUtils.isBlank(message)){
return;
}
// SyncJsonDTO syncJsonDTO = JSON.toJavaObject((JSONObject)JSON.parse(message),SyncJsonDTO.class);
// String fieldListString = syncJsonDTO.getFieldList();
// String tableName = syncJsonDTO.getTableName();
if(tableName.equals(STORE)){
logger.info("【同步门店】");
String haobanClerkListString = setClerkList(fieldListString);
logger.info("【同步门店】haobanClerkListString={}",haobanClerkListString);
storeService.syncGicStore(fieldListString,haobanClerkListString);
}else if (tableName.equals(CLERK)){
logger.info("【同步店员】");
try {
employeeClerkService.syncGicClerk(fieldListString);
} catch (Exception e) {
logger.info("【同步店员】同步失败={}",e.toString());
}
staffApiService.syncGicClerk(fieldListString);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:apollo="http://www.ctrip.com/schema/apollo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.ctrip.com/schema/apollo
http://www.ctrip.com/schema/apollo.xsd">
<apollo:config/>
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.service}"/>
<entry key="group.id" value="haoban-3-sync"/>
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer"
value="com.gic.binlog.base.serializer.KafkaGicRecordSerializer"/>
</map>
</constructor-arg>
</bean>
<!-- 2.创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- 3.定义消费实现类 -->
<bean id="kafkaConsumerService" class="com.gic.haoban.manage.service.task.KafkaMessageServiceImpl"/>
<!-- 4.消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<!-- topic -->
<constructor-arg name="topics">
<list>
<value>haoban_gic_clerk</value>
<value>haoban_gic_store</value>
</list>
</constructor-arg>
<property name="messageListener" ref="kafkaConsumerService"/>
</bean>
<!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="3"/>
</bean>
</beans>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment