Commit 28c5d1e4 authored by 赵啸非's avatar 赵啸非

修改消息组件

parent 306ae02c
...@@ -12,6 +12,8 @@ import com.rabbitmq.client.Connection; ...@@ -12,6 +12,8 @@ import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -76,10 +78,13 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue ...@@ -76,10 +78,13 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
@Override @Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
Boolean topicIfNotExist = createTopicIfNotExist(tpi); Boolean topicIfNotExist = createTopicIfNotExist(tpi);
AMQP.BasicProperties properties = new AMQP.BasicProperties(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(rabbitMqSettings.getMessageTtl()).build();
//properties.builder().expiration(rabbitMqSettings.getMessageTtl());
try { try {
if (!topicIfNotExist) { if (!topicIfNotExist) {
//topic不存在创建通道队列 //topic不存在创建通道队列
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", rabbitMqSettings.getMessageTtl());
channel.queueDeclare(tpi.getTopic(), true, false, false, null); channel.queueDeclare(tpi.getTopic(), true, false, false, null);
} }
if (!innerExists(tpi.getExchangeName(), channel)) { if (!innerExists(tpi.getExchangeName(), channel)) {
...@@ -129,7 +134,9 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue ...@@ -129,7 +134,9 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
try { try {
if (!topicIfNotExist) { if (!topicIfNotExist) {
//topic不存在创建通道队列 //topic不存在创建通道队列
channel.queueDeclare(tpi.getTopic(), true, false, false, null); Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", rabbitMqSettings.getMessageTtl());
channel.queueDeclare(tpi.getTopic(), true, false, false, args);
} }
if (!innerExists(tpi.getExchangeName(), channel)) { if (!innerExists(tpi.getExchangeName(), channel)) {
//判断交换机是否存在,如果不存在则创建后与新加入的队列绑定 //判断交换机是否存在,如果不存在则创建后与新加入的队列绑定
......
...@@ -3,6 +3,7 @@ package com.mortals.xhx.queue.rabbitmq; ...@@ -3,6 +3,7 @@ package com.mortals.xhx.queue.rabbitmq;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import lombok.Data; import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
...@@ -34,6 +35,10 @@ public class TbRabbitMqSettings { ...@@ -34,6 +35,10 @@ public class TbRabbitMqSettings {
@Value("${queue.rabbitmq.handshake_timeout:}") @Value("${queue.rabbitmq.handshake_timeout:}")
private int handshakeTimeout; private int handshakeTimeout;
@Value("${queue.rabbitmq.queue-properties.x-message-ttl:86400000}")
@Getter
private String messageTtl;
private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactory;
public void setVHost(String virtualHost) { public void setVHost(String virtualHost) {
......
...@@ -98,7 +98,8 @@ queue: ...@@ -98,7 +98,8 @@ queue:
connection_timeout: 60000 connection_timeout: 60000
handshake_timeout: 10000 handshake_timeout: 10000
queue-properties: queue-properties:
core: x-max-length-bytes:1048576000;x-message-ttl:604800000 x-message-ttl: 86400000
x-max-length-bytes: 1048576000
# token配置 # token配置
token: token:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment