Commit 05536e97 authored by 赵啸非's avatar 赵啸非

修改设备更新线程实现逻辑

parent 2556a269
...@@ -11,6 +11,7 @@ import com.mortals.xhx.module.device.service.DeviceService; ...@@ -11,6 +11,7 @@ import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.service.PlatformService; import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.service.ProductService; import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.thread.DeviceUpdateComsumerThread; import com.mortals.xhx.thread.DeviceUpdateComsumerThread;
import com.mortals.xhx.thread.SendThirdPartyThread;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -43,6 +44,8 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic ...@@ -43,6 +44,8 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic
private DirectDynamicListener directDynamicListener; private DirectDynamicListener directDynamicListener;
@Autowired @Autowired
private DeviceUpdateComsumerThread deviceUpdateComsumerThread; private DeviceUpdateComsumerThread deviceUpdateComsumerThread;
@Autowired
private SendThirdPartyThread sendThirdPartyThread;
@Override @Override
public void start() { public void start() {
...@@ -50,6 +53,8 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic ...@@ -50,6 +53,8 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic
sendTaskThreadPool.init(20); sendTaskThreadPool.init(20);
log.info("设备更新消费线程启动"); log.info("设备更新消费线程启动");
deviceUpdateComsumerThread.start(); deviceUpdateComsumerThread.start();
log.info("设备更新消费线程启动");
sendThirdPartyThread.start();
log.info("服务端消息队列初始化服务开始.."); log.info("服务端消息队列初始化服务开始..");
deviceService.find(new DeviceEntity()) deviceService.find(new DeviceEntity())
......
...@@ -20,7 +20,6 @@ import com.mortals.xhx.queue.TbQueueMsg; ...@@ -20,7 +20,6 @@ import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders; import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.util.Date; import java.util.Date;
...@@ -33,7 +32,7 @@ import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE; ...@@ -33,7 +32,7 @@ import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
* @author: zxfei * @author: zxfei
* @date: 2023/11/8 23:45 * @date: 2023/11/8 23:45
*/ */
@Component //@Component
@Slf4j @Slf4j
public class DeviceDownMsgService implements IApplicationStartedService { public class DeviceDownMsgService implements IApplicationStartedService {
......
...@@ -11,14 +11,13 @@ import com.mortals.xhx.common.pdu.DeviceReq; ...@@ -11,14 +11,13 @@ import com.mortals.xhx.common.pdu.DeviceReq;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL; import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL;
import static com.mortals.xhx.common.key.Constant.SEND_INTEVEL; import static com.mortals.xhx.common.key.Constant.SEND_INTEVEL;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE; import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE;
@Component //@Component
@Slf4j @Slf4j
public class DeviceSendThirdPartyService implements IApplicationStartedService { public class DeviceSendThirdPartyService implements IApplicationStartedService {
......
package com.mortals.xhx.thread;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.AbstractThread;
import com.mortals.xhx.base.system.message.impl.MessageProducer;
import com.mortals.xhx.busiz.req.DeviceMsgReq;
import com.mortals.xhx.common.code.LogTypeEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.key.QueueKey;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.model.DeviceLogEntity;
import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.queue.DefaultTbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.Date;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
/**
* 发送第三方服务线程
*
* @author zxfei
*/
@Slf4j
@Service
@Order(value = 3)
public class DeviceDownMsgThread extends AbstractThread {
@Autowired
private ICacheService cacheService;
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceLogService deviceLogService;
@Autowired
private MessageProducer messageProducer;
@Override
protected int getSleepTime() {
return 1000;
}
@Override
protected void process() {
log.info("update device thread process");
DeviceMsgReq deviceMsgReq = cacheService.blpop(KEY_DEVICE_DOWN_MSG_QUEUE, 10, DeviceMsgReq.class);
if (!ObjectUtils.isEmpty(deviceMsgReq)) {
log.info("下发消息:{}", deviceMsgReq.getDeviceCode());
//根据设备编码查询设备
DeviceEntity deviceEntity = null;
try {
deviceEntity = deviceService.getExtCache(deviceMsgReq.getDeviceCode());
} catch (Exception e) {
log.error("redis 获取设备异常!", e);
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, deviceMsgReq.getMessageType());
header.put(MessageHeader.DEVICECODE, deviceMsgReq.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, deviceMsgReq.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), deviceMsgReq.getData(), header);
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg));
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(deviceMsgReq.getMessageType());
deviceLogEntity.setContent(deviceMsgReq.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", deviceMsgReq.getDeviceCode());
}
}
}
@Override
protected void threadClosed() {
// TODO Auto-generated method stub
super.close();
}
}
...@@ -37,7 +37,7 @@ public class DeviceUpdateComsumerThread extends AbstractThread { ...@@ -37,7 +37,7 @@ public class DeviceUpdateComsumerThread extends AbstractThread {
} }
@Override @Override
protected void process() throws Exception { protected void process() {
log.info("update device thread process"); log.info("update device thread process");
List<DeviceEntity> waitUpdateDeviceList = new ArrayList<>(); List<DeviceEntity> waitUpdateDeviceList = new ArrayList<>();
while (true) { while (true) {
......
package com.mortals.xhx.thread;
import cn.hutool.core.net.url.UrlBuilder;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.AbstractThread;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.pdu.DeviceReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE;
/**
* 发送第三方服务线程
*
* @author zxfei
*/
@Slf4j
@Service
@Order(value = 2)
public class SendThirdPartyThread extends AbstractThread {
@Autowired
private ICacheService cacheService;
@Autowired
private MessageService messageService;
@Value("${thirdPartyPath:/inter/device/deviceIn}")
public String thirdPartyPath;
@Override
protected int getSleepTime() {
return 1000;
}
@Override
protected void process() {
log.info("SendThirdPartyThread process");
DeviceReq deviceReq = cacheService.blpop(KEY_DEVICE_THIRDPARTY_QUEUE, 10, DeviceReq.class);
if (!ObjectUtils.isEmpty(deviceReq)) {
String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://127.0.0.1:11078/zwfw_api");
ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq);
log.info("sendThirty resp ==>{}", JSON.toJSONString(resp));
}
}
@Override
protected void threadClosed() {
// TODO Auto-generated method stub
super.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment