Commit 20c7e30c authored by 赵啸非's avatar 赵啸非

添加设备消息接收下发功能

parent a11e2cba
......@@ -79,14 +79,13 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
Boolean topicIfNotExist = createTopicIfNotExist(tpi);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(rabbitMqSettings.getMessageTtl()).build();
//properties.builder().expiration(rabbitMqSettings.getMessageTtl());
try {
if (!topicIfNotExist) {
//topic不存在创建通道队列
Map<String, Object> args = new HashMap<>();
log.info("x-message-ttl:{}",rabbitMqSettings.getMessageTtl());
args.put("x-message-ttl", Integer.parseInt(rabbitMqSettings.getMessageTtl()));
channel.queueDeclare(tpi.getTopic(), true, false, false, null);
channel.queueDeclare(tpi.getTopic(), true, false, false, args);
}
if (!innerExists(tpi.getExchangeName(), channel)) {
//判断交换机是否存在,如果不存在则创建后与新加入的队列绑定
......@@ -94,7 +93,6 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
channel.exchangeDeclare(tpi.getExchangeName(), BuiltinExchangeType.DIRECT, true, false, null);
}
channel.queueBind(tpi.getTopic(), tpi.getExchangeName(), tpi.getTopic());
//channel.queueBind()
channel.basicPublish(tpi.getExchangeName(), tpi.getTopic(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
if (callback != null) {
callback.onSuccess(null);
......@@ -165,7 +163,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
private Boolean createTopicIfNotExist(TopicPartitionInfo tpi) {
if (topics.contains(tpi)) {
log.info("contains topc:",tpi.getTopic());
log.info("contains topc:{}",tpi.getTopic());
return true;
}
topics.add(tpi);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment