diff --git a/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java b/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java index 820b6d2a71d994f47ac31a7b410c4f79911d4a40..a8d85a9bf4491f56ba75a124ba1bedf3fbc680d4 100644 --- a/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java +++ b/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java @@ -77,6 +77,7 @@ import java.util.stream.Collectors; import static com.mortals.xhx.common.key.Constant.*; import static com.mortals.xhx.common.key.ErrorCode.*; +import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE; import static com.mortals.xhx.common.key.RedisKey.KEY_TOKEN_API_CACHE; /** @@ -666,8 +667,10 @@ public class DeviceApiController { rsp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel()); rsp.setCode(ApiRespCodeEnum.SUCCESS.getValue()); try { - DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer); - sendTaskThreadPool.execute(downMsgTask); + List<DeviceMsgReq> collect = list.stream().sorted(Comparator.comparing(DeviceMsgReq::getTimestamp)).collect(Collectors.toList()); + cacheService.lpush(KEY_DEVICE_DOWN_MSG_QUEUE,collect); + // DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer); + // sendTaskThreadPool.execute(downMsgTask); } catch (AppException e) { log.error("鎺ユ敹鏁版嵁澶辫触", e); rsp.setCode(e.getCode()); @@ -680,7 +683,7 @@ public class DeviceApiController { return JSON.toJSONString(rsp); } - log.debug("鍝嶅簲銆愯澶囨暟鎹秷鎭帴鏀躲€戙€愬搷搴斾綋銆�--> " + JSONObject.toJSONString(rsp)); + log.info("鍝嶅簲銆愯澶囨暟鎹秷鎭帴鏀躲€戙€愬搷搴斾綋銆�--> " + JSONObject.toJSONString(rsp)); return JSON.toJSONString(rsp); } diff --git a/device-manager/src/main/java/com/mortals/xhx/common/key/RedisKey.java b/device-manager/src/main/java/com/mortals/xhx/common/key/RedisKey.java index 23a503b69a4adb857c28e7e8e51b0069eba62c95..9d37d4287975e1dbde6ed0deced03951b5ac5f3f 100644 --- a/device-manager/src/main/java/com/mortals/xhx/common/key/RedisKey.java +++ b/device-manager/src/main/java/com/mortals/xhx/common/key/RedisKey.java @@ -25,6 +25,11 @@ public class RedisKey { */ public static final String KEY_DEVICE_THIRDPARTY_QUEUE = "device:thirdparty:queue"; + /** + * 涓嬪彂娑堟伅闃熷垪 + */ + public static final String KEY_DEVICE_DOWN_MSG_QUEUE = "device:down:msg:queue"; + public static final String KEY_SITE_CACHE = "siteDict"; public static final String KEY_PLATFORM_CACHE = "platformDict"; diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceDownMsgService.java b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceDownMsgService.java new file mode 100644 index 0000000000000000000000000000000000000000..a0a6382c5710de9e0c97c10252ebb2d7807eb351 --- /dev/null +++ b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceDownMsgService.java @@ -0,0 +1,128 @@ +package com.mortals.xhx.daemon.applicationservice; + +import cn.hutool.core.util.IdUtil; +import com.alibaba.fastjson.JSON; +import com.mortals.framework.service.ICacheService; +import com.mortals.framework.springcloud.service.IApplicationStartedService; +import com.mortals.xhx.base.system.message.impl.MessageProducer; +import com.mortals.xhx.busiz.req.DeviceMsgReq; +import com.mortals.xhx.common.code.LogTypeEnum; +import com.mortals.xhx.common.key.Constant; +import com.mortals.xhx.common.key.QueueKey; +import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders; +import com.mortals.xhx.common.model.MessageHeader; +import com.mortals.xhx.module.device.model.DeviceEntity; +import com.mortals.xhx.module.device.model.DeviceLogEntity; +import com.mortals.xhx.module.device.service.DeviceLogService; +import com.mortals.xhx.module.device.service.DeviceService; +import com.mortals.xhx.queue.DefaultTbQueueMsg; +import com.mortals.xhx.queue.TbQueueMsg; +import com.mortals.xhx.queue.TbQueueMsgHeaders; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.Date; + +import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE; + +/** + * 涓嬪彂娑堟伅 + * + * @author: zxfei + * @date: 2023/11/8 23:45 + */ +@Component +@Slf4j +public class DeviceDownMsgService implements IApplicationStartedService { + + protected Boolean stopped = false; + @Autowired + private ICacheService cacheService; + @Autowired + private DeviceService deviceService; + @Autowired + private DeviceLogService deviceLogService; + @Autowired + private MessageProducer messageProducer; + + @Override + public void start() { + log.info("DeviceDownMsgService start"); + + Thread sendThread = new Thread(new Runnable() { + @Override + public void run() { + int waitTime = 1; + while (!stopped) { + try { + DeviceMsgReq deviceMsgReq = cacheService.lpop(KEY_DEVICE_DOWN_MSG_QUEUE, DeviceMsgReq.class); + if (!ObjectUtils.isEmpty(deviceMsgReq)) { + //鏍规嵁璁惧缂栫爜鏌ヨ璁惧 + DeviceEntity deviceEntity = null; + try { + deviceEntity = deviceService.getExtCache(deviceMsgReq.getDeviceCode()); + } catch (Exception e) { + log.error("redis 鑾峰彇璁惧寮傚父锛�", e); + continue; + } + if (!ObjectUtils.isEmpty(deviceEntity)) { + TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders(); + header.put(MessageHeader.MESSAGETYPE, deviceMsgReq.getMessageType()); + header.put(MessageHeader.DEVICECODE, deviceMsgReq.getDeviceCode()); + header.put(MessageHeader.TIMESTAMP, deviceMsgReq.getTimestamp().toString()); + + TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), deviceMsgReq.getData(), header); + messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg)); + + DeviceLogEntity deviceLogEntity = new DeviceLogEntity(); + deviceLogEntity.initAttrValue(); + deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID()); + deviceLogEntity.setSiteId(deviceEntity.getSiteId()); + deviceLogEntity.setDeviceId(deviceEntity.getId()); + deviceLogEntity.setDeviceName(deviceEntity.getDeviceName()); + deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode()); + deviceLogEntity.setMessageHead(deviceMsgReq.getMessageType()); + deviceLogEntity.setContent(deviceMsgReq.getData()); + deviceLogEntity.setLogType(LogTypeEnum.涓嬪彂鏈嶅姟.getValue()); + deviceLogEntity.setCreateUserId(1L); + deviceLogEntity.setCreateTime(new Date()); + deviceLogService.save(deviceLogEntity, null); + } else { + log.info("鏈壘鍒拌澶囷紝deviceCode:{}", deviceMsgReq.getDeviceCode()); + } + } + + try { + Thread.sleep(waitTime); + } catch (InterruptedException e2) { + } + + } catch (Exception e) { + log.error("寮傚父", e); + + try { + Thread.sleep(waitTime); + } catch (InterruptedException e2) { + } + } + } + } + }); + + sendThread.start(); + } + + @Override + public void stop() { + log.info("鍋滄鏈嶅姟.."); + this.stopped = true; + } + + @Override + public int getOrder() { + return 50; + } + +} diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java index 5a957e3e9319ef808055089a78594eb47723e446..cae8771a68971c397235ac71507dde24f408708a 100644 --- a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java +++ b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java @@ -8,7 +8,6 @@ import com.mortals.framework.springcloud.service.IApplicationStartedService; import com.mortals.xhx.base.system.message.MessageService; import com.mortals.xhx.busiz.rsp.ApiResp; import com.mortals.xhx.common.pdu.DeviceReq; -import com.mortals.xhx.common.utils.SendTaskThreadPool; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -23,9 +22,6 @@ import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE; @Slf4j public class DeviceSendThirdPartyService implements IApplicationStartedService { - @Autowired - private SendTaskThreadPool sendTaskThreadPool; - protected Boolean stopped = false; @Autowired private ICacheService cacheService; @@ -73,33 +69,6 @@ public class DeviceSendThirdPartyService implements IApplicationStartedService { sendThread.start(); - //鍚姩鐭俊鍙戦€佸搷搴旀洿鏂扮嚎绋� - /* sendTaskThreadPool.execute(() -> { - int waitTime = SEND_INTEVEL; - while (!stopped) { - try { -// DeviceReq deviceReq = cacheService.lpop(KEY_DEVICE_THIRDPARTY_QUEUE, DeviceReq.class); -// if (!ObjectUtils.isEmpty(deviceReq)) { -// String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://172.15.28.116:8090"); -// ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq); -// log.info("sendThirty resp ==>{}", JSON.toJSONString(resp)); -// } - -// try { -// Thread.sleep(waitTime); -// } catch (InterruptedException e2) { -// } - - } catch (Exception e) { - log.error("寮傚父", e); - - try { - Thread.sleep(waitTime); - } catch (InterruptedException e2) { - } - } - } - });*/ }