Commit 3e59c5c8 authored by 赵啸非's avatar 赵啸非

更新下发消息为线程池发送

parent aedb76ed
...@@ -82,7 +82,7 @@ public class RabbitConfig { ...@@ -82,7 +82,7 @@ public class RabbitConfig {
} }
});*/ });*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /* rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override @Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) { public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(!ObjectUtils.isEmpty(correlationData)){ if(!ObjectUtils.isEmpty(correlationData)){
...@@ -90,7 +90,7 @@ public class RabbitConfig { ...@@ -90,7 +90,7 @@ public class RabbitConfig {
log.info("发送消息id:{},ack:{}",correlationData.getId(),ack); log.info("发送消息id:{},ack:{}",correlationData.getId(),ack);
} }
} }
}); });*/
rabbitTemplate.setReturnCallback(messageCallbackService); rabbitTemplate.setReturnCallback(messageCallbackService);
......
package com.mortals.xhx.module.device.service.impl; package com.mortals.xhx.module.device.service.impl;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.net.url.UrlPath; import cn.hutool.core.net.url.UrlPath;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
...@@ -297,7 +296,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -297,7 +296,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(getContextUserId(context)); deviceEntity.setUpdateUserId(getContextUserId(context));
this.getDeviceDao().update(deviceEntity); this.getDeviceDao().update(deviceEntity);
this.putCache(deviceEntity.getId().toString(),deviceEntity); this.putCache(deviceEntity.getId().toString(), deviceEntity);
PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId() == null ? "-1" : deviceEntity.getPlatformId().toString()); PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId() == null ? "-1" : deviceEntity.getPlatformId().toString());
ProductEntity productEntity = productService.getCache(deviceEntity.getProductId() == null ? "-1" : deviceEntity.getProductId().toString()); ProductEntity productEntity = productService.getCache(deviceEntity.getProductId() == null ? "-1" : deviceEntity.getProductId().toString());
if (enabled == YesNoEnum.YES.getValue()) { if (enabled == YesNoEnum.YES.getValue()) {
...@@ -306,12 +305,12 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -306,12 +305,12 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
this.sendThirdParty(deviceEntity, productEntity, platformEntity, DeviceMethodEnum.STOP); this.sendThirdParty(deviceEntity, productEntity, platformEntity, DeviceMethodEnum.STOP);
} }
if(enabled==EnabledEnum.停止.getValue()){ if (enabled == EnabledEnum.停止.getValue()) {
ErrorLogPdu errorLogPdu = new ErrorLogPdu(); ErrorLogPdu errorLogPdu = new ErrorLogPdu();
errorLogPdu.initAttrValue(); errorLogPdu.initAttrValue();
errorLogPdu.setTraceID(IdUtil.objectId()); errorLogPdu.setTraceID(IdUtil.objectId());
errorLogPdu.setAppName(productEntity.getProductCode()); errorLogPdu.setAppName(productEntity.getProductCode());
errorLogPdu.setMessage(deviceEntity.getDeviceName()+"设备停用!"); errorLogPdu.setMessage(deviceEntity.getDeviceName() + "设备停用!");
errorLogPdu.setPlatform("webos"); errorLogPdu.setPlatform("webos");
errorLogPdu.setCulprit(""); errorLogPdu.setCulprit("");
errorLogPdu.setTags(""); errorLogPdu.setTags("");
...@@ -320,7 +319,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -320,7 +319,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
errorLogPdu.setReleaseVersion(""); errorLogPdu.setReleaseVersion("");
errorLogPdu.setFingerprint(LogTypeEnum.上报事件.name()); errorLogPdu.setFingerprint(LogTypeEnum.上报事件.name());
errorLogPdu.setThreadNo(Thread.currentThread().toString()); errorLogPdu.setThreadNo(Thread.currentThread().toString());
errorLogPdu.setErrorStack(Thread.currentThread()+deviceEntity.getDeviceName()+"设备停用!"); errorLogPdu.setErrorStack(Thread.currentThread() + deviceEntity.getDeviceName() + "设备停用!");
errorLogPdu.setContext(""); errorLogPdu.setContext("");
errorLogPdu.setExtra(""); errorLogPdu.setExtra("");
errorLogPdu.setLogTime(new Date()); errorLogPdu.setLogTime(new Date());
...@@ -506,7 +505,6 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -506,7 +505,6 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
} }
@Override @Override
protected void removeBefore(Long[] ids, Context context) throws AppException { protected void removeBefore(Long[] ids, Context context) throws AppException {
Long[] removeIds = Arrays.asList(ids).stream().map(id -> { Long[] removeIds = Arrays.asList(ids).stream().map(id -> {
...@@ -693,55 +691,52 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -693,55 +691,52 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
@Override @Override
public void downMsg(List<DeviceMsgReq> list) { public void downMsg(List<DeviceMsgReq> list) {
try { try {
ArrayList<DeviceLogEntity> deviceLogList = new ArrayList<>(); // ArrayList<DeviceLogEntity> deviceLogList = new ArrayList<>();
List<DeviceMsgReq> collect = list.stream().sorted(Comparator.comparing(DeviceMsgReq::getTimestamp)).collect(Collectors.toList()); List<DeviceMsgReq> collect = list.stream().sorted(Comparator.comparing(DeviceMsgReq::getTimestamp)).collect(Collectors.toList());
List<List<DeviceMsgReq>> partition = ListUtil.partition(collect, 10);
for (List<DeviceMsgReq> deviceMsgReqs : partition) {
for (DeviceMsgReq item : deviceMsgReqs) {
//根据设备编码查询设备
DeviceEntity deviceEntity = null;
try {
deviceEntity = this.getExtCache(item.getDeviceCode());
} catch (Exception e) {
log.info("deviceCode:{}", item.getDeviceCode());
log.error("redis 获取设备异常!", e);
continue;
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, item.getMessageType());
header.put(MessageHeader.DEVICECODE, item.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, item.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), item.getData(), header);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(deviceEntity.getDeviceCode());
log.info("send rabbitmq msg:{}", item.getDeviceCode());
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg),correlationData);
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(item.getMessageType());
deviceLogEntity.setContent(item.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogList.add(deviceLogEntity);
// deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", item.getDeviceCode());
}
}
//TimeUnit.SECONDS.sleep(1); for (DeviceMsgReq item : collect) {
//根据设备编码查询设备
DeviceEntity deviceEntity = null;
try {
deviceEntity = this.getExtCache(item.getDeviceCode());
} catch (Exception e) {
log.info("deviceCode:{}", item.getDeviceCode());
log.error("redis 获取设备异常!", e);
continue;
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, item.getMessageType());
header.put(MessageHeader.DEVICECODE, item.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, item.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), item.getData(), header);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(deviceEntity.getDeviceCode());
log.info("send rabbitmq msg:{}", item.getDeviceCode());
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg), correlationData);
/* DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(item.getMessageType());
deviceLogEntity.setContent(item.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogList.add(deviceLogEntity);*/
// deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", item.getDeviceCode());
}
} }
/* if (!ObjectUtils.isEmpty(deviceLogList)) { /* if (!ObjectUtils.isEmpty(deviceLogList)) {
List<List<DeviceLogEntity>> partitionlogs = ListUtil.partition(deviceLogList, 100); List<List<DeviceLogEntity>> partitionlogs = ListUtil.partition(deviceLogList, 100);
for (List<DeviceLogEntity> deviceLogEntities : partitionlogs) { for (List<DeviceLogEntity> deviceLogEntities : partitionlogs) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment