Commit a94e573c authored by 赵啸非's avatar 赵啸非

修改redis 过期事件通知

parent 26ab045c
...@@ -114,6 +114,8 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -114,6 +114,8 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId()); PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
ProductEntity productEntity = productService.get(deviceEntity.getProductId()); ProductEntity productEntity = productService.get(deviceEntity.getProductId());
if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) { if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) {
//注册rabbmit相关队列与绑定
registerRabbitQueue(deviceEntity, platformEntity, productEntity);
//新增设备通知第三方平台 //新增设备通知第三方平台
sendThirdParty(deviceEntity, productEntity, platformEntity, DeviceStatusEnum.ACTIVE); sendThirdParty(deviceEntity, productEntity, platformEntity, DeviceStatusEnum.ACTIVE);
} }
...@@ -137,29 +139,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -137,29 +139,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
ProductEntity productEntity = productService.get(entity.getProductId()); ProductEntity productEntity = productService.get(entity.getProductId());
if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) { if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) {
//注册rabbmit相关队列与绑定 //注册rabbmit相关队列与绑定
String exchangeName = platformEntity.getPlatformSn() + Constant.EXCHANGE_SPLIT + productEntity.getProductCode(); registerRabbitQueue(entity, platformEntity, productEntity);
TopicPartitionInfo info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.UPLOAD_TOPIC + entity.getDeviceCode()).build();
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.info("队列创建成功,设备编码:{}", entity.getDeviceCode());
}
@Override
public void onFailure(Throwable t) {
log.error("队列创建失败,设备通道编码:{},{}", entity.getDeviceCode(), t);
}
};
messageService.queueDeclare(info, callback);
//消费监听线程添加当前声明队列
Set<TopicPartitionInfo> partitions = new HashSet<>();
partitions.add(info);
consumerService.getMainConsumer().subscribe(partitions);
//创建下行队列
info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.DOWN_TOPIC + entity.getDeviceCode()).build();
messageService.queueDeclare(info, callback);
//新增设备通知第三方平台 //新增设备通知第三方平台
sendThirdParty(entity, productEntity, platformEntity, DeviceStatusEnum.ADD); sendThirdParty(entity, productEntity, platformEntity, DeviceStatusEnum.ADD);
...@@ -169,12 +149,40 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -169,12 +149,40 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
super.saveAfter(entity, context); super.saveAfter(entity, context);
} }
private void registerRabbitQueue(DeviceEntity entity, PlatformEntity platformEntity, ProductEntity productEntity) {
String exchangeName = platformEntity.getPlatformSn() + Constant.EXCHANGE_SPLIT + productEntity.getProductCode();
TopicPartitionInfo info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.UPLOAD_TOPIC + entity.getDeviceCode()).build();
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("队列创建成功,设备编码:{}", entity.getDeviceCode());
}
@Override
public void onFailure(Throwable t) {
log.error("队列创建失败,设备通道编码:{},{}", entity.getDeviceCode(), t);
}
};
messageService.queueDeclare(info, callback);
//消费监听线程添加当前声明队列
Set<TopicPartitionInfo> partitions = new HashSet<>();
partitions.add(info);
consumerService.getMainConsumer().subscribe(partitions);
//创建下行队列
info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.DOWN_TOPIC + entity.getDeviceCode()).build();
messageService.queueDeclare(info, callback);
}
@Override @Override
protected void updateAfter(DeviceEntity entity, Context context) throws AppException { protected void updateAfter(DeviceEntity entity, Context context) throws AppException {
PlatformEntity platformEntity = platformService.get(entity.getPlatformId()); PlatformEntity platformEntity = platformService.get(entity.getPlatformId());
ProductEntity productEntity = productService.get(entity.getProductId()); ProductEntity productEntity = productService.get(entity.getProductId());
if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) { if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) {
//注册rabbmit相关队列与绑定
registerRabbitQueue(entity, platformEntity, productEntity);
//新增设备通知第三方平台 //新增设备通知第三方平台
sendThirdParty(entity, productEntity, platformEntity, DeviceStatusEnum.UPDATE); sendThirdParty(entity, productEntity, platformEntity, DeviceStatusEnum.UPDATE);
} }
......
...@@ -102,25 +102,13 @@ Content-Type: application/json ...@@ -102,25 +102,13 @@ Content-Type: application/json
} }
###设备上报
POST {{baseUrl}}/api/upload
Content-Type: application/json
Authorization: Bearer {{authToken}}
{
"deviceNum": "AB:DD:DF:FD:AD:FA:DA:SS",
"action": "upload"
}
###设备上报 ###设备上报
POST {{baseUrl}}/api/upload POST {{baseUrl}}/api/upload
Content-Type: application/json Content-Type: application/json
Authorization: {{authToken}} Authorization: {{authToken}}
{ {
"deviceNum": "AB:DD:DF:FD:AD:FA:DA:bb", "deviceCode": "b12345678",
"action": "upload" "action": "upload"
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment