Commit 1c2f4fab authored by 赵啸非's avatar 赵啸非

修改设备更新线程实现逻辑

parent 0c5faa7b
...@@ -82,14 +82,16 @@ public class DirectDynamicListener implements MessageListener { ...@@ -82,14 +82,16 @@ public class DirectDynamicListener implements MessageListener {
if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) { if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) {
deviceEntity.setOnlineTime(new Date()); deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
DeviceEntity entity = new DeviceEntity(); // DeviceEntity entity = new DeviceEntity();
entity.setOnlineTime(new Date()); // entity.setOnlineTime(new Date());
entity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); // entity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
DeviceEntity condition = new DeviceEntity(); // DeviceEntity condition = new DeviceEntity();
condition.setId(deviceEntity.getId()); // condition.setId(deviceEntity.getId());
int update = deviceService.getDeviceDao().update(entity, condition); //int update = deviceService.getDeviceDao().update(entity, condition);
// deviceService.update(deviceEntity); // deviceService.update(deviceEntity);
deviceService.putCache(deviceEntity.getId().toString(),deviceEntity); //deviceService.putCache(deviceEntity.getId().toString(),deviceEntity);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString()); PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString());
ProductEntity productEntity = productService.get(deviceEntity.getProductId()); ProductEntity productEntity = productService.get(deviceEntity.getProductId());
......
...@@ -249,7 +249,7 @@ public class DeviceApiController { ...@@ -249,7 +249,7 @@ public class DeviceApiController {
entity.setId(deviceEntity.getId()); entity.setId(deviceEntity.getId());
entity.setOnlineTime(new Date()); entity.setOnlineTime(new Date());
entity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); entity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
Long lpush = cacheService.lpush(RedisKey.KEY_DEVICE_ONLINE_QUEUE, entity); cacheService.lpush(RedisKey.KEY_DEVICE_ONLINE_QUEUE, entity);
//deviceService.getDeviceDao().update(deviceEntity); //deviceService.getDeviceDao().update(deviceEntity);
rsp.setData(deviceResp); rsp.setData(deviceResp);
...@@ -441,7 +441,8 @@ public class DeviceApiController { ...@@ -441,7 +441,8 @@ public class DeviceApiController {
updateDevice(req, platformEntity, productEntity, deviceEntity); updateDevice(req, platformEntity, productEntity, deviceEntity);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceService.update(deviceEntity, null); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity, null);
} }
if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.未激活.getValue()) { if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.未激活.getValue()) {
...@@ -927,7 +928,9 @@ public class DeviceApiController { ...@@ -927,7 +928,9 @@ public class DeviceApiController {
deviceEntity.setLeadingOfficial(req.getLeadingOfficial()); deviceEntity.setLeadingOfficial(req.getLeadingOfficial());
deviceEntity.setDeviceInFloor(req.getDeviceInFloor()); deviceEntity.setDeviceInFloor(req.getDeviceInFloor());
deviceEntity.setDeviceInBuilding(req.getDeviceInBuilding()); deviceEntity.setDeviceInBuilding(req.getDeviceInBuilding());
deviceService.update(deviceEntity);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
...@@ -951,7 +954,8 @@ public class DeviceApiController { ...@@ -951,7 +954,8 @@ public class DeviceApiController {
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceService.update(deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
...@@ -965,7 +969,8 @@ public class DeviceApiController { ...@@ -965,7 +969,8 @@ public class DeviceApiController {
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceService.update(deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
private void deviceStop(DeviceReq req) throws AppException { private void deviceStop(DeviceReq req) throws AppException {
...@@ -977,7 +982,8 @@ public class DeviceApiController { ...@@ -977,7 +982,8 @@ public class DeviceApiController {
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceService.update(deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
private DeviceEntity checkDeviceExist(DeviceReq req) { private DeviceEntity checkDeviceExist(DeviceReq req) {
......
...@@ -20,6 +20,12 @@ public class RedisKey { ...@@ -20,6 +20,12 @@ public class RedisKey {
*/ */
public static final String KEY_DEVICE_ONLINE_QUEUE = "device:queue"; public static final String KEY_DEVICE_ONLINE_QUEUE = "device:queue";
/**
* 设备更新队列
*/
public static final String KEY_DEVICE_UPDATE_QUEUE = "device:update:queue";
/** /**
* 设备通知外部更新队列 * 设备通知外部更新队列
*/ */
......
...@@ -10,6 +10,7 @@ import com.mortals.xhx.module.device.model.DeviceEntity; ...@@ -10,6 +10,7 @@ import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService; import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.service.PlatformService; import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.service.ProductService; import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.thread.DeviceUpdateComsumerThread;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -40,11 +41,15 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic ...@@ -40,11 +41,15 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic
private SendTaskThreadPool sendTaskThreadPool; private SendTaskThreadPool sendTaskThreadPool;
@Autowired @Autowired
private DirectDynamicListener directDynamicListener; private DirectDynamicListener directDynamicListener;
@Autowired
private DeviceUpdateComsumerThread deviceUpdateComsumerThread;
@Override @Override
public void start() { public void start() {
log.info("初始化发送线程数量"); log.info("初始化发送线程数量");
sendTaskThreadPool.init(20); sendTaskThreadPool.init(20);
log.info("设备更新消费线程启动");
deviceUpdateComsumerThread.start();
log.info("服务端消息队列初始化服务开始.."); log.info("服务端消息队列初始化服务开始..");
deviceService.find(new DeviceEntity()) deviceService.find(new DeviceEntity())
......
...@@ -173,7 +173,9 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi ...@@ -173,7 +173,9 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) { if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) {
deviceEntity.setOnlineTime(new Date()); deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
deviceService.update(deviceEntity);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
// deviceService.update(deviceEntity);
PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString()); PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString());
ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString()); ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString());
......
...@@ -30,7 +30,7 @@ import java.util.List; ...@@ -30,7 +30,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 设备状态修正 * 设备状态修正 todo 修改更新设备信息在单一线程中
*/ */
@Slf4j @Slf4j
@Service("DeviceStatTask") @Service("DeviceStatTask")
...@@ -51,7 +51,7 @@ public class DeviceStatTaskImpl implements ITaskExcuteService { ...@@ -51,7 +51,7 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
log.info("设备状态修正任务"); log.info("设备状态修正任务");
List<DeviceEntity> waitDeviceInfos = new ArrayList<>(); List<DeviceEntity> waitDeviceInfos = new ArrayList<>();
while (true) { while (true) {
DeviceEntity deviceEntity = cacheService.lpop(RedisKey.KEY_DEVICE_ONLINE_QUEUE, DeviceEntity.class); DeviceEntity deviceEntity = cacheService.blpop(RedisKey.KEY_DEVICE_ONLINE_QUEUE,10, DeviceEntity.class);
if (ObjectUtils.isEmpty(deviceEntity)) { if (ObjectUtils.isEmpty(deviceEntity)) {
log.info("deviceEntity:{}",deviceEntity==null); log.info("deviceEntity:{}",deviceEntity==null);
break; break;
...@@ -64,10 +64,12 @@ public class DeviceStatTaskImpl implements ITaskExcuteService { ...@@ -64,10 +64,12 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
log.error("线程休眠异常!",e); log.error("线程休眠异常!",e);
} }
} }
log.info("waitDeviceInfos size:{}", waitDeviceInfos.size()); log.info("waitDeviceInfos size:{}", waitDeviceInfos.size());
if (!ObjectUtils.isEmpty(waitDeviceInfos)) { if (!ObjectUtils.isEmpty(waitDeviceInfos)) {
deviceService.update(waitDeviceInfos); for (DeviceEntity waitDeviceInfo : waitDeviceInfos) {
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,waitDeviceInfo);
}
// deviceService.update(waitDeviceInfos);
} }
//获取所有在线设备 //获取所有在线设备
List<DeviceEntity> deviceList = deviceService.find(new DeviceQuery().deviceStatus(DeviceStatusEnum.在线.getValue())); List<DeviceEntity> deviceList = deviceService.find(new DeviceQuery().deviceStatus(DeviceStatusEnum.在线.getValue()));
...@@ -85,8 +87,11 @@ public class DeviceStatTaskImpl implements ITaskExcuteService { ...@@ -85,8 +87,11 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
if (!ObjectUtils.isEmpty(deviceOfflineList)) { if (!ObjectUtils.isEmpty(deviceOfflineList)) {
log.info("修正设备数量:size:{}", deviceOfflineList.size()); log.info("修正设备数量:size:{}", deviceOfflineList.size());
deviceService.update(deviceOfflineList);
// deviceService.update(deviceOfflineList);
deviceOfflineList.forEach(deviceEntity -> { deviceOfflineList.forEach(deviceEntity -> {
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString()); PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString());
ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString()); ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString());
if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) { if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) {
......
...@@ -25,9 +25,8 @@ public class DeviceAlarmInfoEntity extends DeviceAlarmInfoVo { ...@@ -25,9 +25,8 @@ public class DeviceAlarmInfoEntity extends DeviceAlarmInfoVo {
*/ */
private Long alarmDevice; private Long alarmDevice;
/** /**
* 告警类型,(0.离线) * 告警类型(0.离线)
*/ */
@Excel(name = "告警类型,", readConverterExp = "告警类型,(0.离线)")
private Integer alarmType; private Integer alarmType;
/** /**
* 告警级别(0.危险,1.次要,2.一般) * 告警级别(0.危险,1.次要,2.一般)
......
package com.mortals.xhx.thread;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.AbstractThread;
import com.mortals.xhx.common.key.RedisKey;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 设备消息更新线程
*
* @author zxfei
*/
@Slf4j
@Service
@Order(value = 1)
public class DeviceUpdateComsumerThread extends AbstractThread {
@Autowired
private ICacheService cacheService;
@Autowired
private DeviceService deviceService;
@Override
protected int getSleepTime() {
return 1000;
}
@Override
protected void process() throws Exception {
log.info("update device thread process");
List<DeviceEntity> waitUpdateDeviceList = new ArrayList<>();
while (true) {
DeviceEntity deviceEntity = cacheService.lpop(RedisKey.KEY_DEVICE_UPDATE_QUEUE, DeviceEntity.class);
if (ObjectUtils.isEmpty(deviceEntity)) {
log.info("deviceEntity:{}", deviceEntity == null);
break;
} else {
waitUpdateDeviceList.add(deviceEntity);
}
}
if (!ObjectUtils.isEmpty(waitUpdateDeviceList)) {
log.info("updateDeviceList size:{}", waitUpdateDeviceList.size());
deviceService.update(waitUpdateDeviceList);
}
}
@Override
protected void threadClosed() {
// TODO Auto-generated method stub
super.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment