Commit 0c9231d8 authored by 赵啸非's avatar 赵啸非

更新下发消息为线程池发送

parent ef269538
...@@ -5,11 +5,13 @@ import com.mortals.xhx.base.framework.listener.RabbitLoggingErrorHandler; ...@@ -5,11 +5,13 @@ import com.mortals.xhx.base.framework.listener.RabbitLoggingErrorHandler;
import com.mortals.xhx.base.framework.listener.SimpleDynamicListener; import com.mortals.xhx.base.framework.listener.SimpleDynamicListener;
import com.mortals.xhx.base.system.message.MessageCallbackService; import com.mortals.xhx.base.system.message.MessageCallbackService;
import com.mortals.xhx.base.system.message.impl.MessageProducer; import com.mortals.xhx.base.system.message.impl.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate; import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
...@@ -24,6 +26,7 @@ import org.springframework.core.annotation.Order; ...@@ -24,6 +26,7 @@ import org.springframework.core.annotation.Order;
@Configuration @Configuration
@Order(1) @Order(1)
@Slf4j
public class RabbitConfig { public class RabbitConfig {
@Autowired @Autowired
private SimpleDynamicListener simpleDynamicListener; private SimpleDynamicListener simpleDynamicListener;
...@@ -70,8 +73,29 @@ public class RabbitConfig { ...@@ -70,8 +73,29 @@ public class RabbitConfig {
@Bean @Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReturnCallback(messageCallbackService);
rabbitTemplate.setConfirmCallback(messageCallbackService); /* rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 发送消息的时候发送的业务id
log.info("发送消息id:{},ack:{},cause:{}",correlationData.getId(),ack,cause);
// 时候发送成功,即ACK
//log.info(ack);
// 发送失败的原因
// System.out.println(cause);
}
});
/* rabbitTemplate.setReturnCallback(messageCallbackService);
rabbitTemplate.setConfirmCallback(messageCallbackService);*/
return rabbitTemplate; return rabbitTemplate;
} }
......
package com.mortals.xhx.base.system.message; package com.mortals.xhx.base.system.message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
public interface RabbitMessageService { public interface RabbitMessageService {
...@@ -11,6 +13,14 @@ public interface RabbitMessageService { ...@@ -11,6 +13,14 @@ public interface RabbitMessageService {
*/ */
void sendMsg(String exchange, String routingKey, String message); void sendMsg(String exchange, String routingKey, String message);
/**
* 发送消息
*
* @author: zxfei
* @date: 2022/9/2 11:31
*/
void sendMsg(String exchange, String routingKey, String message, CorrelationData correlationData);
/** /**
* 新增队列并绑定交换机与路由 * 新增队列并绑定交换机与路由
* *
......
...@@ -59,6 +59,11 @@ public class MessageProducer implements IMessageProduceService, RabbitMessageSer ...@@ -59,6 +59,11 @@ public class MessageProducer implements IMessageProduceService, RabbitMessageSer
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
} }
@Override
public void sendMsg(String exchange, String routingKey, String message, CorrelationData correlationData) {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
@Override @Override
public void queueAddAndBinds(String exchange, String routingKey, String queue) { public void queueAddAndBinds(String exchange, String routingKey, String queue) {
Map<String, Object> args = new HashMap<>(); Map<String, Object> args = new HashMap<>();
......
...@@ -21,6 +21,7 @@ import com.mortals.xhx.queue.TbQueueMsg; ...@@ -21,6 +21,7 @@ import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders; import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.util.Comparator; import java.util.Comparator;
...@@ -69,6 +70,10 @@ public class DownMsgTask implements Runnable { ...@@ -69,6 +70,10 @@ public class DownMsgTask implements Runnable {
header.put(MessageHeader.TIMESTAMP, item.getTimestamp().toString()); header.put(MessageHeader.TIMESTAMP, item.getTimestamp().toString());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), item.getData(), header); TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), item.getData(), header);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(deviceEntity.getDeviceCode());
messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg)); messageProducer.sendMsg(QueueKey.DEFAULT_EXCHANGE, Constant.DOWN_TOPIC + deviceEntity.getDeviceCode(), JSON.toJSONString(queueMsg));
DeviceLogEntity deviceLogEntity = new DeviceLogEntity(); DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment