Commit 322db771 authored by 赵啸非's avatar 赵啸非

添加消息队列

parent b0315ce5
...@@ -50,6 +50,12 @@ ...@@ -50,6 +50,12 @@
<version>2.6.0</version> <version>2.6.0</version>
</dependency> </dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency> <dependency>
<groupId>cn.hutool</groupId> <groupId>cn.hutool</groupId>
......
package com.mortals.xhx.queue;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Data
@Component
public class TbQueueCoreSettings {
@Value("${queue.core.topic}")
private String topic;
@Value("${queue.core.partitions}")
private int partitions;
}
package com.mortals.xhx.queue;
public interface TbQueueMsgDecoder<T extends TbQueueMsg> {
T decode(TbQueueMsg msg);
}
...@@ -11,7 +11,7 @@ import java.io.IOException; ...@@ -11,7 +11,7 @@ import java.io.IOException;
* @author: zxfei * @author: zxfei
* @date: 2021/11/22 11:22 * @date: 2021/11/22 11:22
*/ */
public interface TbKafkaDecoder<T> { public interface TbKafkaDecoder<T extends TbQueueMsg> {
T decode(TbQueueMsg msg) throws IOException; T decode(TbQueueMsg msg) throws IOException;
......
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqConsumerTemplate;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqProducerTemplate;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqSettings;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueCoreSettings coreSettings;
public RabbitMqTbCoreQueueFactory(TbRabbitMqSettings rabbitMqSettings, TbQueueCoreSettings coreSettings) {
this.rabbitMqSettings = rabbitMqSettings;
this.coreSettings = coreSettings;
}
@Override
public TbQueueProducer<TbQueueMsg> createMsgProducer() {
return new TbRabbitMqProducerTemplate<>(rabbitMqSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
@Override
public UUID getKey() {
return msg.getKey();
}
@Override
public TbQueueMsgHeaders getHeaders() {
return msg.getHeaders();
}
@Override
public byte[] getData() {
return msg.getData();
}
});
}
}
package com.mortals.xhx.queue.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.queue.DefaultTbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgDecoder;
import com.mortals.xhx.queue.TopicPartitionInfo;
import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@Slf4j
public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
private final TbQueueMsgDecoder<T> decoder;
private final Channel channel;
private final Connection connection;
private volatile Set<String> queues;
public TbRabbitMqConsumerTemplate(TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
super(topic);
this.decoder = decoder;
try {
connection = rabbitMqSettings.getConnectionFactory().newConnection();
} catch (IOException | TimeoutException e) {
log.error("Failed to create connection." , e);
throw new RuntimeException("Failed to create connection." , e);
}
try {
channel = connection.createChannel();
} catch (IOException e) {
log.error("Failed to create chanel." , e);
throw new RuntimeException("Failed to create chanel." , e);
}
stopped = false;
}
@Override
protected List<GetResponse> doPoll(long durationInMillis) {
List<GetResponse> result = queues.stream()
.map(queue -> {
try {
return channel.basicGet(queue, false);
} catch (IOException e) {
log.error("Failed to get messages from queue: [{}]" , queue);
throw new RuntimeException("Failed to get messages from queue." , e);
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (result.size() > 0) {
return result;
} else {
return Collections.emptyList();
}
}
@Override
protected void doSubscribe(List<String> topicNames) {
queues = partitions.stream()
.map(TopicPartitionInfo::getFullTopicName)
.collect(Collectors.toSet());
//queues.forEach(admin::createTopicIfNotExists);
}
@Override
protected void doCommit() {
try {
channel.basicAck(0, true);
} catch (IOException e) {
log.error("Failed to ack messages." , e);
}
}
@Override
protected void doUnsubscribe() {
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
log.error("Failed to close the channel.");
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
log.error("Failed to close the connection.");
}
}
}
public T decode(GetResponse message) {
DefaultTbQueueMsg msg = JSON.parseObject(new String(message.getBody()), DefaultTbQueueMsg.class);
return decoder.decode(msg);
}
}
package com.mortals.xhx.queue.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.mortals.xhx.queue.*;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
@Slf4j
public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
private final String defaultTopic;
private final TbRabbitMqSettings rabbitMqSettings;
private final ListeningExecutorService producerExecutor;
private final Channel channel;
private final Connection connection;
private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
public TbRabbitMqProducerTemplate(TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
this.defaultTopic = defaultTopic;
this.rabbitMqSettings = rabbitMqSettings;
producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
try {
connection = rabbitMqSettings.getConnectionFactory().newConnection();
} catch (IOException | TimeoutException e) {
log.error("Failed to create connection." , e);
throw new RuntimeException("Failed to create connection." , e);
}
try {
channel = connection.createChannel();
} catch (IOException e) {
log.error("Failed to create chanel." , e);
throw new RuntimeException("Failed to create chanel." , e);
}
}
@Override
public void init() {
}
@Override
public String getDefaultTopic() {
return defaultTopic;
}
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
createTopicIfNotExist(tpi);
AMQP.BasicProperties properties = new AMQP.BasicProperties();
try {
channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
if (callback != null) {
callback.onSuccess(null);
}
} catch (IOException e) {
log.error("Failed publish message: [{}]." , msg, e);
if (callback != null) {
callback.onFailure(e);
}
}
}
@Override
public void stop() {
if (producerExecutor != null) {
producerExecutor.shutdownNow();
}
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
log.error("Failed to close the channel.");
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
log.error("Failed to close the connection.");
}
}
}
private void createTopicIfNotExist(TopicPartitionInfo tpi) {
if (topics.contains(tpi)) {
return;
}
// admin.createTopicIfNotExists(tpi.getFullTopicName());
topics.add(tpi);
}
}
package com.mortals.xhx.queue.rabbitmq;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
public class TbRabbitMqQueueArguments {
@Value("${queue.rabbitmq.queue-properties.core}")
private String coreProperties;
@Value("${queue.rabbitmq.queue-properties.rule-engine}")
private String ruleEngineProperties;
@Value("${queue.rabbitmq.queue-properties.transport-api}")
private String transportApiProperties;
@Value("${queue.rabbitmq.queue-properties.notifications}")
private String notificationsProperties;
@Value("${queue.rabbitmq.queue-properties.js-executor}")
private String jsExecutorProperties;
@Getter
private Map<String, Object> coreArgs;
@Getter
private Map<String, Object> ruleEngineArgs;
@Getter
private Map<String, Object> transportApiArgs;
@Getter
private Map<String, Object> notificationsArgs;
@Getter
private Map<String, Object> jsExecutorArgs;
@PostConstruct
private void init() {
coreArgs = getArgs(coreProperties);
ruleEngineArgs = getArgs(ruleEngineProperties);
transportApiArgs = getArgs(transportApiProperties);
notificationsArgs = getArgs(notificationsProperties);
jsExecutorArgs = getArgs(jsExecutorProperties);
}
private Map<String, Object> getArgs(String properties) {
Map<String, Object> configs = new HashMap<>();
for (String property : properties.split(";")) {
int delimiterPosition = property.indexOf(":");
String key = property.substring(0, delimiterPosition);
String strValue = property.substring(delimiterPosition + 1);
configs.put(key, getObjectValue(strValue));
}
return configs;
}
private Object getObjectValue(String str) {
if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) {
return Boolean.valueOf(str);
} else if (isNumeric(str)) {
return getNumericValue(str);
}
return str;
}
private Object getNumericValue(String str) {
if (str.contains(".")) {
return Double.valueOf(str);
} else {
return Long.valueOf(str);
}
}
private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?");
public boolean isNumeric(String strNum) {
if (strNum == null) {
return false;
}
return PATTERN.matcher(strNum).matches();
}
}
package com.mortals.xhx.queue.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
@Component
@Data
public class TbRabbitMqSettings {
@Value("${queue.rabbitmq.exchange_name:}")
private String exchangeName;
@Value("${queue.rabbitmq.host:}")
private String host;
@Value("${queue.rabbitmq.port:}")
private int port;
@Value("${queue.rabbitmq.virtual_host:}")
private String virtualHost;
@Value("${queue.rabbitmq.username:}")
private String username;
@Value("${queue.rabbitmq.password:}")
private String password;
@Value("${queue.rabbitmq.automatic_recovery_enabled:}")
private boolean automaticRecoveryEnabled;
@Value("${queue.rabbitmq.connection_timeout:}")
private int connectionTimeout;
@Value("${queue.rabbitmq.handshake_timeout:}")
private int handshakeTimeout;
private ConnectionFactory connectionFactory;
@PostConstruct
private void init() {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setAutomaticRecoveryEnabled(automaticRecoveryEnabled);
connectionFactory.setConnectionTimeout(connectionTimeout);
connectionFactory.setHandshakeTimeout(handshakeTimeout);
}
}
package com.mortals.xhx.daemon.applicationservice; package com.mortals.xhx.daemon.applicationservice;
import com.mortals.framework.springcloud.service.IApplicationStartedService; import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.queue.TbQueueProducer;
import com.mortals.xhx.queue.TopicPartitionInfo;
import com.mortals.xhx.queue.provider.TbQueueProducerProvider;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
...@@ -11,8 +16,19 @@ public class DemoStartedService implements IApplicationStartedService { ...@@ -11,8 +16,19 @@ public class DemoStartedService implements IApplicationStartedService {
private static Log logger = LogFactory.getLog(DemoStartedService.class); private static Log logger = LogFactory.getLog(DemoStartedService.class);
@Autowired
private TbQueueProducerProvider tbQueueProducerProvider;
@Override @Override
public void start() { public void start() {
// TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo();
// new Default
// tbQueueProducerProvider.getTbCoreMsgProducer().send(topicPartitionInfo,);
logger.info("开始服务..[配置已加载完成,并且所有框架都已经初始化]"); logger.info("开始服务..[配置已加载完成,并且所有框架都已经初始化]");
} }
......
...@@ -58,6 +58,7 @@ upload: ...@@ -58,6 +58,7 @@ upload:
queue: queue:
type: @profiles.queue.type@ # memory or kafka (Apache Kafka) or rabbitmq (RabbitMQ) type: @profiles.queue.type@ # memory or kafka (Apache Kafka) or rabbitmq (RabbitMQ)
core: core:
topic: tb_core
poll-interval: 25 poll-interval: 25
partitions: 10 partitions: 10
pack-processing-timeout: 2000 pack-processing-timeout: 2000
...@@ -81,4 +82,14 @@ queue: ...@@ -81,4 +82,14 @@ queue:
enabled: true enabled: true
print-interval-ms: 60000 print-interval-ms: 60000
kafka-response-timeout-ms: 1000 kafka-response-timeout-ms: 1000
rabbitmq:
exchange_name:
host: 192.168.0.252
port: 5672
virtual_host: /
username: taxi_mq
password: admin@2020
automatic_recovery_enabled: false
connection_timeout: 60000
handshake_timeout: 10000
...@@ -42,7 +42,8 @@ ...@@ -42,7 +42,8 @@
<profiles.redis.password></profiles.redis.password> <profiles.redis.password></profiles.redis.password>
<profiles.redis.database>1</profiles.redis.database> <profiles.redis.database>1</profiles.redis.database>
<!-- 消息队列类型,memory,kafka,rabbitMQ--> <!-- 消息队列类型,memory,kafka,rabbitMQ-->
<profiles.queue.type>kafka</profiles.queue.type> <profiles.queue.type>rabbitmq</profiles.queue.type>
<!-- <profiles.queue.type>kafka</profiles.queue.type>-->
<profiles.kafka.brokers>192.168.0.251:9092</profiles.kafka.brokers> <profiles.kafka.brokers>192.168.0.251:9092</profiles.kafka.brokers>
<profiles.filepath>/mortals/data</profiles.filepath> <profiles.filepath>/mortals/data</profiles.filepath>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment