Commit 5c6dd7d6 authored by 赵啸非's avatar 赵啸非

修改消息组件

parent a658a419
...@@ -56,30 +56,6 @@ public class ConsumerService { ...@@ -56,30 +56,6 @@ public class ConsumerService {
for (TbQueueMsg item : msgs) { for (TbQueueMsg item : msgs) {
//todo //todo
// Message innerMsg = new Message();
// TbQueueMsgHeaders headMap = item.getHeaders();
// int messageProtocl = headMap.get(MessageHeader.MESSAGEPROTOCOL)[0];
// Map<String, Object> headers = new HashMap<>();
// innerMsg.setPayload(item.getData());
// headers.put(MessageHeader.MESSAGEPROTOCOL, messageProtocl);
// innerMsg.setStoreTime(DateUtils.getCurrDate().getTime());
// if(messageProtocl== MsgTypeEnum.MSG_SYSTEM.getValue()){
// innerMsg.setHeaders(headers);
// sendSysMessage2Cluster(innerMsg);
// }else{
// String clientId = MixAll.convertByteS2Str(headMap.get(MessageHeader.CLIENTID));
// int messageType = headMap.get(MessageHeader.MESSAGETYPE)[0];
// String topic = MixAll.convertByteS2Str(headMap.get(MessageHeader.TOPIC));
// int qos = headMap.get(MessageHeader.QOS)[0];
//
// innerMsg.setClientId(clientId);
// innerMsg.setType(Message.Type.valueOf(messageType));
// headers.put(MessageHeader.TOPIC, topic);
// headers.put(MessageHeader.QOS, qos);
// headers.put(MessageHeader.RETAIN, false);
// headers.put(MessageHeader.DUP, false);
// innerMsg.setHeaders(headers);
// sendContrlMessage2Cluster(innerMsg);
// } // }
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -8,16 +8,29 @@ import java.util.Optional; ...@@ -8,16 +8,29 @@ import java.util.Optional;
@Data @Data
public class TopicPartitionInfo { public class TopicPartitionInfo {
private String topic; /**
private Integer partition; * topic名称
*/
private String fullTopicName; private String topic;
/**
* 分区,kafka存在
*/
private Integer partition;
/**
* 交换机名称,rabbmitmq存在
*/
private String exchangeName;
/**
* 带分区的topic
*/
private String fullTopicName;
@Builder @Builder
public TopicPartitionInfo(String topic, Integer partition) { public TopicPartitionInfo(String topic, Integer partition, String exchangeName) {
this.topic = topic; this.topic = topic;
this.partition = partition; this.partition = partition;
this.exchangeName = exchangeName;
String tmp = topic; String tmp = topic;
if (partition != null) { if (partition != null) {
tmp += "." + partition; tmp += "." + partition;
...@@ -26,7 +39,7 @@ public class TopicPartitionInfo { ...@@ -26,7 +39,7 @@ public class TopicPartitionInfo {
} }
public TopicPartitionInfo newByTopic(String topic) { public TopicPartitionInfo newByTopic(String topic) {
return new TopicPartitionInfo(topic, this.partition); return new TopicPartitionInfo(topic, this.partition, "");
} }
public String getTopic() { public String getTopic() {
......
package com.mortals.xhx.queue.kafka; package com.mortals.xhx.queue.kafka;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.queue.TbQueueConsumer; import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg; import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TopicPartitionInfo; import com.mortals.xhx.queue.TopicPartitionInfo;
...@@ -40,19 +41,19 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -40,19 +41,19 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override @Override
public void subscribe() { public void subscribe() {
log.info("enqueue topic subscribe {} ", topic);
if (stopped) { if (stopped) {
log.error("trying subscribe, but consumer stopped for topic {}", topic); log.error("consumer 线程已停止 topic {}", topic);
return; return;
} }
subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null))); subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null,"")));
} }
@Override @Override
public void subscribe(Set<TopicPartitionInfo> partitions) { public void subscribe(Set<TopicPartitionInfo> partitions) {
log.info("enqueue topics subscribe {} ", partitions); log.info("订阅的topics {} ", JSON.toJSONString(partitions));
if (stopped) { if (stopped) {
log.error("trying subscribe, but consumer stopped for topic {}", topic); log.error("订阅服务已停止,topic {}", topic);
return; return;
} }
subscribeQueue.add(partitions); subscribeQueue.add(partitions);
...@@ -60,7 +61,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -60,7 +61,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override @Override
public List<T> poll(long durationInMillis) { public List<T> poll(long durationInMillis) {
List<R> records; List<R> records;
long startNanos = System.nanoTime(); long startNanos = System.nanoTime();
if (stopped) { if (stopped) {
...@@ -73,7 +73,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -73,7 +73,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
if (consumerLock.isLocked()) { if (consumerLock.isLocked()) {
log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace")); log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace"));
} }
consumerLock.lock(); consumerLock.lock();
try { try {
//更新订阅的主题 //更新订阅的主题
...@@ -169,5 +168,4 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -169,5 +168,4 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
abstract protected void doUnsubscribe(); abstract protected void doUnsubscribe();
} }
...@@ -56,9 +56,8 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -56,9 +56,8 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
GetResponse getResponse = channel.basicGet(queue, true); GetResponse getResponse = channel.basicGet(queue, true);
return getResponse; return getResponse;
} catch (IOException e) { } catch (IOException e) {
// log.error("Failed to get messages from queue: [{}]" , queue); log.error("Failed to get messages from queue: [{}]" , queue,e);
return null; return null;
// throw new RuntimeException("Failed to get messages from queue." , e);
} }
}).filter(Objects::nonNull).collect(Collectors.toList()); }).filter(Objects::nonNull).collect(Collectors.toList());
if (result.size() > 0) { if (result.size() > 0) {
...@@ -73,7 +72,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -73,7 +72,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
queues = partitions.stream() queues = partitions.stream()
.map(TopicPartitionInfo::getFullTopicName) .map(TopicPartitionInfo::getFullTopicName)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
//queues.forEach(admin::createTopicIfNotExists);
} }
@Override @Override
......
...@@ -6,6 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors; ...@@ -6,6 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.mortals.xhx.queue.*; import com.mortals.xhx.queue.*;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -16,15 +17,37 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -16,15 +17,37 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
/**
* rabbmit 消息生产模板
*
* @author: zxfei
* @date: 2022/4/25 16:10
*/
@Slf4j @Slf4j
public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
/**
* defaultTopic
*/
private String defaultTopic; private String defaultTopic;
/**
* rabbmit设置
*/
private TbRabbitMqSettings rabbitMqSettings; private TbRabbitMqSettings rabbitMqSettings;
/**
* 线程执行器
*/
private ListeningExecutorService producerExecutor; private ListeningExecutorService producerExecutor;
/**
* 通道
*/
private Channel channel; private Channel channel;
/**
* 连接器
*/
private Connection connection; private Connection connection;
private final Gson gson = new Gson(); /**
* topic组
*/
private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet(); private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
public TbRabbitMqProducerTemplate(TbRabbitMqSettings rabbitMqSettings, String defaultTopic) { public TbRabbitMqProducerTemplate(TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
...@@ -35,8 +58,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue ...@@ -35,8 +58,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
connection = rabbitMqSettings.getConnectionFactory().newConnection(); connection = rabbitMqSettings.getConnectionFactory().newConnection();
channel = connection.createChannel(); channel = connection.createChannel();
} catch (IOException | TimeoutException e) { } catch (IOException | TimeoutException e) {
log.error("Failed to create connection.", e); log.error("rabbmit创建连接失败!", e);
// throw new RuntimeException("Failed to create connection." , e);
} }
} }
...@@ -55,23 +77,19 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue ...@@ -55,23 +77,19 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
Boolean topicIfNotExist = createTopicIfNotExist(tpi); Boolean topicIfNotExist = createTopicIfNotExist(tpi);
AMQP.BasicProperties properties = new AMQP.BasicProperties(); AMQP.BasicProperties properties = new AMQP.BasicProperties();
// .builder()
// .contentType("application/json")
// .deliveryMode(2) // 消息是否持久化,1未持久,2持久
// .contentEncoding("UTF-8") // 编码方式
// .expiration("100000") // 过期时间单位毫秒
// .build();
try { try {
if (!topicIfNotExist) { if (!topicIfNotExist) {
channel.queueDeclare(tpi.getTopic(), true, false, false, null); channel.queueDeclare(tpi.getTopic(), true, false, false, null);
} }
//channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes()); if (!innerExists(tpi.getExchangeName(), channel)) {
channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes()); channel = connection.createChannel();
channel.exchangeDeclare(tpi.getExchangeName(), BuiltinExchangeType.DIRECT);
}
channel.basicPublish(tpi.getExchangeName(), tpi.getFullTopicName(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
if (callback != null) { if (callback != null) {
callback.onSuccess(null); callback.onSuccess(null);
} }
} catch ( } catch (IOException e) {
IOException e) {
log.error("Failed publish message: [{}].", msg, e); log.error("Failed publish message: [{}].", msg, e);
if (callback != null) { if (callback != null) {
callback.onFailure(e); callback.onFailure(e);
...@@ -108,4 +126,15 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue ...@@ -108,4 +126,15 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
topics.add(tpi); topics.add(tpi);
return false; return false;
} }
private boolean innerExists(String exchangeName, Channel outerChannel) {
boolean result = true;
try {
outerChannel.exchangeDeclarePassive(exchangeName);
} catch (IOException e) {
result = false;
}
return result;
}
} }
...@@ -98,7 +98,7 @@ export default { ...@@ -98,7 +98,7 @@ export default {
}); });
let _this = this; let _this = this;
const getsocketData = (e) => { const getsocketData = (e) => {i
// 创建接收消息函数 // 创建接收消息函数
const data = e && e.detail.data; const data = e && e.detail.data;
......
...@@ -16,6 +16,7 @@ public class P6spySqlFormatConfig implements MessageFormattingStrategy { ...@@ -16,6 +16,7 @@ public class P6spySqlFormatConfig implements MessageFormattingStrategy {
@Override @Override
public String formatMessage(int connectionId, String now, long elapsed, String category, String prepared, String sql, String url) { public String formatMessage(int connectionId, String now, long elapsed, String category, String prepared, String sql, String url) {
return StringUtils.isNotBlank(sql) ? DateUtils.getCurrStrDateTime() return StringUtils.isNotBlank(sql) ? DateUtils.getCurrStrDateTime()
+ " | 耗时 " + elapsed + " ms | SQL:" + StringUtils.LF + sql.replaceAll("[\\s]+", StringUtils.SPACE) + ";" : ""; + " | 耗时 " + elapsed + " ms | SQL:" + StringUtils.LF + sql.replaceAll("[\\s]+", StringUtils.SPACE) + ";" : "";
} }
......
...@@ -148,7 +148,7 @@ public class DeviceApiController { ...@@ -148,7 +148,7 @@ public class DeviceApiController {
* @return * @return
*/ */
@PostMapping("upload") @PostMapping("upload")
@DeviceAuth //@DeviceAuth
public String upload(@RequestBody UploadDeviceReq req) { public String upload(@RequestBody UploadDeviceReq req) {
log.info("【设备数据上报】【请求体】--> " + JSONObject.toJSONString(req)); log.info("【设备数据上报】【请求体】--> " + JSONObject.toJSONString(req));
ApiResp<String> rsp = new ApiResp<>(); ApiResp<String> rsp = new ApiResp<>();
...@@ -159,7 +159,19 @@ public class DeviceApiController { ...@@ -159,7 +159,19 @@ public class DeviceApiController {
DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum()); DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum());
if (!ObjectUtils.isEmpty(deviceEntity)) { if (!ObjectUtils.isEmpty(deviceEntity)) {
//将上报信息转发到mq中 //将上报信息转发到mq中
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
if (ObjectUtils.isEmpty(platformEntity)) {
throw new AppException("当前设备未配置所属系统平台,请在后台配置后再激活!");
}
// authInfo.setHost(platformEntity.getPlatformSn());
ProductEntity productEntity = productService.get(deviceEntity.getProductId());
if (ObjectUtils.isEmpty(productEntity)) {
throw new AppException("当前设备未配置所属产品,请在后台配置后再激活!");
}
String exchangeName = platformEntity.getPlatformSn() + Constant.EXCHANGE_SPLIT + productEntity.getProductCode();
TopicPartitionInfo info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders(); TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_HEARTBEAT); header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_HEARTBEAT);
ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(req), null); ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(req), null);
......
...@@ -59,7 +59,7 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi ...@@ -59,7 +59,7 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
//订阅所有已几快活设备 //订阅所有已几快活设备
Set<TopicPartitionInfo> topicPartitionInfoSet = deviceService.find(new DeviceQuery().active(ActiveEnum.已激活.getValue()).status(StatusEnum.启用.getValue())).stream() Set<TopicPartitionInfo> topicPartitionInfoSet = deviceService.find(new DeviceQuery().active(ActiveEnum.已激活.getValue()).status(StatusEnum.启用.getValue())).stream()
.map(item -> .map(item ->
new TopicPartitionInfo(Constant.UPLOAD_TOPIC + item.getDeviceMac(), null) new TopicPartitionInfo(Constant.UPLOAD_TOPIC + item.getDeviceMac(), null,null)
).collect(Collectors.toSet()); ).collect(Collectors.toSet());
mainConsumer.subscribe(topicPartitionInfoSet); mainConsumer.subscribe(topicPartitionInfoSet);
log.info("消费线程订阅设备上报消息成功!:" + JSON.toJSONString(topicPartitionInfoSet)); log.info("消费线程订阅设备上报消息成功!:" + JSON.toJSONString(topicPartitionInfoSet));
......
...@@ -105,7 +105,7 @@ token: ...@@ -105,7 +105,7 @@ token:
# 令牌自定义标识 # 令牌自定义标识
header: Authorization header: Authorization
# 令牌密钥 # 令牌密钥
secret: abcd1234 secret: 026db82420614469897fcc2dc1b4ce38
# 令牌有效期(默认60分钟) # 令牌有效期(默认60分钟)
expireTime: 60 expireTime: 60
# 令牌前缀 # 令牌前缀
......
...@@ -16,4 +16,7 @@ driverlist=com.mysql.cj.jdbc.Driver ...@@ -16,4 +16,7 @@ driverlist=com.mysql.cj.jdbc.Driver
# 是否开启慢SQL记录 # 是否开启慢SQL记录
outagedetection=true outagedetection=true
# 慢SQL记录标准 秒 # 慢SQL记录标准 秒
outagedetectioninterval=2 outagedetectioninterval=2
\ No newline at end of file
filter=true
exclude=mortals_xhx_task
\ No newline at end of file
...@@ -54,7 +54,7 @@ POST {{baseUrl}}/api/active ...@@ -54,7 +54,7 @@ POST {{baseUrl}}/api/active
Content-Type: application/json Content-Type: application/json
{ {
"deviceNum":"b12345678", "deviceNum":"AB:DD:DF:FD:AD:FA:DA:SS",
"deviceMac":"AB:DD:DF:FD:AD:FA:DA:SS", "deviceMac":"AB:DD:DF:FD:AD:FA:DA:SS",
"action":"active" "action":"active"
} }
...@@ -71,7 +71,7 @@ Content-Type: application/json ...@@ -71,7 +71,7 @@ Content-Type: application/json
Authorization: Bearer {{authToken}} Authorization: Bearer {{authToken}}
{ {
"deviceNum":"a12345678", "deviceNum":"AB:DD:DF:FD:AD:FA:DA:SS",
"action":"upload" "action":"upload"
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment