From 05536e97f00faf26410f60aabdf951d34a21daa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E5=95=B8=E9=9D=9E?= <8153694@qq.com> Date: Tue, 12 Dec 2023 14:52:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=AE=BE=E5=A4=87=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E7=BA=BF=E7=A8=8B=E5=AE=9E=E7=8E=B0=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DeviceComsumersRegisterService.java | 5 + .../DeviceDownMsgService.java | 3 +- .../DeviceSendThirdPartyService.java | 3 +- .../xhx/thread/DeviceDownMsgThread.java | 107 ++++++++++++++++++ .../thread/DeviceUpdateComsumerThread.java | 2 +- .../xhx/thread/SendThirdPartyThread.java | 65 +++++++++++ 6 files changed, 180 insertions(+), 5 deletions(-) create mode 100644 device-manager/src/main/java/com/mortals/xhx/thread/DeviceDownMsgThread.java create mode 100644 device-manager/src/main/java/com/mortals/xhx/thread/SendThirdPartyThread.java diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceComsumersRegisterService.java b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceComsumersRegisterService.java index 2fe53edd..81edef02 100644 --- a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceComsumersRegisterService.java +++ b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceComsumersRegisterService.java @@ -11,6 +11,7 @@ import com.mortals.xhx.module.device.service.DeviceService; import com.mortals.xhx.module.platform.service.PlatformService; import com.mortals.xhx.module.product.service.ProductService; import com.mortals.xhx.thread.DeviceUpdateComsumerThread; +import com.mortals.xhx.thread.SendThirdPartyThread; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; @@ -43,6 +44,8 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic private DirectDynamicListener directDynamicListener; @Autowired private DeviceUpdateComsumerThread deviceUpdateComsumerThread; + @Autowired + private SendThirdPartyThread sendThirdPartyThread; @Override public void start() { @@ -50,6 +53,8 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic sendTaskThreadPool.init(20); log.info("璁惧鏇存柊娑堣垂绾跨▼鍚姩"); deviceUpdateComsumerThread.start(); + log.info("璁惧鏇存柊娑堣垂绾跨▼鍚姩"); + sendThirdPartyThread.start(); log.info("鏈嶅姟绔秷鎭槦鍒楀垵濮嬪寲鏈嶅姟寮€濮�.."); deviceService.find(new DeviceEntity()) diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceDownMsgService.java b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceDownMsgService.java index f7a80d85..3933e00d 100644 --- a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceDownMsgService.java +++ b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceDownMsgService.java @@ -20,7 +20,6 @@ import com.mortals.xhx.queue.TbQueueMsg; import com.mortals.xhx.queue.TbQueueMsgHeaders; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.util.Date; @@ -33,7 +32,7 @@ import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE; * @author: zxfei * @date: 2023/11/8 23:45 */ -@Component +//@Component @Slf4j public class DeviceDownMsgService implements IApplicationStartedService { diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java index bdae4528..2bf7f30a 100644 --- a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java +++ b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceSendThirdPartyService.java @@ -11,14 +11,13 @@ import com.mortals.xhx.common.pdu.DeviceReq; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL; import static com.mortals.xhx.common.key.Constant.SEND_INTEVEL; import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE; -@Component +//@Component @Slf4j public class DeviceSendThirdPartyService implements IApplicationStartedService { diff --git a/device-manager/src/main/java/com/mortals/xhx/thread/DeviceDownMsgThread.java b/device-manager/src/main/java/com/mortals/xhx/thread/DeviceDownMsgThread.java new file mode 100644 index 00000000..0e9b383c --- /dev/null +++ b/device-manager/src/main/java/com/mortals/xhx/thread/DeviceDownMsgThread.java @@ -0,0 +1,107 @@ +package com.mortals.xhx.thread; + + +import cn.hutool.core.util.IdUtil; +import com.alibaba.fastjson.JSON; +import com.mortals.framework.service.ICacheService; +import com.mortals.framework.util.AbstractThread; +import com.mortals.xhx.base.system.message.impl.MessageProducer; +import com.mortals.xhx.busiz.req.DeviceMsgReq; +import com.mortals.xhx.common.code.LogTypeEnum; +import com.mortals.xhx.common.key.Constant; +import com.mortals.xhx.common.key.QueueKey; +import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders; +import com.mortals.xhx.common.model.MessageHeader; +import com.mortals.xhx.module.device.model.DeviceEntity; +import com.mortals.xhx.module.device.model.DeviceLogEntity; +import com.mortals.xhx.module.device.service.DeviceLogService; +import com.mortals.xhx.module.device.service.DeviceService; +import com.mortals.xhx.queue.DefaultTbQueueMsg; +import com.mortals.xhx.queue.TbQueueMsg; +import com.mortals.xhx.queue.TbQueueMsgHeaders; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; + +import java.util.Date; + +import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE; + + +/** + * 鍙戦€佺涓夋柟鏈嶅姟绾跨▼ + * + * @author zxfei + */ +@Slf4j +@Service +@Order(value = 3) +public class DeviceDownMsgThread extends AbstractThread { + + @Autowired + private ICacheService cacheService; + @Autowired + private DeviceService deviceService; + @Autowired + private DeviceLogService deviceLogService; + @Autowired + private MessageProducer messageProducer; + + + @Override + protected int getSleepTime() { + return 1000; + } + + @Override + protected void process() { + log.info("update device thread process"); + DeviceMsgReq deviceMsgReq = cacheService.blpop(KEY_DEVICE_DOWN_MSG_QUEUE, 10, DeviceMsgReq.class); + if (!ObjectUtils.isEmpty(deviceMsgReq)) { + log.info("涓嬪彂娑堟伅:{}", deviceMsgReq.getDeviceCode()); + //鏍规嵁璁惧缂栫爜鏌ヨ璁惧 + DeviceEntity deviceEntity = null; + try { + deviceEntity = deviceService.getExtCache(deviceMsgReq.getDeviceCode()); + } catch (Exception e) { + log.error("redis 鑾峰彇璁惧寮傚父锛�", e); + } + if (!ObjectUtils.isEmpty(deviceEntity)) { + TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders(); + header.put(MessageHeader.MESSAGETYPE, deviceMsgReq.getMessageType()); + header.put(MessageHeader.DEVICECODE, deviceMsgReq.getDeviceCode()); + header.put(MessageHeader.TIMESTAMP, deviceMsgReq.getTimestamp().toString()); + + TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), deviceMsgReq.getData(), header); + messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg)); + + DeviceLogEntity deviceLogEntity = new DeviceLogEntity(); + deviceLogEntity.initAttrValue(); + deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID()); + deviceLogEntity.setSiteId(deviceEntity.getSiteId()); + deviceLogEntity.setDeviceId(deviceEntity.getId()); + deviceLogEntity.setDeviceName(deviceEntity.getDeviceName()); + deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode()); + deviceLogEntity.setMessageHead(deviceMsgReq.getMessageType()); + deviceLogEntity.setContent(deviceMsgReq.getData()); + deviceLogEntity.setLogType(LogTypeEnum.涓嬪彂鏈嶅姟.getValue()); + deviceLogEntity.setCreateUserId(1L); + deviceLogEntity.setCreateTime(new Date()); + deviceLogService.save(deviceLogEntity, null); + } else { + log.info("鏈壘鍒拌澶囷紝deviceCode:{}", deviceMsgReq.getDeviceCode()); + } + } + } + + + @Override + protected void threadClosed() { + // TODO Auto-generated method stub + super.close(); + } + + +} diff --git a/device-manager/src/main/java/com/mortals/xhx/thread/DeviceUpdateComsumerThread.java b/device-manager/src/main/java/com/mortals/xhx/thread/DeviceUpdateComsumerThread.java index 52c69c58..480e5a86 100644 --- a/device-manager/src/main/java/com/mortals/xhx/thread/DeviceUpdateComsumerThread.java +++ b/device-manager/src/main/java/com/mortals/xhx/thread/DeviceUpdateComsumerThread.java @@ -37,7 +37,7 @@ public class DeviceUpdateComsumerThread extends AbstractThread { } @Override - protected void process() throws Exception { + protected void process() { log.info("update device thread process"); List<DeviceEntity> waitUpdateDeviceList = new ArrayList<>(); while (true) { diff --git a/device-manager/src/main/java/com/mortals/xhx/thread/SendThirdPartyThread.java b/device-manager/src/main/java/com/mortals/xhx/thread/SendThirdPartyThread.java new file mode 100644 index 00000000..9d8b39eb --- /dev/null +++ b/device-manager/src/main/java/com/mortals/xhx/thread/SendThirdPartyThread.java @@ -0,0 +1,65 @@ +package com.mortals.xhx.thread; + + +import cn.hutool.core.net.url.UrlBuilder; +import com.alibaba.fastjson.JSON; +import com.mortals.framework.ap.GlobalSysInfo; +import com.mortals.framework.service.ICacheService; +import com.mortals.framework.util.AbstractThread; +import com.mortals.xhx.base.system.message.MessageService; +import com.mortals.xhx.busiz.rsp.ApiResp; +import com.mortals.xhx.common.pdu.DeviceReq; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; + +import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL; +import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE; + + +/** + * 鍙戦€佺涓夋柟鏈嶅姟绾跨▼ + * + * @author zxfei + */ +@Slf4j +@Service +@Order(value = 2) +public class SendThirdPartyThread extends AbstractThread { + + @Autowired + private ICacheService cacheService; + @Autowired + private MessageService messageService; + + @Value("${thirdPartyPath:/inter/device/deviceIn}") + public String thirdPartyPath; + + @Override + protected int getSleepTime() { + return 1000; + } + + @Override + protected void process() { + log.info("SendThirdPartyThread process"); + DeviceReq deviceReq = cacheService.blpop(KEY_DEVICE_THIRDPARTY_QUEUE, 10, DeviceReq.class); + if (!ObjectUtils.isEmpty(deviceReq)) { + String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://127.0.0.1:11078/zwfw_api"); + ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq); + log.info("sendThirty resp ==>{}", JSON.toJSONString(resp)); + } + } + + + @Override + protected void threadClosed() { + // TODO Auto-generated method stub + super.close(); + } + + +} -- 2.24.3