Commit 839beab8 authored by 赵啸非's avatar 赵啸非

更新下发消息为线程池发送

parent 506e538e
......@@ -76,7 +76,6 @@ import java.util.stream.Collectors;
import static com.mortals.xhx.common.key.Constant.*;
import static com.mortals.xhx.common.key.ErrorCode.*;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
import static com.mortals.xhx.common.key.RedisKey.KEY_TOKEN_API_CACHE;
/**
......@@ -671,8 +670,11 @@ public class DeviceApiController {
.forEach(item -> {
cacheService.lpush(KEY_DEVICE_DOWN_MSG_QUEUE, item);
});*/
DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer);
sendTaskThreadPool.execute(downMsgTask);
/* DownMsgTask downMsgTask = new DownMsgTask(list, platformService, productService, deviceService, deviceLogService, messageProducer);
sendTaskThreadPool.execute(downMsgTask);*/
deviceService.downMsg(list);
} catch (AppException e) {
log.error("接收数据失败", e);
rsp.setCode(e.getCode());
......
......@@ -3,6 +3,7 @@ package com.mortals.xhx.module.device.service;
import com.mortals.framework.common.Rest;
import com.mortals.framework.model.Context;
import com.mortals.framework.service.ICRUDCacheService;
import com.mortals.xhx.busiz.req.DeviceMsgReq;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.DeviceMethodEnum;
import com.mortals.xhx.module.device.dao.DeviceDao;
......@@ -73,5 +74,8 @@ public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
List<DeviceMapEntity> deviceMap(DeviceEntity query, Context context);
void downMsg(List<DeviceMsgReq> list);
}
\ No newline at end of file
package com.mortals.xhx.module.device.service.impl;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.net.url.UrlPath;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
......@@ -12,6 +13,7 @@ import com.mortals.framework.service.impl.AbstractCRUDCacheServiceImpl;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.base.system.message.impl.MessageProducer;
import com.mortals.xhx.busiz.req.DeviceMsgReq;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.key.Constant;
......@@ -44,6 +46,7 @@ import com.mortals.xhx.module.site.model.SiteEntity;
import com.mortals.xhx.module.site.service.SiteService;
import com.mortals.xhx.queue.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -502,6 +505,8 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
}
@Override
protected void removeBefore(Long[] ids, Context context) throws AppException {
Long[] removeIds = Arrays.asList(ids).stream().map(id -> {
......@@ -685,6 +690,71 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
}
}
@Override
public void downMsg(List<DeviceMsgReq> list) {
try {
//ArrayList<DeviceLogEntity> deviceLogList = new ArrayList<>();
List<DeviceMsgReq> collect = list.stream().sorted(Comparator.comparing(DeviceMsgReq::getTimestamp)).collect(Collectors.toList());
List<List<DeviceMsgReq>> partition = ListUtil.partition(collect, 10);
for (List<DeviceMsgReq> deviceMsgReqs : partition) {
for (DeviceMsgReq item : deviceMsgReqs) {
//根据设备编码查询设备
DeviceEntity deviceEntity = null;
try {
deviceEntity = this.getExtCache(item.getDeviceCode());
} catch (Exception e) {
log.info("deviceCode:{}", item.getDeviceCode());
log.error("redis 获取设备异常!", e);
continue;
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, item.getMessageType());
header.put(MessageHeader.DEVICECODE, item.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, item.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), item.getData(), header);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(deviceEntity.getDeviceCode());
log.info("send rabbitmq msg:{}", item.getDeviceCode());
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg),correlationData);
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(item.getMessageType());
deviceLogEntity.setContent(item.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
// deviceLogList.add(deviceLogEntity);
deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", item.getDeviceCode());
}
}
//TimeUnit.SECONDS.sleep(1);
}
/* if (!ObjectUtils.isEmpty(deviceLogList)) {
List<List<DeviceLogEntity>> partitionlogs = ListUtil.partition(deviceLogList, 100);
for (List<DeviceLogEntity> deviceLogEntities : partitionlogs) {
deviceLogService.save(deviceLogEntities);
}
}*/
} catch (Exception e) {
log.error("异常:", e);
}
}
public static void main(String[] args) {
BigDecimal bigDecimal = new BigDecimal("104.22241");
BigDecimal add = bigDecimal.add(new BigDecimal("0.01"));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment