Commit d919a15d authored by 王晓旭's avatar 王晓旭
parents c89c61cc 05536e97
...@@ -82,14 +82,16 @@ public class DirectDynamicListener implements MessageListener { ...@@ -82,14 +82,16 @@ public class DirectDynamicListener implements MessageListener {
if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) { if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) {
deviceEntity.setOnlineTime(new Date()); deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
DeviceEntity entity = new DeviceEntity(); // DeviceEntity entity = new DeviceEntity();
entity.setOnlineTime(new Date()); // entity.setOnlineTime(new Date());
entity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); // entity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
DeviceEntity condition = new DeviceEntity(); // DeviceEntity condition = new DeviceEntity();
condition.setId(deviceEntity.getId()); // condition.setId(deviceEntity.getId());
int update = deviceService.getDeviceDao().update(entity, condition); //int update = deviceService.getDeviceDao().update(entity, condition);
// deviceService.update(deviceEntity); // deviceService.update(deviceEntity);
deviceService.putCache(deviceEntity.getId().toString(),deviceEntity); //deviceService.putCache(deviceEntity.getId().toString(),deviceEntity);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString()); PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString());
ProductEntity productEntity = productService.get(deviceEntity.getProductId()); ProductEntity productEntity = productService.get(deviceEntity.getProductId());
......
...@@ -249,7 +249,7 @@ public class DeviceApiController { ...@@ -249,7 +249,7 @@ public class DeviceApiController {
entity.setId(deviceEntity.getId()); entity.setId(deviceEntity.getId());
entity.setOnlineTime(new Date()); entity.setOnlineTime(new Date());
entity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); entity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
Long lpush = cacheService.lpush(RedisKey.KEY_DEVICE_ONLINE_QUEUE, entity); cacheService.lpush(RedisKey.KEY_DEVICE_ONLINE_QUEUE, entity);
//deviceService.getDeviceDao().update(deviceEntity); //deviceService.getDeviceDao().update(deviceEntity);
rsp.setData(deviceResp); rsp.setData(deviceResp);
...@@ -441,7 +441,8 @@ public class DeviceApiController { ...@@ -441,7 +441,8 @@ public class DeviceApiController {
updateDevice(req, platformEntity, productEntity, deviceEntity); updateDevice(req, platformEntity, productEntity, deviceEntity);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceService.update(deviceEntity, null); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity, null);
} }
if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.未激活.getValue()) { if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.未激活.getValue()) {
...@@ -927,7 +928,9 @@ public class DeviceApiController { ...@@ -927,7 +928,9 @@ public class DeviceApiController {
deviceEntity.setLeadingOfficial(req.getLeadingOfficial()); deviceEntity.setLeadingOfficial(req.getLeadingOfficial());
deviceEntity.setDeviceInFloor(req.getDeviceInFloor()); deviceEntity.setDeviceInFloor(req.getDeviceInFloor());
deviceEntity.setDeviceInBuilding(req.getDeviceInBuilding()); deviceEntity.setDeviceInBuilding(req.getDeviceInBuilding());
deviceService.update(deviceEntity);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
...@@ -951,7 +954,8 @@ public class DeviceApiController { ...@@ -951,7 +954,8 @@ public class DeviceApiController {
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceService.update(deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
...@@ -965,7 +969,8 @@ public class DeviceApiController { ...@@ -965,7 +969,8 @@ public class DeviceApiController {
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceService.update(deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
private void deviceStop(DeviceReq req) throws AppException { private void deviceStop(DeviceReq req) throws AppException {
...@@ -977,7 +982,8 @@ public class DeviceApiController { ...@@ -977,7 +982,8 @@ public class DeviceApiController {
deviceEntity.setSwitchSend(false); deviceEntity.setSwitchSend(false);
deviceEntity.setUpdateTime(new Date()); deviceEntity.setUpdateTime(new Date());
deviceEntity.setUpdateUserId(1L); deviceEntity.setUpdateUserId(1L);
deviceService.update(deviceEntity); cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
//deviceService.update(deviceEntity);
} }
private DeviceEntity checkDeviceExist(DeviceReq req) { private DeviceEntity checkDeviceExist(DeviceReq req) {
......
...@@ -20,6 +20,12 @@ public class RedisKey { ...@@ -20,6 +20,12 @@ public class RedisKey {
*/ */
public static final String KEY_DEVICE_ONLINE_QUEUE = "device:queue"; public static final String KEY_DEVICE_ONLINE_QUEUE = "device:queue";
/**
* 设备更新队列
*/
public static final String KEY_DEVICE_UPDATE_QUEUE = "device:update:queue";
/** /**
* 设备通知外部更新队列 * 设备通知外部更新队列
*/ */
......
...@@ -10,6 +10,8 @@ import com.mortals.xhx.module.device.model.DeviceEntity; ...@@ -10,6 +10,8 @@ import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService; import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.service.PlatformService; import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.service.ProductService; import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.thread.DeviceUpdateComsumerThread;
import com.mortals.xhx.thread.SendThirdPartyThread;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -40,11 +42,19 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic ...@@ -40,11 +42,19 @@ public class DeviceComsumersRegisterService implements IApplicationStartedServic
private SendTaskThreadPool sendTaskThreadPool; private SendTaskThreadPool sendTaskThreadPool;
@Autowired @Autowired
private DirectDynamicListener directDynamicListener; private DirectDynamicListener directDynamicListener;
@Autowired
private DeviceUpdateComsumerThread deviceUpdateComsumerThread;
@Autowired
private SendThirdPartyThread sendThirdPartyThread;
@Override @Override
public void start() { public void start() {
log.info("初始化发送线程数量"); log.info("初始化发送线程数量");
sendTaskThreadPool.init(20); sendTaskThreadPool.init(20);
log.info("设备更新消费线程启动");
deviceUpdateComsumerThread.start();
log.info("设备更新消费线程启动");
sendThirdPartyThread.start();
log.info("服务端消息队列初始化服务开始.."); log.info("服务端消息队列初始化服务开始..");
deviceService.find(new DeviceEntity()) deviceService.find(new DeviceEntity())
......
...@@ -20,7 +20,6 @@ import com.mortals.xhx.queue.TbQueueMsg; ...@@ -20,7 +20,6 @@ import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders; import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.util.Date; import java.util.Date;
...@@ -33,7 +32,7 @@ import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE; ...@@ -33,7 +32,7 @@ import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
* @author: zxfei * @author: zxfei
* @date: 2023/11/8 23:45 * @date: 2023/11/8 23:45
*/ */
@Component //@Component
@Slf4j @Slf4j
public class DeviceDownMsgService implements IApplicationStartedService { public class DeviceDownMsgService implements IApplicationStartedService {
......
...@@ -173,7 +173,9 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi ...@@ -173,7 +173,9 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) { if (deviceEntity.getDeviceStatus() == DeviceStatusEnum.离线.getValue()) {
deviceEntity.setOnlineTime(new Date()); deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue()); deviceEntity.setDeviceStatus(DeviceStatusEnum.在线.getValue());
deviceService.update(deviceEntity);
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
// deviceService.update(deviceEntity);
PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString()); PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString());
ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString()); ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString());
......
...@@ -11,14 +11,13 @@ import com.mortals.xhx.common.pdu.DeviceReq; ...@@ -11,14 +11,13 @@ import com.mortals.xhx.common.pdu.DeviceReq;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL; import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL;
import static com.mortals.xhx.common.key.Constant.SEND_INTEVEL; import static com.mortals.xhx.common.key.Constant.SEND_INTEVEL;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE; import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE;
@Component //@Component
@Slf4j @Slf4j
public class DeviceSendThirdPartyService implements IApplicationStartedService { public class DeviceSendThirdPartyService implements IApplicationStartedService {
......
...@@ -30,7 +30,7 @@ import java.util.List; ...@@ -30,7 +30,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 设备状态修正 * 设备状态修正 todo 修改更新设备信息在单一线程中
*/ */
@Slf4j @Slf4j
@Service("DeviceStatTask") @Service("DeviceStatTask")
...@@ -51,7 +51,7 @@ public class DeviceStatTaskImpl implements ITaskExcuteService { ...@@ -51,7 +51,7 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
log.info("设备状态修正任务"); log.info("设备状态修正任务");
List<DeviceEntity> waitDeviceInfos = new ArrayList<>(); List<DeviceEntity> waitDeviceInfos = new ArrayList<>();
while (true) { while (true) {
DeviceEntity deviceEntity = cacheService.lpop(RedisKey.KEY_DEVICE_ONLINE_QUEUE, DeviceEntity.class); DeviceEntity deviceEntity = cacheService.blpop(RedisKey.KEY_DEVICE_ONLINE_QUEUE,10, DeviceEntity.class);
if (ObjectUtils.isEmpty(deviceEntity)) { if (ObjectUtils.isEmpty(deviceEntity)) {
log.info("deviceEntity:{}",deviceEntity==null); log.info("deviceEntity:{}",deviceEntity==null);
break; break;
...@@ -64,10 +64,12 @@ public class DeviceStatTaskImpl implements ITaskExcuteService { ...@@ -64,10 +64,12 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
log.error("线程休眠异常!",e); log.error("线程休眠异常!",e);
} }
} }
log.info("waitDeviceInfos size:{}", waitDeviceInfos.size()); log.info("waitDeviceInfos size:{}", waitDeviceInfos.size());
if (!ObjectUtils.isEmpty(waitDeviceInfos)) { if (!ObjectUtils.isEmpty(waitDeviceInfos)) {
deviceService.update(waitDeviceInfos); for (DeviceEntity waitDeviceInfo : waitDeviceInfos) {
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,waitDeviceInfo);
}
// deviceService.update(waitDeviceInfos);
} }
//获取所有在线设备 //获取所有在线设备
List<DeviceEntity> deviceList = deviceService.find(new DeviceQuery().deviceStatus(DeviceStatusEnum.在线.getValue())); List<DeviceEntity> deviceList = deviceService.find(new DeviceQuery().deviceStatus(DeviceStatusEnum.在线.getValue()));
...@@ -85,8 +87,11 @@ public class DeviceStatTaskImpl implements ITaskExcuteService { ...@@ -85,8 +87,11 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
if (!ObjectUtils.isEmpty(deviceOfflineList)) { if (!ObjectUtils.isEmpty(deviceOfflineList)) {
log.info("修正设备数量:size:{}", deviceOfflineList.size()); log.info("修正设备数量:size:{}", deviceOfflineList.size());
deviceService.update(deviceOfflineList);
// deviceService.update(deviceOfflineList);
deviceOfflineList.forEach(deviceEntity -> { deviceOfflineList.forEach(deviceEntity -> {
cacheService.lpush(RedisKey.KEY_DEVICE_UPDATE_QUEUE,deviceEntity);
PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString()); PlatformEntity platformEntity = platformService.getCache(deviceEntity.getPlatformId().toString());
ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString()); ProductEntity productEntity = productService.getCache(deviceEntity.getProductId().toString());
if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) { if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) {
......
...@@ -25,9 +25,8 @@ public class DeviceAlarmInfoEntity extends DeviceAlarmInfoVo { ...@@ -25,9 +25,8 @@ public class DeviceAlarmInfoEntity extends DeviceAlarmInfoVo {
*/ */
private Long alarmDevice; private Long alarmDevice;
/** /**
* 告警类型,(0.离线) * 告警类型(0.离线)
*/ */
@Excel(name = "告警类型,", readConverterExp = "告警类型,(0.离线)")
private Integer alarmType; private Integer alarmType;
/** /**
* 告警级别(0.危险,1.次要,2.一般) * 告警级别(0.危险,1.次要,2.一般)
......
package com.mortals.xhx.thread;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.AbstractThread;
import com.mortals.xhx.base.system.message.impl.MessageProducer;
import com.mortals.xhx.busiz.req.DeviceMsgReq;
import com.mortals.xhx.common.code.LogTypeEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.key.QueueKey;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.model.DeviceLogEntity;
import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.queue.DefaultTbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.Date;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_DOWN_MSG_QUEUE;
/**
* 发送第三方服务线程
*
* @author zxfei
*/
@Slf4j
@Service
@Order(value = 3)
public class DeviceDownMsgThread extends AbstractThread {
@Autowired
private ICacheService cacheService;
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceLogService deviceLogService;
@Autowired
private MessageProducer messageProducer;
@Override
protected int getSleepTime() {
return 1000;
}
@Override
protected void process() {
log.info("update device thread process");
DeviceMsgReq deviceMsgReq = cacheService.blpop(KEY_DEVICE_DOWN_MSG_QUEUE, 10, DeviceMsgReq.class);
if (!ObjectUtils.isEmpty(deviceMsgReq)) {
log.info("下发消息:{}", deviceMsgReq.getDeviceCode());
//根据设备编码查询设备
DeviceEntity deviceEntity = null;
try {
deviceEntity = deviceService.getExtCache(deviceMsgReq.getDeviceCode());
} catch (Exception e) {
log.error("redis 获取设备异常!", e);
}
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, deviceMsgReq.getMessageType());
header.put(MessageHeader.DEVICECODE, deviceMsgReq.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, deviceMsgReq.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), deviceMsgReq.getData(), header);
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg));
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setTraceID(IdUtil.fastSimpleUUID());
deviceLogEntity.setSiteId(deviceEntity.getSiteId());
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceCode(deviceEntity.getDeviceCode());
deviceLogEntity.setMessageHead(deviceMsgReq.getMessageType());
deviceLogEntity.setContent(deviceMsgReq.getData());
deviceLogEntity.setLogType(LogTypeEnum.下发服务.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity, null);
} else {
log.info("未找到设备,deviceCode:{}", deviceMsgReq.getDeviceCode());
}
}
}
@Override
protected void threadClosed() {
// TODO Auto-generated method stub
super.close();
}
}
package com.mortals.xhx.thread;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.AbstractThread;
import com.mortals.xhx.common.key.RedisKey;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 设备消息更新线程
*
* @author zxfei
*/
@Slf4j
@Service
@Order(value = 1)
public class DeviceUpdateComsumerThread extends AbstractThread {
@Autowired
private ICacheService cacheService;
@Autowired
private DeviceService deviceService;
@Override
protected int getSleepTime() {
return 1000;
}
@Override
protected void process() {
log.info("update device thread process");
List<DeviceEntity> waitUpdateDeviceList = new ArrayList<>();
while (true) {
DeviceEntity deviceEntity = cacheService.lpop(RedisKey.KEY_DEVICE_UPDATE_QUEUE, DeviceEntity.class);
if (ObjectUtils.isEmpty(deviceEntity)) {
log.info("deviceEntity:{}", deviceEntity == null);
break;
} else {
waitUpdateDeviceList.add(deviceEntity);
}
}
if (!ObjectUtils.isEmpty(waitUpdateDeviceList)) {
log.info("updateDeviceList size:{}", waitUpdateDeviceList.size());
deviceService.update(waitUpdateDeviceList);
}
}
@Override
protected void threadClosed() {
// TODO Auto-generated method stub
super.close();
}
}
package com.mortals.xhx.thread;
import cn.hutool.core.net.url.UrlBuilder;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.AbstractThread;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.pdu.DeviceReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE;
/**
* 发送第三方服务线程
*
* @author zxfei
*/
@Slf4j
@Service
@Order(value = 2)
public class SendThirdPartyThread extends AbstractThread {
@Autowired
private ICacheService cacheService;
@Autowired
private MessageService messageService;
@Value("${thirdPartyPath:/inter/device/deviceIn}")
public String thirdPartyPath;
@Override
protected int getSleepTime() {
return 1000;
}
@Override
protected void process() {
log.info("SendThirdPartyThread process");
DeviceReq deviceReq = cacheService.blpop(KEY_DEVICE_THIRDPARTY_QUEUE, 10, DeviceReq.class);
if (!ObjectUtils.isEmpty(deviceReq)) {
String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://127.0.0.1:11078/zwfw_api");
ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq);
log.info("sendThirty resp ==>{}", JSON.toJSONString(resp));
}
}
@Override
protected void threadClosed() {
// TODO Auto-generated method stub
super.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment