Commit 57e45695 authored by 赵啸非's avatar 赵啸非

解决设备下发消息

parent f4a43c8a
...@@ -113,7 +113,7 @@ public class DirectDynamicListener implements MessageListener { ...@@ -113,7 +113,7 @@ public class DirectDynamicListener implements MessageListener {
bizLogPdu.setLogTime(new Date()); bizLogPdu.setLogTime(new Date());
messageProducer.syncBizSend(bizLogPdu); messageProducer.syncBizSend(bizLogPdu);
} }
log.info("id:{},deviceCode:{} deviceStatus:{}==>上线 ", deviceEntity.getId(), deviceEntity.getDeviceCode(), deviceEntity.getDeviceStatus()); // log.info("id:{},deviceCode:{} deviceStatus:{}==>上线 ", deviceEntity.getId(), deviceEntity.getDeviceCode(), deviceEntity.getDeviceStatus());
} }
if (!Constant.MESSAGETYPE_HEARTBEAT.equalsIgnoreCase(messageType)) { if (!Constant.MESSAGETYPE_HEARTBEAT.equalsIgnoreCase(messageType)) {
DeviceLogEntity deviceLogEntity = new DeviceLogEntity(); DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
......
...@@ -77,6 +77,7 @@ import java.util.stream.Collectors; ...@@ -77,6 +77,7 @@ import java.util.stream.Collectors;
import static com.mortals.xhx.common.key.Constant.*; import static com.mortals.xhx.common.key.Constant.*;
import static com.mortals.xhx.common.key.ErrorCode.*; import static com.mortals.xhx.common.key.ErrorCode.*;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
import static com.mortals.xhx.common.key.RedisKey.KEY_TOKEN_API_CACHE; import static com.mortals.xhx.common.key.RedisKey.KEY_TOKEN_API_CACHE;
/** /**
...@@ -185,7 +186,7 @@ public class DeviceApiController { ...@@ -185,7 +186,7 @@ public class DeviceApiController {
ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString()); ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString());
if (ObjectUtils.isEmpty(productEntity)) { if (ObjectUtils.isEmpty(productEntity)) {
throw new AppException(PRODUCT_IS_EMPTY, PRODUCT_IS_EMPTY_CONTENT+"productId:"+deviceEntity.getProductId()); throw new AppException(PRODUCT_IS_EMPTY, PRODUCT_IS_EMPTY_CONTENT + "productId:" + deviceEntity.getProductId());
} }
if (ObjectUtils.isEmpty(productEntity.getPlatformId())) { if (ObjectUtils.isEmpty(productEntity.getPlatformId())) {
...@@ -440,7 +441,7 @@ public class DeviceApiController { ...@@ -440,7 +441,7 @@ public class DeviceApiController {
updateDevice(req, platformEntity, productEntity, deviceEntity); updateDevice(req, platformEntity, productEntity, deviceEntity);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE, deviceEntity);
//deviceService.update(deviceEntity, null); //deviceService.update(deviceEntity, null);
} }
...@@ -667,12 +668,13 @@ public class DeviceApiController { ...@@ -667,12 +668,13 @@ public class DeviceApiController {
rsp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel()); rsp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
rsp.setCode(ApiRespCodeEnum.SUCCESS.getValue()); rsp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
try { try {
List<DeviceMsgReq> collect = list.stream().sorted(Comparator.comparing(DeviceMsgReq::getTimestamp)).collect(Collectors.toList()); list.stream()
/* collect.forEach(item->{ .sorted(Comparator.comparing(DeviceMsgReq::getTimestamp))
cacheService.lpush(KEY_DEVICE_DOWN_MSG_QUEUE,item); .forEach(item -> {
});*/ cacheService.lpush(KEY_DEVICE_DOWN_MSG_QUEUE, item);
DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer); });
sendTaskThreadPool.execute(downMsgTask); /* DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer);
sendTaskThreadPool.execute(downMsgTask);*/
} catch (AppException e) { } catch (AppException e) {
log.error("接收数据失败", e); log.error("接收数据失败", e);
rsp.setCode(e.getCode()); rsp.setCode(e.getCode());
...@@ -928,7 +930,7 @@ public class DeviceApiController { ...@@ -928,7 +930,7 @@ public class DeviceApiController {
deviceEntity.setDeviceInFloor(req.getDeviceInFloor()); deviceEntity.setDeviceInFloor(req.getDeviceInFloor());
deviceEntity.setDeviceInBuilding(req.getDeviceInBuilding()); deviceEntity.setDeviceInBuilding(req.getDeviceInBuilding());
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE, deviceEntity);
//deviceService.update(deviceEntity); //deviceService.update(deviceEntity);
} }
...@@ -953,7 +955,7 @@ public class DeviceApiController { ...@@ -953,7 +955,7 @@ public class DeviceApiController {
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE, deviceEntity);
//deviceService.update(deviceEntity); //deviceService.update(deviceEntity);
} }
...@@ -968,7 +970,7 @@ public class DeviceApiController { ...@@ -968,7 +970,7 @@ public class DeviceApiController {
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE, deviceEntity);
//deviceService.update(deviceEntity); //deviceService.update(deviceEntity);
} }
...@@ -981,7 +983,7 @@ public class DeviceApiController { ...@@ -981,7 +983,7 @@ public class DeviceApiController {
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE, deviceEntity);
//deviceService.update(deviceEntity); //deviceService.update(deviceEntity);
} }
......
...@@ -25,7 +25,9 @@ import org.springframework.core.annotation.Order; ...@@ -25,7 +25,9 @@ import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE; import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
...@@ -58,40 +60,53 @@ public class DeviceDownMsgThread extends AbstractThread { ...@@ -58,40 +60,53 @@ public class DeviceDownMsgThread extends AbstractThread {
@Override @Override
protected void process() { protected void process() {
log.info("DeviceDownMsgThread process"); log.info("DeviceDownMsgThread process");
DeviceMsgReq deviceMsgReq = cacheService.blpop(KEY_DEVICE_DOWN_MSG_QUEUE, 10, DeviceMsgReq.class); List<DeviceMsgReq> deviceMsgReqs = new ArrayList<>();
if (!ObjectUtils.isEmpty(deviceMsgReq)) { while (true) {
log.info("下发消息:{}", deviceMsgReq.getDeviceCode()); DeviceMsgReq deviceMsgReq = cacheService.blpop(KEY_DEVICE_DOWN_MSG_QUEUE, 10, DeviceMsgReq.class);
//根据设备编码查询设备 if (ObjectUtils.isEmpty(deviceMsgReq)) {
DeviceEntity deviceEntity = null; log.info("deviceMsgReq:{}", deviceMsgReq == null);
try { break;
deviceEntity = deviceService.getExtCache(deviceMsgReq.getDeviceCode());
} catch (Exception e) {
log.error("redis 获取设备异常!", e);
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, deviceMsgReq.getMessageType());
header.put(MessageHeader.DEVICECODE, deviceMsgReq.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, deviceMsgReq.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), deviceMsgReq.getData(), header);
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg));
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(deviceMsgReq.getMessageType());
deviceLogEntity.setContent(deviceMsgReq.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity, null);
} else { } else {
log.info("未找到设备,deviceCode:{}", deviceMsgReq.getDeviceCode()); deviceMsgReqs.add(deviceMsgReq);
}
}
if (!ObjectUtils.isEmpty(deviceMsgReqs)) {
log.info("消息下发数量:{}", deviceMsgReqs.size());
for (DeviceMsgReq item : deviceMsgReqs) {
//根据设备编码查询设备
DeviceEntity deviceEntity = null;
try {
deviceEntity = deviceService.getExtCache(item.getDeviceCode());
} catch (Exception e) {
log.info("deviceCode:{}", item.getDeviceCode());
log.error("redis 获取设备异常!", e);
continue;
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, item.getMessageType());
header.put(MessageHeader.DEVICECODE, item.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, item.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), item.getData(), header);
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg));
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(item.getMessageType());
deviceLogEntity.setContent(item.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", item.getDeviceCode());
}
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment