Commit 0245cc52 by 陶光胜

init

parent bb5b34f3
......@@ -139,6 +139,11 @@
<artifactId>gic-binlog-base</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.gic</groupId>
<artifactId>gic-platform-auth-api</artifactId>
<version>${gic-platform-auth-api}</version>
</dependency>
</dependencies>
<build>
......
package com.gic.message.constant;
public class Constants {
public static final String INSERT = "INSERT";
public static final String UPDATE = "UPDATE";
}
package com.gic.message.service;
import com.gic.binlog.base.entity.GicRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public interface MessageHandler {
void handler(ConsumerRecord<String, GicRecord> consumerRecord);
}
......@@ -14,4 +14,8 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
public void onMessage(ConsumerRecord<String, GicRecord> consumerRecord) {
log.info("接收kafka消息,内容为111:"+consumerRecord.value());
}
public static void main(String[] args){
System.out.println("110101".substring(0, "110101".length()-2));
}
}
package com.gic.message.service.impl;
import com.gic.redis.data.util.RedisUtil;
import com.gic.store.service.StoreApiService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class StoreIndexRefreshHandler {
@Autowired
private StoreApiService storeApiService;
public void refreshStoreIndex(Integer enterpriseId, Integer storeId){
String key = "enterprise:refresh:"+storeId;
Object cache = RedisUtil.getCache(key);
if(cache == null){
this.storeApiService.addStoreToIndex(enterpriseId, storeId);
RedisUtil.setCache(key, storeId, 1l, TimeUnit.SECONDS);
}
}
}
package com.gic.message.service.impl;
import com.gic.api.base.commons.ServiceResponse;
import com.gic.binlog.base.entity.GicField;
import com.gic.binlog.base.entity.GicRecord;
import com.gic.binlog.base.entity.enums.GicRecordType;
import com.gic.message.service.MessageHandler;
import com.gic.message.utils.ListToMapUtil;
import com.gic.store.constant.StoreOwnTypeEnum;
import com.gic.store.dto.StoreDTO;
import com.gic.store.service.StoreApiService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
@Component
public class StoreInfoMessageHandler implements MessageHandler {
@Autowired
private StoreIndexRefreshHandler storeIndexRefreshHandler;
@Autowired
private StoreApiService storeApiService;
@Override
public void handler(ConsumerRecord<String, GicRecord> consumerRecord) {
GicRecord value = consumerRecord.value();
Map<String, GicField> fieldMap = ListToMapUtil.listToMap(consumerRecord);
Integer enterpriseId = 0, storeInfoId = 0, storeId = 0;
if(GicRecordType.INSERT.value() == value.getRecordType().value()){
storeInfoId = Integer.valueOf(fieldMap.get("store_info_id").getValue());
ServiceResponse<List<StoreDTO>> listStoreResponse = this.storeApiService.listStoreByStoreInfoId(storeInfoId);
List<StoreDTO> listStore = listStoreResponse.getResult();
for(StoreDTO storeDTO : listStore){
enterpriseId = storeDTO.getEnterpriseId();
if(storeDTO.getOwnType() == StoreOwnTypeEnum.OWNER.getCode()){
}else {
this.storeIndexRefreshHandler.refreshStoreIndex(enterpriseId, storeDTO.getStoreId());
}
}
}
if(GicRecordType.UPDATE.value() == value.getRecordType().value()){
int isAdd = 0;
if(Integer.valueOf(fieldMap.get("old_status").getValue()) == 0 && Integer.valueOf(fieldMap.get("new_status").getValue()) == 1){
isAdd = 1;
}
if(Integer.valueOf(fieldMap.get("old_status").getValue()) == 1 && Integer.valueOf(fieldMap.get("new_status").getValue()) == 0){
isAdd = 2;
}
enterpriseId = Integer.valueOf(fieldMap.get("new_enterprise_id").getValue());
storeId = Integer.valueOf(fieldMap.get("new_store_id").getValue());
if(isAdd != 0){
if(isAdd == 1){
this.storeIndexRefreshHandler.refreshStoreIndex(enterpriseId, storeId);
}else {
this.storeApiService.deleteStoreFromES(enterpriseId, storeId);
}
}else {
this.storeIndexRefreshHandler.refreshStoreIndex(enterpriseId, storeId);
}
}
if(GicRecordType.DELETE.value() == value.getRecordType().value()){
enterpriseId = Integer.valueOf(fieldMap.get("enterprise_id").getValue());
storeId = Integer.valueOf(fieldMap.get("store_id").getValue());
this.storeApiService.deleteStoreFromES(enterpriseId, storeId);
}
}
}
package com.gic.message.service.impl;
import com.gic.api.base.commons.ServiceResponse;
import com.gic.auth.constant.AuthModeEnum;
import com.gic.auth.service.UnionEnterpriseApiService;
import com.gic.binlog.base.entity.GicField;
import com.gic.binlog.base.entity.GicRecord;
import com.gic.binlog.base.entity.enums.GicRecordType;
import com.gic.message.service.MessageHandler;
import com.gic.message.utils.ListToMapUtil;
import com.gic.store.constant.StoreOwnTypeEnum;
import com.gic.store.dto.StoreSearchDTO;
import com.gic.store.dto.StoreWidgetDTO;
import com.gic.store.service.StoreApiService;
import com.gic.store.service.StoreWidgetApiService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
@Component
public class StoreMessageHandler implements MessageHandler {
@Autowired
private StoreApiService storeApiService;
@Autowired
private StoreIndexRefreshHandler storeIndexRefreshHandler;
@Autowired
private StoreWidgetApiService storeWidgetApiService;
@Autowired
private UnionEnterpriseApiService unionEnterpriseApiService;
@Override
public void handler(ConsumerRecord<String, GicRecord> consumerRecord){
GicRecord value = consumerRecord.value();
Map<String, GicField> fieldMap = ListToMapUtil.listToMap(consumerRecord);
Integer enterpriseId = 0, storeId = 0;
if(GicRecordType.INSERT.value() == value.getRecordType().value()){
enterpriseId = Integer.valueOf(fieldMap.get("enterprise_id").getValue());
storeId = Integer.valueOf(fieldMap.get("store_id").getValue());
this.storeIndexRefreshHandler.refreshStoreIndex(enterpriseId, storeId);
Integer ownType = Integer.valueOf(fieldMap.get("own_type").getValue());
if(ownType == StoreOwnTypeEnum.OWNER.getCode()){
ServiceResponse<List<Long>> resourceResponse = this.unionEnterpriseApiService.listStoreResourceByEnterpriseId(enterpriseId);
if(CollectionUtils.isNotEmpty(resourceResponse.getResult())){
for(Long resourceId : resourceResponse.getResult()){
ServiceResponse<StoreWidgetDTO> storeWidget = this.storeWidgetApiService.getStoreWidget(resourceId.intValue());
if(storeWidget.isSuccess()){
StoreWidgetDTO storeWidgetDTO = storeWidget.getResult();
if(storeWidgetDTO != null){
if(AuthModeEnum.YES.getCode() == storeWidgetDTO.getAuthMode()){
StoreSearchDTO storeSearchDTO = new StoreSearchDTO();
storeSearchDTO.setStoreInfoIds("");
}
}
}
}
}
}
}
if(GicRecordType.UPDATE.value() == value.getRecordType().value()){
int isAdd = 0;
if(Integer.valueOf(fieldMap.get("old_status").getValue()) == 0 && Integer.valueOf(fieldMap.get("new_status").getValue()) == 1){
isAdd = 1;
}
if(Integer.valueOf(fieldMap.get("old_status").getValue()) == 1 && Integer.valueOf(fieldMap.get("new_status").getValue()) == 0){
isAdd = 2;
}
enterpriseId = Integer.valueOf(fieldMap.get("new_enterprise_id").getValue());
storeId = Integer.valueOf(fieldMap.get("new_store_id").getValue());
if(isAdd != 0){
if(isAdd == 1){
this.storeIndexRefreshHandler.refreshStoreIndex(enterpriseId, storeId);
}else {
this.storeApiService.deleteStoreFromES(enterpriseId, storeId);
}
}else {
this.storeIndexRefreshHandler.refreshStoreIndex(enterpriseId, storeId);
}
}
if(GicRecordType.DELETE.value() == value.getRecordType().value()){
enterpriseId = Integer.valueOf(fieldMap.get("enterprise_id").getValue());
storeId = Integer.valueOf(fieldMap.get("store_id").getValue());
this.storeApiService.deleteStoreFromES(enterpriseId, storeId);
}
}
}
package com.gic.message.utils;
import com.gic.binlog.base.entity.GicField;
import com.gic.binlog.base.entity.GicRecord;
import com.gic.binlog.base.entity.enums.GicRecordType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ListToMapUtil {
public static Map<String, GicField> listToMap(ConsumerRecord<String, GicRecord> consumerRecord){
GicRecord value = consumerRecord.value();
List<GicField> list = value.getFieldList();
Map<String, GicField> result = new HashMap<>();
if(value.getRecordType().value() == GicRecordType.INSERT.value() || value.getRecordType().value() == GicRecordType.DELETE.value()){
for(int i=0; i< list.size(); i++){
GicField gicField = list.get(i);
result.put(gicField.getName(), gicField);
}
}
if(value.getRecordType().value() == GicRecordType.UPDATE.value()){
for(int i=0; i<list.size();){
GicField oldField = list.get(i);
GicField newFiled = list.get(i+1);
result.put("old_"+oldField.getName(), oldField);
result.put("new_"+newFiled.getName(), newFiled);
i = i+2;
}
}
return result;
}
}
......@@ -19,4 +19,7 @@
<dubbo:reference interface="com.gic.log.api.service.LogApiService" id="logApiService" timeout="6000" />
<dubbo:reference interface="com.gic.search.business.api.service.EsBusinessOperaApiService" id="esBusinessOperaApiService" timeout="6000" />
<dubbo:reference interface="com.gic.enterprise.service.PlatformBrandApiService" id="platformBrandApiService" timeout="6000" />
<dubbo:reference interface="com.gic.store.service.StoreApiService" id="storeApiService" timeout="6000" />
<dubbo:reference interface="com.gic.auth.service.UnionEnterpriseApiService" id="unionEnterpriseApiService" timeout="6000" />
<dubbo:reference interface="com.gic.store.service.StoreWidgetApiService" id="storeWidgetApiService" timeout="6000" />
</beans>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment