Commit 78a2a99a authored by 赵啸非's avatar 赵啸非

更新下发消息为线程池发送

parent 6fd98cee
......@@ -667,13 +667,13 @@ public class DeviceApiController {
rsp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
rsp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
try {
list.stream()
/* list.stream()
.sorted(Comparator.comparing(DeviceMsgReq::getTimestamp))
.forEach(item -> {
cacheService.lpush(KEY_DEVICE_DOWN_MSG_QUEUE, item);
});
/* DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer);
sendTaskThreadPool.execute(downMsgTask);*/
});*/
DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer);
sendTaskThreadPool.execute(downMsgTask);
} catch (AppException e) {
log.error("接收数据失败", e);
rsp.setCode(e.getCode());
......
......@@ -23,6 +23,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
......@@ -49,6 +50,7 @@ public class DownMsgTask implements Runnable {
@Override
public void run() {
try {
ArrayList<DeviceLogEntity> deviceLogList = new ArrayList<>();
List<DeviceMsgReq> collect = list.stream().sorted(Comparator.comparing(DeviceMsgReq::getTimestamp)).collect(Collectors.toList());
List<List<DeviceMsgReq>> partition = ListUtil.partition(collect, 10);
for (List<DeviceMsgReq> deviceMsgReqs : partition) {
......@@ -83,6 +85,7 @@ public class DownMsgTask implements Runnable {
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
// deviceLogList.add(deviceLogEntity);
deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", item.getDeviceCode());
......@@ -92,6 +95,13 @@ public class DownMsgTask implements Runnable {
TimeUnit.SECONDS.sleep(1);
}
/* if (!ObjectUtils.isEmpty(deviceLogList)) {
List<List<DeviceLogEntity>> partitionlogs = ListUtil.partition(deviceLogList, 100);
for (List<DeviceLogEntity> deviceLogEntities : partitionlogs) {
deviceLogService.save(deviceLogEntities);
}
}*/
} catch (Exception e) {
log.error("异常:", e);
}
......
......@@ -84,8 +84,8 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic
}
log.info("第三方发送线程启动");
sendThirdPartyThread.start();
log.info("消息下发发送线程启动");
deviceDownMsgThread.start();
//log.info("消息下发发送线程启动");
// deviceDownMsgThread.start();
log.info("服务端消息队列初始化服务开始..");
deviceService.find(new DeviceEntity())
......
......@@ -119,10 +119,6 @@ Content-Type: application/json
"deviceCode": "12-31-70-9C-BF-A3"
}
> {%
client.global.set("content", JSON.parse(response.body).data.content);
%}
###获取产品与
GET {{baseUrl}}/api/deviceInit
Accept: application/json
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment