Commit 24448285 authored by 赵啸非's avatar 赵啸非

添加消息队列创建与删除

parent 886487d5
......@@ -19,4 +19,6 @@ public interface TbQueueProducer<T extends TbQueueMsg> {
void queueDeclare(TopicPartitionInfo tpi, TbQueueCallback callback);
void queueDel(String queue, TbQueueCallback callback);
}
......@@ -95,5 +95,10 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
// TODO: 2022/4/29 创建kafka队列
}
@Override
public void queueDel(String queue, TbQueueCallback callback) {
// TODO: 2022/5/20 删除队列
}
}
......@@ -154,6 +154,15 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
}
}
@Override
public void queueDel(String queue, TbQueueCallback callback) {
try {
channel.queueDelete(queue);
} catch (IOException e) {
log.error("Failed publish message: {}.", e);
}
}
private Boolean createTopicIfNotExist(TopicPartitionInfo tpi) {
if (topics.contains(tpi)) {
log.info("contains topc:",tpi.getTopic());
......
......@@ -27,6 +27,11 @@ public interface MessageService {
void queueDeclare(TopicPartitionInfo info, TbQueueCallback callback);
/**
* 删除队列
*/
void delQueue(String queue,TbQueueCallback callback);
/**
* 获取鉴权token
*
......
......@@ -75,6 +75,12 @@ public class MessageServiceImpl implements MessageService {
producer.queueDeclare(info, callback);
}
@Override
public void delQueue(String queue , TbQueueCallback callback) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
producer.queueDel(queue, callback);
}
@Override
public String getBasePlatformToken() {
String token = cacheService.get(Constant.BASEPLATFORM_AUTHTOKEN);
......
......@@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
......@@ -98,6 +99,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
public void onSuccess(TbQueueMsgMetadata metadata) {
log.info("队列创建成功,设备编码:{}", entity.getDeviceCode());
}
@Override
public void onFailure(Throwable t) {
log.error("队列创建失败,设备通道编码:{},{}", entity.getDeviceCode(), t);
......@@ -113,11 +115,35 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
//创建下行队列
info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.DOWN_TOPIC + entity.getDeviceCode()).build();
messageService.queueDeclare(info, callback);
}else{
} else {
throw new AppException("产品或平台不存在!");
}
super.saveAfter(entity, context);
}
@Override
protected void removeBefore(Long[] ids, Context context) throws AppException {
Arrays.asList(ids).stream().forEach(id -> {
DeviceEntity deviceEntity = this.get(id, context);
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.info("队列删除成功");
}
@Override
public void onFailure(Throwable t) {
log.error("队列删除失败", t);
}
};
messageService.delQueue(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceCode(),callback);
messageService.delQueue(Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(),callback);
});
super.removeBefore(ids, context);
}
}
\ No newline at end of file
......@@ -111,7 +111,6 @@ public class DeviceController extends BaseCRUDJsonMappingController<DeviceServic
super.init(request, response, form, model, context);
}
/**
* 下发信息
*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment