Commit 2d8d29b5 authored by 赵啸非's avatar 赵啸非

添加消息发送系统

parent fef285dd
...@@ -104,6 +104,11 @@ ...@@ -104,6 +104,11 @@
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-openfeign-core</artifactId> <artifactId>spring-cloud-openfeign-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
......
package com.mortals.xhx.queue;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 默认消息
*
* @author: zxfei
* @date: 2021/11/22 10:59
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DefaultTbQueueMsg implements TbQueueMsg {
/**
* key 唯一标识
*/
private String key;
/**
* 数据载体
*/
private String data;
/**
* 消息头信息
*/
private TbQueueMsgHeaders headers;
public DefaultTbQueueMsg(TbQueueMsg msg) {
this.key = msg.getKey();
this.data = msg.getData();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
msg.getHeaders().getData().entrySet().stream().forEach(item->
headers.put(item.getKey(),item.getValue()));
this.headers = headers;
}
public static void main(String[] args) {
}
}
package com.mortals.xhx.queue;
import lombok.Setter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 默认消息头
*
* @author: zxfei
* @date: 2021/11/22 11:14
*/
public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
@Setter
protected Map<String, String> data = new HashMap<>();
public DefaultTbQueueMsgHeaders() {
data.put(MessageHeader.TIMESTAMP, String.valueOf(new Date().getTime()));
// data.put(MessageHeader.MESSAGESIGN, new String(SecureUtil.sign(SignAlgorithm.SHA256withRSA).sign(data.get(MessageHeader.TIMESTAMP).getBytes())));
// TODO: 2022/4/15
data.put(MessageHeader.MESSAGESIGN, "abcd1234");
data.put(MessageHeader.MESSAGEPROTOCOL, "json");
// data.put(MessageHeader.TOPIC, "");
// data.put(MessageHeader.QOS, "0");
}
@Override
public void put(String key, String value) {
data.put(key, value);
}
@Override
public String get(String key) {
return data.get(key);
}
@Override
public Map<String, String> getData() {
return data;
}
}
package com.mortals.xhx.queue;
/**
* 消息头
*
* @author: zxfei
* @date: 2021/11/22 11:15
*/
public class MessageHeader {
/**
* 客户id
*/
public static final String CLIENTID = "clientId";
/**
* 协议
*/
public static final String MESSAGEPROTOCOL = "protocol";
/**
* 时间戳
*/
public static final String TIMESTAMP = "timestamp";
/**
* 消息签名
*/
public static final String MESSAGESIGN = "sign";
/**
* 消息类型
*/
public static final String MESSAGETYPE = "messageType";
public static final String DEVICECODE = "deviceCode";
/**
* topic
*/
public static final String TOPIC = "topic";
/**
* 消息等级
*/
public static final String QOS = "qos";
public static final String RETAIN = "retain";
public static final String WILL = "will";
public static final String DUP = "dup";
}
package com.mortals.xhx.queue;
/**
* 队列消息体
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsg {
String getKey();
TbQueueMsgHeaders getHeaders();
String getData();
}
package com.mortals.xhx.queue;
import java.util.Map;
/**
* 消息头信息
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsgHeaders {
void put(String key, String value);
String get(String key);
Map<String, String> getData();
void setData(Map<String, String> data);
}
package com.mortals.xhx.base.framework.config;
import com.mortals.xhx.common.key.QueueKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class RabbitConfig {
public Integer messageTtl = 86400000;
public Map<String, Object> args = new HashMap<>();
// 创建 Queue
@Bean
public Queue alarmMsgQueue() {
args.put("x-message-ttl", messageTtl);
return new Queue(QueueKey.ALARM_MSG_QUEUE, // Queue 名字
true, // durable: 是否持久化
false, // exclusive: 是否排它
false,
args); // autoDelete: 是否自动删除
}
// 创建 Direct Exchange
@Bean
public DirectExchange exchange() {
return new DirectExchange(QueueKey.DEFAULT_EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}
// 创建 Binding
@Bean
public Binding accessBinding() {
log.info("添加队列ALARM_MSG_QUEUE");
Binding binding = BindingBuilder.bind(alarmMsgQueue()).to(exchange()).with(QueueKey.ALARM_MSG_QUEUE);
return binding;
}
}
package com.mortals.xhx.common.key;
/**
* rabbit 队列key定义
*/
public class QueueKey {
public static final String ALARM_MSG_QUEUE = "ALARM_MSG_QUEUE";
public static final String DEFAULT_EXCHANGE = "amq.direct";
}
package com.mortals.xhx.thread; package com.mortals.xhx.thread;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.util.AbstractThread; import com.mortals.framework.util.AbstractThread;
import com.mortals.framework.util.ThreadPool; import com.mortals.framework.util.ThreadPool;
import com.mortals.xhx.busiz.req.SmsThirdPartyReq; import com.mortals.xhx.busiz.req.SmsThirdPartyReq;
import com.mortals.xhx.common.code.SendStatusEnum; import com.mortals.xhx.common.code.SendStatusEnum;
import com.mortals.xhx.common.code.YesNoEnum; import com.mortals.xhx.common.code.YesNoEnum;
import com.mortals.xhx.common.key.QueueKey;
import com.mortals.xhx.common.utils.SmsQueueManager; import com.mortals.xhx.common.utils.SmsQueueManager;
import com.mortals.xhx.module.message.model.MessageTaskEntity; import com.mortals.xhx.module.message.model.MessageTaskEntity;
import com.mortals.xhx.module.message.model.MessageTaskQuery; import com.mortals.xhx.module.message.model.MessageTaskQuery;
import com.mortals.xhx.module.message.service.MessageTaskService; import com.mortals.xhx.module.message.service.MessageTaskService;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.utils.SpringUtils; import com.mortals.xhx.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -75,13 +80,24 @@ public class AlarmSendMsgThread extends AbstractThread { ...@@ -75,13 +80,24 @@ public class AlarmSendMsgThread extends AbstractThread {
messageTask.setSendStatus(SendStatusEnum.发送中.getValue()); messageTask.setSendStatus(SendStatusEnum.发送中.getValue());
messageTask.setSendTime(new Date()); messageTask.setSendTime(new Date());
SmsQueueManager.offerRespQueue(messageTask); SmsQueueManager.offerRespQueue(messageTask);
}
//默认发送rabbitmq
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, "alarm");
header.put(MessageHeader.TIMESTAMP, new Date().getTime() + "");
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(),messageTask.getContent(), header);
CorrelationData correlationData = new CorrelationData(IdUtil.fastSimpleUUID());
rabbitTemplate.convertAndSend(QueueKey.DEFAULT_EXCHANGE, QueueKey.ALARM_MSG_QUEUE, JSON.toJSONString(queueMsg), correlationData);
}
/*
/* List<AlarmRecordsEntity> alarmRecordsEntities = messageTaskService.find(new AlarmRecordsQuery().push(YesNoEnum.NO.getValue())); List<AlarmRecordsEntity> alarmRecordsEntities = messageTaskService.find(new AlarmRecordsQuery().push(YesNoEnum.NO.getValue()));
for (AlarmRecordsEntity alarmRecordsEntity : alarmRecordsEntities) { for (AlarmRecordsEntity alarmRecordsEntity : alarmRecordsEntities) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders(); TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment