Commit b8166dca authored by 赵啸非's avatar 赵啸非

修改消息组件

parent b986a7e6
Pipeline #1315 canceled with stages
...@@ -19,9 +19,10 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -19,9 +19,10 @@ import java.util.concurrent.LinkedBlockingQueue;
public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueueMsg> implements TbCoreConsumerService { public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueueMsg> implements TbCoreConsumerService {
@Value("${queue.core.poll-interval}") @Value("${queue.core.poll-interval}")
private long pollDuration; private long pollDuration;//队列拉取时间间隔,单位毫秒
@Value("${queue.core.pack-processing-timeout}") @Value("${queue.core.pack-processing-timeout}")
private long packProcessingTimeout; private long packProcessingTimeout;
@Getter @Getter
private LinkedBlockingQueue<TbQueueMsg> comsureQueue = new LinkedBlockingQueue<>(); private LinkedBlockingQueue<TbQueueMsg> comsureQueue = new LinkedBlockingQueue<>();
...@@ -30,8 +31,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu ...@@ -30,8 +31,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
@Getter @Getter
private List<TbQueueConsumer<TbQueueMsg>> consumerList; private List<TbQueueConsumer<TbQueueMsg>> consumerList;
/**
* 根据配置文件动态加载kafka,rabbmitMq等工厂类
* @param tbCoreQueueFactory
*/
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory) { public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory) {
//通过工厂类创建通道
this.mainConsumer = tbCoreQueueFactory.createMsgConsumer(); this.mainConsumer = tbCoreQueueFactory.createMsgConsumer();
//tbCoreQueueFactory.createListMsgConsumer() //tbCoreQueueFactory.createListMsgConsumer()
...@@ -41,7 +46,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu ...@@ -41,7 +46,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
public void init() { public void init() {
log.info("初始化消费服务线程"); log.info("初始化消费服务线程");
super.init("core-consumer"); super.init("core-consumer");
// super.init("tb-core-consumer", "tb-core-notifications-consumer");
} }
@PreDestroy @PreDestroy
......
...@@ -44,6 +44,10 @@ public interface TbQueueConsumer<T extends TbQueueMsg> { ...@@ -44,6 +44,10 @@ public interface TbQueueConsumer<T extends TbQueueMsg> {
*/ */
void commit(); void commit();
/**
* 通道
* @return
*/
String getChannelNumber(); String getChannelNumber();
} }
...@@ -10,7 +10,12 @@ import org.springframework.core.annotation.Order; ...@@ -10,7 +10,12 @@ import org.springframework.core.annotation.Order;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
/**
* 消费服务抽象类
*
* @author: zxfei
* @date: 2022/4/28 9:43
*/
@Slf4j @Slf4j
public abstract class AbstractConsumerService<N extends TbQueueMsg> { public abstract class AbstractConsumerService<N extends TbQueueMsg> {
...@@ -27,12 +32,15 @@ public abstract class AbstractConsumerService<N extends TbQueueMsg> { ...@@ -27,12 +32,15 @@ public abstract class AbstractConsumerService<N extends TbQueueMsg> {
*/ */
protected abstract void launchConsumersList(); protected abstract void launchConsumersList();
/**
* 消息线程池
*/
protected volatile ExecutorService consumersExecutor; protected volatile ExecutorService consumersExecutor;
/**
* 变量
*/
protected volatile boolean stopped = false; protected volatile boolean stopped = false;
public void init(String mainConsumerThreadName) { public void init(String mainConsumerThreadName) {
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName(mainConsumerThreadName)); this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName(mainConsumerThreadName));
launchMainConsumers(); launchMainConsumers();
......
...@@ -21,8 +21,6 @@ public interface TbCoreQueueFactory { ...@@ -21,8 +21,6 @@ public interface TbCoreQueueFactory {
*/ */
TbQueueConsumer<TbQueueMsg> createMsgConsumer(); TbQueueConsumer<TbQueueMsg> createMsgConsumer();
/* *//** /* *//**
* 消息消费服务 * 消息消费服务
* @return * @return
......
...@@ -9,10 +9,27 @@ import lombok.Data; ...@@ -9,10 +9,27 @@ import lombok.Data;
*/ */
@Data @Data
public class UserEntityExt extends BaseEntityLong { public class UserEntityExt extends BaseEntityLong {
/**
* 站点名称
*/
private String siteName; private String siteName;
/**
* 唯一标识
*/
private String token;
/**
* 菜单栏
*/
private String menuUrl;
private String roleIds; /**
* 登录时间
*/
private Long loginTime;
private String roleNames; /**
* 过期时间
*/
private Long expireTime;
} }
\ No newline at end of file
...@@ -25,8 +25,6 @@ public class DeviceReq implements Serializable { ...@@ -25,8 +25,6 @@ public class DeviceReq implements Serializable {
private String sitenum; private String sitenum;
private String port; private String port;
private String centernum; private String centernum;
......
package com.mortals.xhx.common.utils;
import com.mortals.framework.util.StringUtils;
import lombok.Data;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
@Data
public abstract class GateProtConfig {
/**
* 协议ID
*/
private Integer protocol;
/**
* 流量数
*/
private Integer mtFlux;
/**
* 状态
*/
private Integer state;
/**
* 配置参数
*/
private String paramConfig;
private Integer SendDelayTime = 1 * 1000;
private Integer maxDelayNum = 50;
public GateProtConfig(String xml) throws Exception {
if (StringUtils.isEmpty(xml)) {
return;
}
Document doc = null;
doc = DocumentHelper.parseText(xml);
Element root = doc.getRootElement();
analyze(root);
}
/**
* 解析xml处理
*/
public abstract void analyze(Element root) throws Exception;
}
package com.mortals.xhx.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
/**
* 发送任务
*
* @author: zxfei
* @date: 2022/4/28 10:56
* @description:
**/
@Slf4j
public class SendTask implements Runnable {
@Override
public void run() {
// TODO: 2022/4/28
try {
// log.debug("启动发送"+smsGateQueueEntity);
} catch (Exception e) {
log.error("发送异常:" + e);
}
}
}
package com.mortals.xhx.common.utils;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 发送任务线程池
*
* @author: zxfei
* @date: 2022/4/28 10:52
*/
@Component
public class SendTaskThreadPool {
/**
* 线程池
*/
private ThreadPoolExecutor threadPool;
private int poolSize;
private volatile boolean isInit = false;
private Object lock = new Object();
public void init(Integer threadNum) {
if (poolSize < 0) {
throw new IllegalArgumentException();
}
if (poolSize < Runtime.getRuntime().availableProcessors()) {
poolSize = Runtime.getRuntime().availableProcessors() + 1;
}
if (!isInit) {
synchronized (lock) {
if (!isInit) {
threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);
isInit = true;
}
}
}
}
public void execute(Runnable command) {
threadPool.execute(command);
}
public void setPoolSize(int poolSize) {
threadPool.setCorePoolSize(poolSize);
threadPool.setMaximumPoolSize(poolSize);
}
public void incrementPoolSize(int delta) {
setPoolSize(threadPool.getCorePoolSize() + delta);
}
public synchronized void close() {
if (threadPool != null) {
threadPool.shutdown();
threadPool = null;
isInit = false;
}
}
}
package com.mortals.xhx.daemon.applicationservice; package com.mortals.xhx.daemon.applicationservice;
import com.mortals.xhx.common.utils.SendTaskThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.mortals.framework.springcloud.service.IApplicationService; import com.mortals.framework.springcloud.service.IApplicationService;
@Component @Component
@Slf4j
public class DemoStartService implements IApplicationService { public class DemoStartService implements IApplicationService {
private static Log logger = LogFactory.getLog(DemoStartService.class); @Autowired
private SendTaskThreadPool sendTaskThreadPool;
@Override @Override
public void start() { public void start() {
logger.info("开始服务..[配置已加载完成,但部分框架还未初始化,比如:Kafka]"); log.info("初始化发送线程数量");
sendTaskThreadPool.init(20);
log.info("开始服务..[配置已加载完成,但部分框架还未初始化,比如:Kafka]");
} }
@Override @Override
public void stop() { public void stop() {
logger.info("停止服务.."); log.info("停止服务..");
} }
} }
...@@ -6,15 +6,18 @@ import com.mortals.framework.springcloud.service.IApplicationStartedService; ...@@ -6,15 +6,18 @@ import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.base.framework.ws.message.SendToAllRequest; import com.mortals.xhx.base.framework.ws.message.SendToAllRequest;
import com.mortals.xhx.base.framework.ws.util.WebSocketUtil; import com.mortals.xhx.base.framework.ws.util.WebSocketUtil;
import com.mortals.xhx.busiz.req.DeviceReq; import com.mortals.xhx.busiz.req.DeviceReq;
import com.mortals.xhx.common.code.ActiveEnum; import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.code.DeviceOnlineStatusEnum;
import com.mortals.xhx.common.code.StatusEnum;
import com.mortals.xhx.common.key.Constant; import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.utils.SendTask;
import com.mortals.xhx.common.utils.SendTaskThreadPool;
import com.mortals.xhx.module.device.model.DeviceEntity; import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.model.DeviceLogEntity; import com.mortals.xhx.module.device.model.DeviceLogEntity;
import com.mortals.xhx.module.device.model.DeviceQuery; import com.mortals.xhx.module.device.model.DeviceQuery;
import com.mortals.xhx.module.device.service.DeviceLogService; import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService; import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.model.PlatformEntity;
import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.queue.DefaultTbCoreConsumerService; import com.mortals.xhx.queue.DefaultTbCoreConsumerService;
import com.mortals.xhx.queue.TbQueueConsumer; import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg; import com.mortals.xhx.queue.TbQueueMsg;
...@@ -45,6 +48,12 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi ...@@ -45,6 +48,12 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
private DeviceLogService deviceLogService; private DeviceLogService deviceLogService;
@Autowired @Autowired
private DeviceService deviceService; private DeviceService deviceService;
@Autowired
private ProductService productService;
@Autowired
private PlatformService platformService;
@Autowired
private SendTaskThreadPool sendTaskThreadPool;
protected volatile ExecutorService consumersExecutor; protected volatile ExecutorService consumersExecutor;
...@@ -53,22 +62,22 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi ...@@ -53,22 +62,22 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
@Override @Override
public void start() { public void start() {
log.info("开始服务..[配置已加载完成,并且所有框架都已经初始化]"); log.info("服务端消费消息服务开始..");
TbQueueConsumer<TbQueueMsg> mainConsumer = consumerService.getMainConsumer(); TbQueueConsumer<TbQueueMsg> mainConsumer = consumerService.getMainConsumer();
if (!ObjectUtils.isEmpty(mainConsumer)) { if (!ObjectUtils.isEmpty(mainConsumer)) {
//订阅所有已几快活设备 //订阅所有已几快活设备
Set<TopicPartitionInfo> topicPartitionInfoSet = deviceService.find(new DeviceQuery().active(ActiveEnum.已激活.getValue()).status(StatusEnum.启用.getValue())).stream() Set<TopicPartitionInfo> topicPartitionInfoSet = deviceService.find(new DeviceQuery().active(ActiveEnum.已激活.getValue()).status(StatusEnum.启用.getValue())).stream()
.map(item -> .map(item ->
new TopicPartitionInfo(Constant.UPLOAD_TOPIC + item.getDeviceMac(), null,null) new TopicPartitionInfo(Constant.UPLOAD_TOPIC + item.getDeviceMac(), null, null)
).collect(Collectors.toSet()); ).collect(Collectors.toSet());
mainConsumer.subscribe(topicPartitionInfoSet); mainConsumer.subscribe(topicPartitionInfoSet);
log.info("消费线程订阅设备上报消息成功!:" + JSON.toJSONString(topicPartitionInfoSet)); log.debug("消费线程订阅设备上报消息成功!:" + JSON.toJSONString(topicPartitionInfoSet));
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName("消费queue线程")); this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName("消费queue线程"));
consumersExecutor.submit(() -> { consumersExecutor.submit(() -> {
while (!stopped) { while (!stopped) {
try { try {
//todo 批量读取
TbQueueMsg queueMsg = consumerService.getComsureQueue().poll(); TbQueueMsg queueMsg = consumerService.getComsureQueue().poll();
if (!ObjectUtils.isEmpty(queueMsg)) { if (!ObjectUtils.isEmpty(queueMsg)) {
//做相应业务,做日志操作 //做相应业务,做日志操作
...@@ -84,7 +93,6 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi ...@@ -84,7 +93,6 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.在线.getValue()); deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.在线.getValue());
deviceEntity.setIp(deviceReq.getIp()); deviceEntity.setIp(deviceReq.getIp());
deviceEntity.setPort(deviceReq.getPort()); deviceEntity.setPort(deviceReq.getPort());
// deviceEntity.setSiteNum(deviceReq.getSitenum());
deviceEntity.setCenternum(deviceReq.getCenternum()); deviceEntity.setCenternum(deviceReq.getCenternum());
deviceService.update(deviceEntity); deviceService.update(deviceEntity);
...@@ -97,6 +105,34 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi ...@@ -97,6 +105,34 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
deviceLogEntity.setCreateTime(new Date()); deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity); deviceLogService.save(deviceLogEntity);
//查询消息是否归属了第三方平台 如果有则发送消息
//获取exchange,
PlatformEntity platformEntity = platformService.get(deviceEntity.getProductId());
if(!ObjectUtils.isEmpty(platformEntity)&&platformEntity.getSendSwitch()== YesNoEnum.YES.getValue()){
if(platformEntity.getSendMsgType()== SendMsgTypeEnum.http.getValue()){
//http方式
//通过线程池进行发送消息
SendTask sendTask = new SendTask();
sendTaskThreadPool.execute(sendTask);
}
}
//deviceEntity.getPlatformId()
//queueMsg.getHeaders()
if (bool) { if (bool) {
WebSocketUtil.broadcast(SendToAllRequest.TYPE, new SendToAllRequest().setContent(JSON.toJSONString(deviceEntity))); WebSocketUtil.broadcast(SendToAllRequest.TYPE, new SendToAllRequest().setContent(JSON.toJSONString(deviceEntity)));
} }
......
...@@ -3,7 +3,8 @@ module.log=com.p6spy.engine.logging.P6LogFactory,com.p6spy.engine.outage.P6Outag ...@@ -3,7 +3,8 @@ module.log=com.p6spy.engine.logging.P6LogFactory,com.p6spy.engine.outage.P6Outag
logMessageFormat=com.mortals.xhx.base.framework.config.P6spySqlFormatConfig logMessageFormat=com.mortals.xhx.base.framework.config.P6spySqlFormatConfig
#customLogMessageFormat=%(currentTime) | SQL耗时: %(executionTime) ms | 连接信息: %(category)-%(connectionId) | 执行语句: %(sql) #customLogMessageFormat=%(currentTime) | SQL耗时: %(executionTime) ms | 连接信息: %(category)-%(connectionId) | 执行语句: %(sql)
# 使用控制台记录sql # 使用控制台记录sql
appender=com.p6spy.engine.spy.appender.Slf4JLogger #appender=com.p6spy.engine.spy.appender.Slf4JLogger
appender=com.p6spy.engine.spy.appender.StdoutLogger
#appender=com.p6spy.engine.spy.appender.FileLogger #appender=com.p6spy.engine.spy.appender.FileLogger
## 配置记录Log例外 ## 配置记录Log例外
#excludecategories=info,debug,result,batc,resultset #excludecategories=info,debug,result,batc,resultset
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment