Commit 06c3e44c authored by 赵啸非's avatar 赵啸非

优化设备管理系统callback 下发消息效率

parent 360d0cc4
......@@ -77,6 +77,7 @@ import java.util.stream.Collectors;
import static com.mortals.xhx.common.key.Constant.*;
import static com.mortals.xhx.common.key.ErrorCode.*;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
import static com.mortals.xhx.common.key.RedisKey.KEY_TOKEN_API_CACHE;
/**
......@@ -666,8 +667,10 @@ public class DeviceApiController {
rsp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
rsp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
try {
DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer);
sendTaskThreadPool.execute(downMsgTask);
List<DeviceMsgReq> collect = list.stream().sorted(Comparator.comparing(DeviceMsgReq::getTimestamp)).collect(Collectors.toList());
cacheService.lpush(KEY_DEVICE_DOWN_MSG_QUEUE,collect);
// DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer);
// sendTaskThreadPool.execute(downMsgTask);
} catch (AppException e) {
log.error("接收数据失败", e);
rsp.setCode(e.getCode());
......@@ -680,7 +683,7 @@ public class DeviceApiController {
return JSON.toJSONString(rsp);
}
log.debug("响应【设备数据消息接收】【响应体】--> " + JSONObject.toJSONString(rsp));
log.info("响应【设备数据消息接收】【响应体】--> " + JSONObject.toJSONString(rsp));
return JSON.toJSONString(rsp);
}
......
......@@ -25,6 +25,11 @@ public class RedisKey {
*/
public static final String KEY_DEVICE_THIRDPARTY_QUEUE = "device:thirdparty:queue";
/**
* 下发消息队列
*/
public static final String KEY_DEVICE_DOWN_MSG_QUEUE = "device:down:msg:queue";
public static final String KEY_SITE_CACHE = "siteDict";
public static final String KEY_PLATFORM_CACHE = "platformDict";
......
package com.mortals.xhx.daemon.applicationservice;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.base.system.message.impl.MessageProducer;
import com.mortals.xhx.busiz.req.DeviceMsgReq;
import com.mortals.xhx.common.code.LogTypeEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.key.QueueKey;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.model.DeviceLogEntity;
import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.queue.DefaultTbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.Date;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
/**
* 下发消息
*
* @author: zxfei
* @date: 2023/11/8 23:45
*/
@Component
@Slf4j
public class DeviceDownMsgService implements IApplicationStartedService {
protected Boolean stopped = false;
@Autowired
private ICacheService cacheService;
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceLogService deviceLogService;
@Autowired
private MessageProducer messageProducer;
@Override
public void start() {
log.info("DeviceDownMsgService start");
Thread sendThread = new Thread(new Runnable() {
@Override
public void run() {
int waitTime = 1;
while (!stopped) {
try {
DeviceMsgReq deviceMsgReq = cacheService.lpop(KEY_DEVICE_DOWN_MSG_QUEUE, DeviceMsgReq.class);
if (!ObjectUtils.isEmpty(deviceMsgReq)) {
//根据设备编码查询设备
DeviceEntity deviceEntity = null;
try {
deviceEntity = deviceService.getExtCache(deviceMsgReq.getDeviceCode());
} catch (Exception e) {
log.error("redis 获取设备异常!", e);
continue;
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, deviceMsgReq.getMessageType());
header.put(MessageHeader.DEVICECODE, deviceMsgReq.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, deviceMsgReq.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), deviceMsgReq.getData(), header);
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg));
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(deviceMsgReq.getMessageType());
deviceLogEntity.setContent(deviceMsgReq.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", deviceMsgReq.getDeviceCode());
}
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e2) {
}
} catch (Exception e) {
log.error("异常", e);
try {
Thread.sleep(waitTime);
} catch (InterruptedException e2) {
}
}
}
}
});
sendThread.start();
}
@Override
public void stop() {
log.info("停止服务..");
this.stopped = true;
}
@Override
public int getOrder() {
return 50;
}
}
......@@ -8,7 +8,6 @@ import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.pdu.DeviceReq;
import com.mortals.xhx.common.utils.SendTaskThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -23,9 +22,6 @@ import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE;
@Slf4j
public class DeviceSendThirdPartyService implements IApplicationStartedService {
@Autowired
private SendTaskThreadPool sendTaskThreadPool;
protected Boolean stopped = false;
@Autowired
private ICacheService cacheService;
......@@ -73,33 +69,6 @@ public class DeviceSendThirdPartyService implements IApplicationStartedService {
sendThread.start();
//启动短信发送响应更新线程
/* sendTaskThreadPool.execute(() -> {
int waitTime = SEND_INTEVEL;
while (!stopped) {
try {
// DeviceReq deviceReq = cacheService.lpop(KEY_DEVICE_THIRDPARTY_QUEUE, DeviceReq.class);
// if (!ObjectUtils.isEmpty(deviceReq)) {
// String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://172.15.28.116:8090");
// ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq);
// log.info("sendThirty resp ==>{}", JSON.toJSONString(resp));
// }
// try {
// Thread.sleep(waitTime);
// } catch (InterruptedException e2) {
// }
} catch (Exception e) {
log.error("异常", e);
try {
Thread.sleep(waitTime);
} catch (InterruptedException e2) {
}
}
}
});*/
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment