Commit 8a450b7c authored by 赵啸非's avatar 赵啸非

精简部分类

parent 9b2207d5
package com.mortals.xhx.base.framework; package com.mortals.xhx.base.framework;
import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonDeserializer;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
......
package com.mortals.xhx.base.framework;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import java.io.IOException;
import java.text.ParseException;
import java.util.Date;
/**
* 自定义Jackson反序列化日期类型时应用的类型转换器,一般用于@RequestBody接受参数时使用
*/
public class DateJacksonConverter extends JsonDeserializer {
private static String[] pattern = new String[]{"yyyy-MM-dd", "yyyy-MM-dd HH:mm", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm:ss.S", "yyyy.MM.dd", "yyyy.MM.dd HH:mm", "yyyy.MM.dd HH:mm:ss", "yyyy.MM.dd HH:mm:ss.S", "yyyy/MM/dd", "yyyy/MM/dd HH:mm", "yyyy/MM/dd HH:mm:ss", "yyyy/MM/dd HH:mm:ss.S"};
@Override
public Date deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException {
Date targetDate = null;
String originDate = p.getText();
if (StringUtils.isNotEmpty(originDate)) {
try {
long longDate = Long.valueOf(originDate.trim());
targetDate = new Date(longDate);
} catch (NumberFormatException e) {
try {
targetDate = DateUtils.parseDate(originDate, DateJacksonConverter.pattern);
} catch (ParseException pe) {
throw new IOException(String.format("'%s' can not convert to type 'java.util.Date',just support timestamp(type of long) and following date format(%s)",
originDate,
StringUtils.join(pattern, ",")));
}
}
}
return targetDate;
}
@Override
public Class handledType() {
return Date.class;
}
}
package com.mortals.xhx.base.framework.aspect; package com.mortals.xhx.base.framework.annotation.aspect;
import com.mortals.framework.model.OperateLogPdu; import com.mortals.framework.model.OperateLogPdu;
import com.mortals.framework.service.ILogService; import com.mortals.framework.service.ILogService;
......
package com.mortals.xhx.base.framework.aspect; package com.mortals.xhx.base.framework.annotation.aspect;
import cn.hutool.extra.servlet.ServletUtil; import cn.hutool.extra.servlet.ServletUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
......
package com.mortals.xhx.base.framework.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mortals.xhx.base.framework.DateJacksonConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperFactoryBean;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import java.util.TimeZone;
@Configuration
public class ConverterConfig {
@Bean
public DateJacksonConverter dateJacksonConverter() {
return new DateJacksonConverter();
}
@Bean
public Jackson2ObjectMapperFactoryBean jackson2ObjectMapperFactoryBean(DateJacksonConverter dateJacksonConverter) {
Jackson2ObjectMapperFactoryBean jackson2ObjectMapperFactoryBean = new Jackson2ObjectMapperFactoryBean();
jackson2ObjectMapperFactoryBean.setDeserializers(dateJacksonConverter);
return jackson2ObjectMapperFactoryBean;
}
@Bean
public MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter(ObjectMapper objectMapper) {
MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter = new MappingJackson2HttpMessageConverter();
objectMapper.setTimeZone(TimeZone.getTimeZone("GMT+8"));
mappingJackson2HttpMessageConverter.setObjectMapper(objectMapper);
return mappingJackson2HttpMessageConverter;
}
}
package com.mortals.xhx.base.framework.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.sql.SQLException;
//@Component
//@ConfigurationProperties(prefix = "spring.datasource")
public class DruidSource {
private String url;
private String username;
private String password;
private String driverClassName;
private int initialSize;
private int minIdle;
private int maxActive;
private int maxWait;
private int timeBetweenEvictionRunsMillis;
private int minEvictableIdleTimeMillis;
private String validationQuery;
private boolean testWhileIdle;
private boolean testOnBorrow;
private boolean testOnReturn;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDriverClassName() {
return driverClassName;
}
public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
}
public int getInitialSize() {
return initialSize;
}
public void setInitialSize(int initialSize) {
this.initialSize = initialSize;
}
public int getMinIdle() {
return minIdle;
}
public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}
public int getMaxActive() {
return maxActive;
}
public void setMaxActive(int maxActive) {
this.maxActive = maxActive;
}
public int getMaxWait() {
return maxWait;
}
public void setMaxWait(int maxWait) {
this.maxWait = maxWait;
}
public int getTimeBetweenEvictionRunsMillis() {
return timeBetweenEvictionRunsMillis;
}
public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}
public int getMinEvictableIdleTimeMillis() {
return minEvictableIdleTimeMillis;
}
public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}
public String getValidationQuery() {
return validationQuery;
}
public void setValidationQuery(String validationQuery) {
this.validationQuery = validationQuery;
}
public boolean isTestWhileIdle() {
return testWhileIdle;
}
public void setTestWhileIdle(boolean testWhileIdle) {
this.testWhileIdle = testWhileIdle;
}
public boolean isTestOnBorrow() {
return testOnBorrow;
}
public void setTestOnBorrow(boolean testOnBorrow) {
this.testOnBorrow = testOnBorrow;
}
public boolean isTestOnReturn() {
return testOnReturn;
}
public void setTestOnReturn(boolean testOnReturn) {
this.testOnReturn = testOnReturn;
}
@Bean //声明其为Bean实例
@Primary //在同样的DataSource中,首先使用被标注的DataSource
public DataSource dataSource() throws SQLException {
DruidDataSource datasource = new DruidDataSource();
datasource.setUrl(url);
datasource.setUsername(username);
datasource.setPassword(password);
datasource.setDriverClassName(driverClassName);
//configuration
datasource.setInitialSize(initialSize);
datasource.setMinIdle(minIdle);
datasource.setMaxActive(maxActive);
datasource.setMaxWait(maxWait);
datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
datasource.setValidationQuery(validationQuery);
datasource.setTestWhileIdle(testWhileIdle);
datasource.setTestOnBorrow(testOnBorrow);
datasource.setTestOnReturn(testOnReturn);
return datasource;
}
}
package com.mortals.xhx.base.framework.config;
import com.mortals.framework.filter.RepeatableFilter;
import com.mortals.framework.filter.XssFilter;
import com.mortals.framework.util.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import javax.servlet.DispatcherType;
import java.util.HashMap;
import java.util.Map;
/**
* Filter配置
*
* @author zxfei
*/
//@Configuration
public class FilterConfig {
@Value("${xss.enabled}")
private String enabled;
@Value("${xss.excludes}")
private String excludes;
@Value("${xss.urlPatterns}")
private String urlPatterns;
@SuppressWarnings({"rawtypes", "unchecked"})
@Bean
public FilterRegistrationBean xssFilterRegistration() {
FilterRegistrationBean registration = new FilterRegistrationBean();
registration.setDispatcherTypes(DispatcherType.REQUEST);
registration.setFilter(new XssFilter());
registration.addUrlPatterns(StringUtils.split(urlPatterns, ","));
registration.setName("xssFilter");
registration.setOrder(FilterRegistrationBean.HIGHEST_PRECEDENCE);
Map<String, String> initParameters = new HashMap<String, String>();
initParameters.put("excludes", excludes);
initParameters.put("enabled", enabled);
registration.setInitParameters(initParameters);
return registration;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Bean
public FilterRegistrationBean someFilterRegistration() {
FilterRegistrationBean registration = new FilterRegistrationBean();
registration.setFilter(new RepeatableFilter());
registration.addUrlPatterns("/*");
registration.setName("repeatableFilter");
registration.setOrder(FilterRegistrationBean.LOWEST_PRECEDENCE);
return registration;
}
}
package com.mortals.xhx.base.framework.config; package com.mortals.xhx.base.framework.config;
import java.util.HashSet; import com.mortals.framework.util.StringUtils;
import java.util.Set;
import javax.annotation.PostConstruct;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import com.mortals.framework.util.StringUtils; import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.Set;
@Configuration @Configuration
public class InterceptorConfig { public class InterceptorConfig {
......
...@@ -13,7 +13,6 @@ import javax.sql.DataSource; ...@@ -13,7 +13,6 @@ import javax.sql.DataSource;
@Configuration @Configuration
//@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class MybatisConfiguration extends AbstractMybatisConfiguration { public class MybatisConfiguration extends AbstractMybatisConfiguration {
private static Log logger = LogFactory.getLog(MybatisConfiguration.class); private static Log logger = LogFactory.getLog(MybatisConfiguration.class);
......
package com.mortals.xhx.base.framework.config;
import com.mortals.xhx.base.framework.listener.RabbitLoggingErrorHandler;
import com.mortals.xhx.base.framework.listener.SimpleDynamicListener;
import com.mortals.xhx.base.system.message.MessageCallbackService;
import com.mortals.xhx.base.system.message.impl.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
@Configuration
@Order(1)
@Slf4j
public class RabbitConfig {
@Autowired
private SimpleDynamicListener simpleDynamicListener;
@Autowired
private RabbitLoggingErrorHandler rabbitLoggingErrorHandler;
@Autowired
private MessageProducer messageProducer;
@Autowired
private MessageCallbackService messageCallbackService;
//@Bean("simpleMessageListenerContainer")
public SimpleMessageListenerContainer simpleMessageListenerContainer(CachingConnectionFactory cachingConnectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setConcurrentConsumers(10);
container.setMaxConcurrentConsumers(100);
container.setMessageListener(simpleDynamicListener);//配置队列监听器
container.start();
return container;
}
@Bean
public DirectMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//后置处理器,接收到的消息都添加头信息
container.setAfterReceivePostProcessors(message -> {
message.getMessageProperties().setContentType("application/json");
return message;
});
container.setDefaultRequeueRejected(false);
//设置异常处理
//container.setErrorHandler(rabbitLoggingErrorHandler);
// 并发消费,不使用
// container.setConsumersPerQueue(3);
// container.setMaxConcurrentConsumers(10);
// container.setMessageListener(directDynamicListener);
return container;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
/* rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});*/
/* rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(!ObjectUtils.isEmpty(correlationData)){
// 发送消息的时候发送的业务id
log.info("发送消息id:{},ack:{}",correlationData.getId(),ack);
}
}
});*/
rabbitTemplate.setReturnCallback(messageCallbackService);
/* rabbitTemplate.setReturnCallback(messageCallbackService);
rabbitTemplate.setConfirmCallback(messageCallbackService);*/
return rabbitTemplate;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
//@Bean(name = "consumerBatchContainerFactory")
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
// 创建 SimpleRabbitListenerContainerFactory 对象
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// 额外添加批量消费的属性
factory.setBatchListener(true);
factory.setBatchSize(20);
factory.setReceiveTimeout(5 * 1000L);
factory.setConsumerBatchEnabled(true);
return factory;
}
//修改系列和与反序列化转换器
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
//@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.setReceiveTimeout(10000);
return asyncRabbitTemplate;
}
}
//package com.mortals.xhx.base.framework.config;
//
//import lombok.Data;
//import lombok.extern.slf4j.Slf4j;
//import org.redisson.Redisson;
//import org.redisson.api.RedissonClient;
//import org.redisson.client.codec.Codec;
//import org.redisson.codec.JsonJacksonCodec;
//import org.redisson.config.Config;
//import org.springframework.boot.context.properties.ConfigurationProperties;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
///**
// * RedissonConfig配置
// *
// * @author: zxfei
// * @date: 2022/8/8 16:02
// */
//@Configuration
//@ConfigurationProperties(prefix = "spring.redis")
//@Slf4j
//@Data
//public class RedissonConfig {
//
// private String host;
//
// private int port;
//
// private String password;
//
// private int database;
//
// @Bean
// public RedissonClient redissonClient() {
// RedissonClient redissonClient;
// Config config = new Config();
// String url = "redis://" + host + ":" + port;
// // 单节点配置
// if("".equalsIgnoreCase(password)) password=null;
// config.useSingleServer().setAddress(url).setDatabase(database).setPassword(password);
// //使用json序列化方式
// Codec codec = new JsonJacksonCodec();
// config.setCodec(codec);
//
// // 主从配置
// /*config.useMasterSlaveServers()
// // 设置redis主节点
// .setMasterAddress("redis://192.168.1.120:6379")
// // 设置redis从节点
// .addSlaveAddress("redis://192.168.1.130:6379", "redis://192.168.1.140:6379");*/
// // 哨兵部署方式,sentinel是采用Paxos拜占庭协议,一般sentinel至少3个节点
// /*config.useSentinelServers()
// .setMasterName("my-sentinel-name")
// .addSentinelAddress("redis://192.168.1.120:6379")
// .addSentinelAddress("redis://192.168.1.130:6379")
// .addSentinelAddress("redis://192.168.1.140:6379");*/
// // 集群部署方式,cluster方式至少6个节点,3主3从,3主做sharding,3从用来保证主宕机后可以高可用
// /*config.useClusterServers()
// // 集群状态扫描间隔时间,单位是毫秒
// .setScanInterval(2000)
// .addNodeAddress("redis://192.168.1.120:6379")
// .addNodeAddress("redis://192.168.1.130:6379")
// .addNodeAddress("redis://192.168.1.140:6379")
// .addNodeAddress("redis://192.168.1.150:6379")
// .addNodeAddress("redis://192.168.1.160:6379")
// .addNodeAddress("redis://192.168.1.170:6379");*/
// // 云托管部署方式,这种方式主要解决redis提供商为云服务的提供商的redis连接,比如亚马逊云、微软云
// /*config.useReplicatedServers()
// // 主节点变化扫描间隔时间
// .setScanInterval(2000)
// .addNodeAddress("redis://192.168.1.120:6379")
// .addNodeAddress("redis://192.168.1.130:6379")
// .addNodeAddress("redis://192.168.1.140:6379");*/
// redissonClient = Redisson.create(config);
// return redissonClient;
// }
//}
\ No newline at end of file
package com.mortals.xhx.base.framework.listener;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.service.IMessageProduceService;
import com.mortals.xhx.common.pdu.DefaultQueueMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 设备上行消息处理
*
* @author: zxfei
* @date: 2022/8/23 0:32
* @description:
**/
@Slf4j
@Service
public class DirectDynamicListener implements MessageListener {
@Autowired
private ICacheService cacheService;
@Autowired
private IMessageProduceService messageProducer;
@Override
public void onMessage(Message message) {
String queue = message.getMessageProperties().getConsumerQueue();
byte[] body = message.getBody();
String data = new String(body);
// log.info("接收到:{} ,\n消息内容为:{}", queue, data);
DefaultQueueMsg queueMsg = JSON.parseObject(data, DefaultQueueMsg.class);
}
}
package com.mortals.xhx.base.system.message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public interface MessageCallbackService extends RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
}
\ No newline at end of file
package com.mortals.xhx.base.system.message;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.pdu.DeviceReq;
import com.mortals.xhx.queue.TbQueueCallback;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import com.mortals.xhx.queue.TopicPartitionInfo;
public interface MessageService {
/**
* 发送消息
*
* @param info
* @param header
* @param message
* @param callback
*/
void send(TopicPartitionInfo info, TbQueueMsgHeaders header, String message, TbQueueCallback callback);
/**
* 请求队列
*
* @param info
* @param callback
*/
void queueDeclare(TopicPartitionInfo info, TbQueueCallback callback);
/**
* 删除队列
*/
void delQueue(String queue, TbQueueCallback callback);
/**
* 获取鉴权token
*/
String getBasePlatformToken();
/**
* 获取站点树
*
* @return
*/
String siteTree();
/**
* 发送第三方平台crud消息
*
* @param sendUrl
* @param deviceReq
* @return
*/
ApiResp<String> sendThirdParty(String sendUrl, DeviceReq deviceReq);
/**
* 发送第三方平台四二班透传消息
*
* @param sendUrl
* @param content
* @return
*/
void sendThirdParty(String sendUrl, String content);
}
\ No newline at end of file
package com.mortals.xhx.base.system.message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
public interface RabbitMessageService {
/**
* 发送消息
*
* @author: zxfei
* @date: 2022/9/2 11:31
*/
void sendMsg(String exchange, String routingKey, String message);
/**
* 发送消息
*
* @author: zxfei
* @date: 2022/9/2 11:31
*/
void sendMsg(String exchange, String routingKey, String message, CorrelationData correlationData);
/**
* 新增队列并绑定交换机与路由
*
* @param exchange
* @param routingKey
* @param queue
*/
void queueAddAndBinds(String exchange, String routingKey, String queue);
/**
* 删除队列
*
* @param queue
*/
Boolean queueDelete(String queue);
}
\ No newline at end of file
package com.mortals.xhx.base.system.message.impl;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.base.system.message.MessageCallbackService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MessageCallbackServiceImpl implements MessageCallbackService {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.debug("confirm,correlationData:{},ack:{},cause:{}", JSON.toJSONString(correlationData), ack, cause);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.debug("returnedMessage,replyCode:{},replyText:{},exchange:{},routingKey:{}", replyCode, replyText, exchange, routingKey);
}
}
\ No newline at end of file
package com.mortals.xhx.base.system.message.impl;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.model.AccessLogPdu;
import com.mortals.framework.model.BizLogPdu;
import com.mortals.framework.model.ErrorLogPdu;
import com.mortals.framework.model.OperateLogPdu;
import com.mortals.framework.service.IMessageProduceService;
import com.mortals.xhx.base.system.message.RabbitMessageService;
import com.mortals.xhx.common.key.QueueKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class MessageProducer implements IMessageProduceService, RabbitMessageService, InitializingBean {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitAdmin rabbitAdmin;
public void syncAccessSend(AccessLogPdu accessLogPdu) {
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.ACCESS_LOG_QUEUE, JSON.toJSONString(accessLogPdu));
}
@Override
public void syncBizSend(BizLogPdu bizLogPdu) {
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.BIZ_LOG_QUEUE, JSON.toJSONString(bizLogPdu));
}
@Override
public void syncErrorSend(ErrorLogPdu errorLogPdu) {
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.ERROR_LOG_QUEUE, JSON.toJSONString(errorLogPdu));
}
@Override
public void syncOperSend(OperateLogPdu operLogPdu) {
log.info("send operate log ==>{}", JSON.toJSONString(operLogPdu));
String send = JSON.toJSONString(operLogPdu);
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.OPERATION_LOG_QUEUE, send);
}
@Override
public void sendMsg(String exchange, String routingKey, String message) {
CorrelationData correlationData = new CorrelationData(IdUtil.fastSimpleUUID());
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
@Override
public void sendMsg(String exchange, String routingKey, String message, CorrelationData correlationData) {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
@Override
public void queueAddAndBinds(String exchange, String routingKey, String queue) {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 86400000);
Queue rabbitQueue = new Queue(queue, true, false, false, args);
DirectExchange directExchange = new DirectExchange(exchange);
rabbitAdmin.declareQueue(rabbitQueue);
rabbitAdmin.declareBinding(
BindingBuilder.bind(rabbitQueue)
.to(directExchange)
.with(routingKey)
);
log.debug("队列创建绑定成功,queue:{}", queue);
}
@Override
public Boolean queueDelete(String queue) {
return rabbitAdmin.deleteQueue(queue);
}
@Override
public void afterPropertiesSet() throws Exception {
}
}
package com.mortals.xhx.base.system.message.impl;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.HttpUtil;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.req.ApiThirdPartyReq;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.MessageTypeEnum;
import com.mortals.xhx.common.code.YesNoEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.pdu.DeviceReq;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.Map;
import static com.mortals.framework.util.HttpUtil.HEADER_CONTENT_TYPE;
import static com.mortals.xhx.common.key.Constant.PATH_LOGIN;
import static com.mortals.xhx.common.key.Constant.PATH_SITETREE;
/**
* DeviceService
* 设备 service实现
*
* @author zxfei
* @date 2022-03-09
*/
@Service("messageService")
@Slf4j
public class MessageServiceImpl implements MessageService{
@Value("${baseplatform.httpUrl:''}")
private String httpUrl;
@Value("${baseplatform.loginName:''}")
private String loginName;
@Value("${baseplatform.password:''}")
private String password;
@Autowired
private ICacheService cacheService;
//@Autowired
private TbCoreQueueProducerProvider producerProvider;
@Override
public void send(TopicPartitionInfo info, TbQueueMsgHeaders header, String message, TbQueueCallback callback) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastSimpleUUID(), message == null ? "" : message, header);
producer.send(info, queueMsg, callback);
//rabbitTemplate.send();
}
@Override
public void queueDeclare(TopicPartitionInfo info, TbQueueCallback callback) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
producer.queueDeclare(info, callback);
}
@Override
public void delQueue(String queue, TbQueueCallback callback) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
producer.queueDel(queue, callback);
}
@Override
public String getBasePlatformToken() {
String token = cacheService.get(Constant.BASEPLATFORM_AUTHTOKEN);
if (ObjectUtils.isEmpty(token)) {
JSONObject obj = new JSONObject();
obj.put("loginName", loginName);
obj.put("password", password);
obj.put("securityCode", "8888");
String resp = null;
try {
Map<String, String> header = new HashMap<>();
header.put(HEADER_CONTENT_TYPE, "application/json");
resp = HttpUtil.doPost(httpUrl + PATH_LOGIN, header, JSON.toJSONString(obj));
JSONObject jsonObject = JSON.parseObject(resp);
Integer code = jsonObject.getInteger("code");
if (code == YesNoEnum.YES.getValue()) {
JSONObject dataObj = jsonObject.getJSONObject("data");
String authtoken = dataObj.getString("token");
cacheService.setnx(Constant.BASEPLATFORM_AUTHTOKEN, authtoken, 7 * 24 * 60);
return authtoken;
} else {
throw new AppException("异常");
}
} catch (Exception e) {
log.error("异常:", e);
}
log.info("resp:{}", resp);
}
return token;
}
@Override
public String siteTree() {
String authToken = this.getBasePlatformToken();
String resp = null;
try {
Map<String, String> header = new HashMap<>();
header.put(HEADER_CONTENT_TYPE, "application/json");
header.put("Authorization", Constant.TOKEN_PREFIX + authToken);
resp = HttpUtil.doGet(httpUrl + PATH_SITETREE, header, new HashMap<>());
JSONObject jsonObject = JSON.parseObject(resp);
Integer code = jsonObject.getInteger("code");
if (code == YesNoEnum.YES.getValue()) {
JSONObject dataObj = jsonObject.getJSONObject("data");
return resp;
} else {
throw new AppException("异常");
}
} catch (Exception e) {
log.error("异常:", e);
}
log.info("resp:{}", resp);
return resp;
}
@Override
public ApiResp<String> sendThirdParty(String sendUrl, DeviceReq deviceReq) {
ApiThirdPartyReq<DeviceReq> deviceReqApiReq = new ApiThirdPartyReq<>();
deviceReqApiReq.setCode(YesNoEnum.YES.getValue());
deviceReqApiReq.setType(MessageTypeEnum.CRUD.getValue());
deviceReqApiReq.setData(deviceReq);
String resp = null;
try {
Map<String, String> header = new HashMap<>();
header.put(HEADER_CONTENT_TYPE, "application/json");
// log.info("\n thirdPartyUrl=>{} \n reqbody=>{} \n type=>{}", sendUrl, JSON.toJSONString(deviceReqApiReq, SerializerFeature.WriteMapNullValue), DeviceMethodEnum.getByValue(deviceReq.getDeviceStatus()).getDesc());
resp = HttpUtil.doPost(sendUrl, header, JSON.toJSONString(deviceReqApiReq, SerializerFeature.WriteMapNullValue),"UTF-8", 10000);
return JSON.parseObject(resp, ApiResp.class);
} catch (Exception e) {
log.error("异常:", e);
}
return null;
}
@Override
public void sendThirdParty(String sendUrl, String content) {
}
public static void main(String[] args) {
try {
throw new Exception("1213");
} catch (Exception e) {
e.printStackTrace();
String stacktrace = ExceptionUtil.stacktraceToString(e.fillInStackTrace());
JSONObject jsonObject = new JSONObject();
jsonObject.put("stacktrace", stacktrace);
System.out.println(JSON.toJSONString(jsonObject));
}
}
}
\ No newline at end of file
package com.mortals.xhx.common.pdu;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.queue.DefaultTbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.Data;
/**
* @author: zxfei
* @date: 2022/9/2 10:27
* @description:
**/
@Data
public class DefaultQueueMsg {
/**
* 消息key
*/
private String key;
/**
* 消息体(base64编码)
*/
private String data;
private DefaultQueueMsgHeader headers;
public static void main(String[] args) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, "UPGREAD");
// header.put(MessageHeader.CLIENTID, "abcd1234");
// header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
//header.put(MessageHeader.MESSAGESIGN,"ssdafasdfasdfasfd");
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), "eyJmbG93bnVtIjoiQzEwMTEifQ==" , header);
String ret = JSON.toJSONString(queueMsg);
System.out.println("pro:"+ret);
//
DefaultQueueMsg qu = JSON.parseObject(ret, DefaultQueueMsg.class);
System.out.println("header:"+qu.getHeaders().getData().get(MessageHeader.MESSAGETYPE));
System.out.println("data:"+qu.getData());
}
}
package com.mortals.xhx.common.pdu;
import lombok.Data;
import java.util.Map;
/**
* @author: zxfei
* @date: 2022/9/2 10:30
* @description:
**/
@Data
public class DefaultQueueMsgHeader {
/**
* 消息头
*/
private Map<String, String> data;
}
package com.mortals.xhx.daemon.applicationservice;
import cn.hutool.core.net.url.UrlBuilder;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.pdu.DeviceReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.ObjectUtils;
import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL;
import static com.mortals.xhx.common.key.Constant.SEND_INTEVEL;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE;
//@Component
@Slf4j
public class DeviceSendThirdPartyService implements IApplicationStartedService {
protected Boolean stopped = false;
@Autowired
private ICacheService cacheService;
@Autowired
private MessageService messageService;
@Value("${thirdPartyPath:/inter/device/deviceIn}")
public String thirdPartyPath;
@Override
public void start() {
log.info("初始化发送线程数量");
Thread sendThread = new Thread(new Runnable() {
@Override
public void run() {
int waitTime = SEND_INTEVEL;
while (!stopped) {
try {
DeviceReq deviceReq = cacheService.blpop(KEY_DEVICE_THIRDPARTY_QUEUE,10, DeviceReq.class);
if (!ObjectUtils.isEmpty(deviceReq)) {
String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://172.15.28.116:8090");
ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq);
log.info("sendThirty resp ==>{}", JSON.toJSONString(resp));
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e2) {
}
} catch (Exception e) {
log.error("异常", e);
try {
Thread.sleep(waitTime);
} catch (InterruptedException e2) {
}
}
}
}
});
sendThread.start();
}
@Override
public void stop() {
log.info("停止服务..");
this.stopped = true;
}
@Override
public int getOrder() {
return 50;
}
}
package com.mortals.xhx.common.model;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.SignAlgorithm;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.common.code.MessageProtocolEnum;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.Setter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 默认消息头
*
* @author: zxfei
* @date: 2021/11/22 11:14
*/
public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
@Setter
protected Map<String, String> data = new HashMap<>();
public DefaultTbQueueMsgHeaders() {
data.put(MessageHeader.TIMESTAMP, String.valueOf(new Date().getTime()));
// data.put(MessageHeader.MESSAGESIGN, new String(SecureUtil.sign(SignAlgorithm.SHA256withRSA).sign(data.get(MessageHeader.TIMESTAMP).getBytes())));
// TODO: 2022/4/15
data.put(MessageHeader.MESSAGESIGN, "abcd1234");
data.put(MessageHeader.MESSAGEPROTOCOL, MessageProtocolEnum.JSON.getValue());
// data.put(MessageHeader.TOPIC, "");
// data.put(MessageHeader.QOS, "0");
}
@Override
public void put(String key, String value) {
data.put(key, value);
}
@Override
public String get(String key) {
return data.get(key);
}
@Override
public Map<String, String> getData() {
return data;
}
}
package com.mortals.xhx.queue;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.extern.apachecommons.CommonsLog;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 服务端消费消息服务
*
* @author: zxfei
* @date: 2021/11/22 11:31
*/
@CommonsLog
public class ConsumerService {
private long pollDuration;
protected volatile ExecutorService consumersExecutor;
@Getter
private TbQueueConsumer<TbQueueMsg> mainConsumer;
private String currentIp;
protected volatile boolean stopped = false;
public void init(TbQueueConsumer<TbQueueMsg> mainConsumer) {
this.consumersExecutor = Executors.newCachedThreadPool();
this.mainConsumer = mainConsumer;
launchMainConsumers();
this.mainConsumer.subscribe();
}
public ConsumerService(long pollDuration, String currentIp) {
this.pollDuration = pollDuration;
this.currentIp = currentIp;
}
/**
* 消费服务主线程
*/
protected void launchMainConsumers() {
consumersExecutor.submit(() -> {
while (!stopped) {
try {
//todo
List<TbQueueMsg> poll = mainConsumer.poll(pollDuration);
List<TbQueueMsg> msgs = poll;
if (msgs.isEmpty()) {
continue;
}
for (TbQueueMsg item : msgs) {
//todo
// }
}
} catch (Exception e) {
log.error("Exception", e);
}
}
log.info("Queue Consumer stopped.");
});
}
public void destroy() {
if (!stopped) {
stopMainConsumers();
}
stopped = true;
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
}
protected void stopMainConsumers() {
if (mainConsumer != null) {
mainConsumer.unsubscribe();
}
}
}
package com.mortals.xhx.queue;
import com.mortals.xhx.queue.processing.AbstractConsumerService;
import com.mortals.xhx.queue.provider.TbCoreQueueFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
//@Service
@Slf4j
public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueueMsg> implements TbCoreConsumerService {
@Value("${queue.core.poll-interval}")
private long pollDuration;//队列拉取时间间隔,单位毫秒
@Value("${queue.core.pack-processing-timeout}")
private long packProcessingTimeout;
@Getter
private LinkedBlockingQueue<TbQueueMsg> comsureQueue = new LinkedBlockingQueue<>();
@Getter
private TbQueueConsumer<TbQueueMsg> mainConsumer;
@Getter
private List<TbQueueConsumer<TbQueueMsg>> consumerList;
/**
* 根据配置文件动态加载kafka,rabbmitMq等工厂类
* @param tbCoreQueueFactory
*/
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory) {
//通过工厂类创建通道
this.mainConsumer = tbCoreQueueFactory.createMsgConsumer();
}
@PostConstruct
public void init() {
log.info("初始化消费服务线程");
super.init("core-consumer");
}
@PreDestroy
public void destroy() {
super.destroy();
}
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
}
@Override
protected void launchMainConsumers() {
log.info("启动消费线程!");
consumersExecutor.submit(() -> {
while (!stopped) {
try {
List<TbQueueMsg> msgs = mainConsumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
for (TbQueueMsg item : msgs) {
comsureQueue.offer(item);
}
//mainConsumer.commit();
Thread.sleep(200);
} catch (Exception e) {
if (!stopped) {
// log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
// log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
log.info(" Core Consumer stopped.");
});
}
@Override
protected void stopMainConsumers() {
if (mainConsumer != null) {
mainConsumer.unsubscribe();
}
}
@Override
protected void launchConsumersList() {
log.info("启动消费线程组!");
consumerList.stream().forEach(consumer -> {
log.info("channel number:{}"+consumer.getChannelNumber());
consumersExecutor.submit(() -> {
while (!stopped) {
try {
List<TbQueueMsg> msgs = consumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
for (TbQueueMsg item : msgs) {
comsureQueue.offer(item);
}
// consumer.commit();
Thread.sleep(200);
} catch (Exception e) {
if (!stopped) {
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
});
});
}
}
package com.mortals.xhx.queue;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.UUID;
/**
* 默认消息
*
* @author: zxfei
* @date: 2021/11/22 10:59
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DefaultTbQueueMsg implements TbQueueMsg {
/**
* key 唯一标识
*/
private String key;
/**
* 数据载体
*/
private String data;
/**
* 消息头信息
*/
private TbQueueMsgHeaders headers;
public DefaultTbQueueMsg(TbQueueMsg msg) {
this.key = msg.getKey();
this.data = msg.getData();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
msg.getHeaders().getData().entrySet().stream().forEach(item->
headers.put(item.getKey(),item.getValue()));
this.headers = headers;
}
public static void main(String[] args) {
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, "UPGREAD");
// header.put(MessageHeader.CLIENTID, "abcd1234");
// header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
//header.put(MessageHeader.MESSAGESIGN,"ssdafasdfasdfasfd");
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), "eyJmbG93bnVtIjoiQzEwMTEifQ==" , header);
String ret = JSON.toJSONString(queueMsg);
System.out.println("pro:"+ret);
//
DefaultTbQueueMsg qu = JSON.parseObject(ret, DefaultTbQueueMsg.class);
System.out.println("header:"+qu.getHeaders().get(MessageHeader.MESSAGETYPE));
System.out.println("data:"+qu.getData());
}
}
package com.mortals.xhx.queue;
import com.mortals.xhx.common.BrokerConfig;
/**
* 消息队列工厂类,初始化MQ类型(kafka,rabbitMQ,memory)
*
* @author: zxfei
* @date: 2021/11/22 10:57
*/
public interface MessageQueueFactory {
/**
* 创建消息生产者
* @return
*/
TbQueueProducer<TbQueueMsg> createMsgProducer(BrokerConfig brokerConfig);
/**
* 创建消息消费者
* @return
*/
TbQueueConsumer<TbQueueMsg> createMsgConsumer(BrokerConfig brokerConfig);
}
package com.mortals.xhx.queue;
import org.springframework.context.ApplicationListener;
public interface TbCoreConsumerService extends ApplicationListener {
}
package com.mortals.xhx.queue;
/**
* 队列回调消息
*
* @author: zxfei
* @date: 2021/11/22 10:57
*/
public interface TbQueueCallback {
void onSuccess(TbQueueMsgMetadata metadata);
void onFailure(Throwable t);
}
package com.mortals.xhx.queue;
import java.util.List;
import java.util.Set;
/**
* 队列消息消费者接口
*
* @author: zxfei
* @date: 2021/11/22 10:57
*/
public interface TbQueueConsumer<T extends TbQueueMsg> {
/**
* 获取当topic
* @return
*/
String getTopic();
/**
* 订阅
*/
void subscribe();
/**
* 订阅(分区)
* @param partitions
*/
void subscribe(Set<TopicPartitionInfo> partitions);
/**
* 取消订阅
*/
void unsubscribe();
/**
* 取消订阅消息
* @param partitions
*/
void unsubscribe(Set<TopicPartitionInfo> partitions);
/**
* 拉取消息间隔
* @param durationInMillis
* @return
*/
List<T> poll(long durationInMillis);
/**
* 提交
*/
void commit();
/**
* 通道
* @return
*/
String getChannelNumber();
}
package com.mortals.xhx.queue;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
@Data
//@Component
public class TbQueueCoreSettings {
@Value("${queue.core.topic}")
private String topic;
@Value("${queue.core.partitions}")
private int partitions;
}
package com.mortals.xhx.queue;
import java.util.UUID;
/**
* 队列消息体
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsg {
String getKey();
TbQueueMsgHeaders getHeaders();
String getData();
}
package com.mortals.xhx.queue;
public interface TbQueueMsgDecoder<T extends TbQueueMsg> {
T decode(TbQueueMsg msg);
}
package com.mortals.xhx.queue;
import java.util.Map;
/**
* 消息头信息
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsgHeaders {
void put(String key, String value);
String get(String key);
Map<String, String> getData();
void setData(Map<String, String> data);
}
package com.mortals.xhx.queue;
/**
* 队列消息元数据
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsgMetadata {
String getMessageId();
}
package com.mortals.xhx.queue;
/**
* 队列消息生产者
*
* @author: zxfei
* @date: 2021/11/22 10:55
*/
public interface TbQueueProducer<T extends TbQueueMsg> {
void init();
String getDefaultTopic();
//发送消息
void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback);
void stop();
void queueDeclare(TopicPartitionInfo tpi, TbQueueCallback callback);
void queueDel(String queue, TbQueueCallback callback);
}
package com.mortals.xhx.queue;
import lombok.Builder;
import lombok.Data;
import java.util.Objects;
import java.util.Optional;
@Data
public class TopicPartitionInfo {
/**
* topic名称
*/
private String topic;
/**
* 分区,kafka存在
*/
private Integer partition;
/**
* 交换机名称,rabbmitmq存在
*/
private String exchangeName;
/**
* 带分区的topic
*/
private String fullTopicName;
@Builder
public TopicPartitionInfo(String topic, Integer partition, String exchangeName) {
this.topic = topic;
this.partition = partition;
this.exchangeName = exchangeName;
String tmp = topic;
if (partition != null) {
tmp += "." + partition;
}
this.fullTopicName = tmp;
}
public TopicPartitionInfo newByTopic(String topic) {
return new TopicPartitionInfo(topic, this.partition, "");
}
public String getTopic() {
return topic;
}
public Optional<Integer> getPartition() {
return Optional.ofNullable(partition);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicPartitionInfo that = (TopicPartitionInfo) o;
return topic.equals(that.topic) &&
Objects.equals(partition, that.partition) &&
fullTopicName.equals(that.fullTopicName);
}
@Override
public int hashCode() {
return Objects.hash(fullTopicName);
}
}
package com.mortals.xhx.queue.kafka;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TopicPartitionInfo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
/**
* 抽象队列消费者模板
*
* @author: zxfei
* @date: 2021/11/22 11:12
*/
@Slf4j
public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
private volatile boolean subscribed;
protected volatile boolean stopped = false;
protected volatile Set<TopicPartitionInfo> partitions;
protected final ReentrantLock consumerLock = new ReentrantLock();
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
@Getter
private final String topic;
public AbstractTbQueueConsumerTemplate(String topic) {
this.topic = topic;
}
@Override
public void subscribe() {
if (stopped) {
log.error("consumer 线程已停止 topic {}", topic);
return;
}
subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null,"")));
}
@Override
public void subscribe(Set<TopicPartitionInfo> partitions) {
// log.info("订阅的topics {} ", JSON.toJSONString(partitions));
if (stopped) {
log.error("订阅服务已停止,topic {}", topic);
return;
}
subscribeQueue.add(partitions);
}
@Override
public List<T> poll(long durationInMillis) {
List<R> records;
long startNanos = System.nanoTime();
if (stopped) {
return errorAndReturnEmpty();
}
if (!subscribed && partitions == null && subscribeQueue.isEmpty()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
if (consumerLock.isLocked()) {
log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace"));
}
consumerLock.lock();
try {
//更新订阅的主题
while (!subscribeQueue.isEmpty()) {
subscribed = false;
partitions = subscribeQueue.poll();
}
//新增新的订阅信息
if (!subscribed) {
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
//新加订阅项
doSubscribe(topicNames);
subscribed = true;
}
records = partitions.isEmpty() ? emptyList() : doPoll(durationInMillis);
} finally {
consumerLock.unlock();
}
if (records.isEmpty()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
return decodeRecords(records);
}
List<T> decodeRecords(List<R> records) {
List<T> result =records.stream().map(record->decode(record)).collect(Collectors.toList());
return result;
}
List<T> errorAndReturnEmpty() {
log.error("poll invoked but consumer stopped for topic:" + topic, new RuntimeException("stacktrace"));
return emptyList();
}
List<T> sleepAndReturnEmpty(final long startNanos, final long durationInMillis) {
long durationNanos = TimeUnit.MILLISECONDS.toNanos(durationInMillis);
long spentNanos = System.nanoTime() - startNanos;
if (spentNanos < durationNanos) {
try {
Thread.sleep(Math.max(TimeUnit.NANOSECONDS.toMillis(durationNanos - spentNanos), 1));
} catch (InterruptedException e) {
if (!stopped) {
log.error("Failed to wait", e);
}
}
}
return emptyList();
}
@Override
public void commit() {
if (consumerLock.isLocked()) {
log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace"));
}
consumerLock.lock();
try {
doCommit();
} finally {
consumerLock.unlock();
}
}
@Override
public void unsubscribe() {
log.info("unsubscribe topic and stop consumer {}", getTopic());
stopped = true;
consumerLock.lock();
try {
doUnsubscribe();
} finally {
consumerLock.unlock();
}
}
abstract protected List<R> doPoll(long durationInMillis);
abstract protected T decode(R record);
abstract protected void doSubscribe(List<String> topicNames);
abstract protected void doCommit();
abstract protected void doUnsubscribe();
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.UUID;
public class KafkaTbQueueMsg implements TbQueueMsg {
private final String key;
private final TbQueueMsgHeaders headers;
private final byte[] data;
public KafkaTbQueueMsg(ConsumerRecord<String, byte[]> record) {
this.key = record.key();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
record.headers().forEach(header -> {
headers.put(header.key(), new String(header.value()));
});
this.headers = headers;
this.data = record.value();
}
@Override
public String getKey() {
return key;
}
@Override
public TbQueueMsgHeaders getHeaders() {
return headers;
}
@Override
public String getData() {
return data.toString();
}
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueMsgMetadata;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 队列元数据
*
* @author: zxfei
* @date: 2021/11/22 14:40
*/
@Data
@AllArgsConstructor
public class KafkaTbQueueMsgMetadata implements TbQueueMsgMetadata {
private RecordMetadata metadata;
private String messageId;
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TopicPartitionInfo;
import lombok.Builder;
import lombok.extern.apachecommons.CommonsLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
/**
* kafka consumer 消费者模板
*
* @author: zxfei
* @date: 2021/11/22 11:21
*/
@Slf4j
public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
private final KafkaConsumer<String, byte[]> consumer;
private final String groupId;
private final TbKafkaDecoder<T> decoder;
@Builder
private TbKafkaConsumerTemplate(TbKafkaSettings settings, String clientId, TbKafkaDecoder<T> decoder, String groupId, String topic) {
//默认topic
super(topic);
Properties props = settings.toConsumerProps();
//多个输入源的时候 需要配置
//props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
if (groupId != null) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
this.groupId = groupId;
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
}
@Override
protected void doSubscribe(List<String> topicNames) {
if (!topicNames.isEmpty()) {
// topicNames.forEach(admin::createTopicIfNotExists);
log.info("subscribe topics {}", topicNames);
consumer.subscribe(topicNames);
} else {
log.info("unsubscribe due to empty topic list");
consumer.unsubscribe();
}
}
@Override
protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
if (records.isEmpty()) {
return Collections.emptyList();
} else {
List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
records.forEach(recordList::add);
return recordList;
}
}
@Override
public T decode(ConsumerRecord<String, byte[]> record) {
return decoder.decode(new KafkaTbQueueMsg(record));
}
@Override
protected void doCommit() {
//同步提交,线程会阻塞,直到当前批次offset提交成功
consumer.commitAsync();
}
@Override
protected void doUnsubscribe() {
log.info("unsubscribe topic and close consumer for topic {}", getTopic());
if (consumer != null) {
//consumer.unsubscribe();
consumer.close();
}
}
public static void main(String[] args) {
// TbKafkaConsumerTemplate.builder().
}
@Override
public void unsubscribe(Set<TopicPartitionInfo> partitions) {
}
@Override
public String getChannelNumber() {
return "1231";
}
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueMsg;
import java.io.IOException;
/**
* 队列消息编码
*
* @author: zxfei
* @date: 2021/11/22 11:22
*/
public interface TbKafkaDecoder<T extends TbQueueMsg> {
T decode(TbQueueMsg msg);
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueCallback;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
import com.mortals.xhx.queue.TopicPartitionInfo;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* kafka 生产者模板
*
* @author: zxfei
* @date: 2021/11/22 11:23
*/
@Data
@Slf4j
public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
/**
* 生产者
*/
private KafkaProducer<String, byte[]> producer;
/**
* kafka 配置信息
*/
private TbKafkaSettings settings;
private String defaultTopic;
/**
* topic组
*/
private Set<TopicPartitionInfo> topics;
@Builder
private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic) {
this.settings = settings;
//初始化生产者参数
this.producer = new KafkaProducer<>(settings.toProducerProps());
this.defaultTopic = defaultTopic;
topics = ConcurrentHashMap.newKeySet();
}
@Override
public void init() {
}
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
String key = msg.getKey().toString();
byte[] data = msg.getData().getBytes();
ProducerRecord<String, byte[]> record;
if (tpi.getTopic() == null) {
tpi.setTopic(this.defaultTopic);
}
Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue().getBytes())).collect(Collectors.toList());
record = new ProducerRecord<>(tpi.getTopic(), null, key, data, headers);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
if (callback != null) {
callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata,key));
}
} else {
if (callback != null) {
callback.onFailure(exception);
} else {
log.warn("Producer template failure: {}", exception.getMessage(), exception);
}
}
});
}
@Override
public void stop() {
if (producer != null) {
producer.close();
}
}
@Override
public void queueDeclare(TopicPartitionInfo tpi, TbQueueCallback callback) {
// TODO: 2022/4/29 创建kafka队列
}
@Override
public void queueDel(String queue, TbQueueCallback callback) {
// TODO: 2022/5/20 删除队列
}
}
package com.mortals.xhx.queue.kafka;
import lombok.Data;
/**
* 其它配置类
*
* @author: zxfei
* @date: 2021/11/22 13:31
*/
@Data
public class TbKafkaProperty {
private String key;
private String value;
}
package com.mortals.xhx.queue.kafka;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Properties;
/**
* kafka 配置类
*
* @author: zxfei
* @date: 2021/11/22 13:30
*/
@Slf4j
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@ConfigurationProperties(prefix = "queue.kafka")
@Component
public class TbKafkaSettings {
@Value("${queue.kafka.bootstrap.servers}")
private String servers;
@Value("${queue.kafka.acks}")
private String acks;
@Value("${queue.kafka.retries}")
private int retries;
@Value("${queue.kafka.batch.size}")
private int batchSize;
@Value("${queue.kafka.linger.ms}")
private long lingerMs;
@Value("${queue.kafka.buffer.memory}")
private long bufferMemory;
@Value("${queue.kafka.replication_factor}")
@Getter
private short replicationFactor;
@Value("${queue.kafka.max_poll_records:8192}")
private int maxPollRecords;
@Value("${queue.kafka.max_poll_interval_ms:300000}")
private int maxPollIntervalMs;
@Value("${queue.kafka.max_partition_fetch_bytes:16777216}")
private int maxPartitionFetchBytes;
@Value("${queue.kafka.fetch_max_bytes:134217728}")
private int fetchMaxBytes;
@Setter
private List<TbKafkaProperty> other;
/**
* 管理端参数配置
* @return
*/
public Properties toAdminProps() {
Properties props = toProps();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(AdminClientConfig.RETRIES_CONFIG, retries);
return props;
}
/**
* 消费者参数
*
* @return
*/
public Properties toConsumerProps() {
Properties props = toProps();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return props;
}
/**
* 生产者参数
*
* @return
*/
public Properties toProducerProps() {
Properties props = toProps();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return props;
}
private Properties toProps() {
Properties props = new Properties();
//添加其它参数
if (other != null) {
other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
}
return props;
}
}
package com.mortals.xhx.queue.processing;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.utils.IotThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 消费服务抽象类
*
* @author: zxfei
* @date: 2022/4/28 9:43
*/
@Slf4j
public abstract class AbstractConsumerService<N extends TbQueueMsg> {
/**
* 启动消费主线程服务
*/
protected abstract void launchMainConsumers();
/**
* 停止消费主线程服务
*/
protected abstract void stopMainConsumers();
/**
* 启动一组消费线程
*/
protected abstract void launchConsumersList();
/**
* 消息线程池
*/
protected volatile ExecutorService consumersExecutor;
/**
* 变量
*/
protected volatile boolean stopped = false;
public void init(String mainConsumerThreadName) {
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName(mainConsumerThreadName));
launchMainConsumers();
}
@PreDestroy
public void destroy() {
stopped = true;
stopMainConsumers();
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
import com.mortals.xhx.queue.kafka.TbKafkaConsumerTemplate;
import com.mortals.xhx.queue.kafka.TbKafkaProducerTemplate;
import com.mortals.xhx.queue.kafka.TbKafkaSettings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* kafka 消息工厂类
*
* @author: zxfei
* @date: 2021/11/22 15:00
*/
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
@Autowired
private TbKafkaSettings kafkaSettings;
/**
* 初始化创建消息生产者
*
* @return
*/
@Override
public TbQueueProducer<TbQueueMsg> createMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbQueueMsg> builder = TbKafkaProducerTemplate.builder();
builder.settings(kafkaSettings);
return builder.build();
}
/**
* 初始化创建消息消费者
*
* @return
*/
@Override
public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbQueueMsg> comsumerBuilder = TbKafkaConsumerTemplate.builder();
comsumerBuilder.settings(kafkaSettings);
return comsumerBuilder.build();
}
@Override
public List<TbQueueConsumer<TbQueueMsg>> createListMsgConsumer(List<String> vhosts) {
return null;
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqConsumerTemplate;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqProducerTemplate;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqSettings;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
//@Component
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueCoreSettings coreSettings;
public RabbitMqTbCoreQueueFactory(TbRabbitMqSettings rabbitMqSettings, TbQueueCoreSettings coreSettings) {
this.rabbitMqSettings = rabbitMqSettings;
this.coreSettings = coreSettings;
}
@Override
public TbQueueProducer<TbQueueMsg> createMsgProducer() {
return new TbRabbitMqProducerTemplate<>(rabbitMqSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
@Override
public String getKey() {
return msg.getKey();
}
@Override
public TbQueueMsgHeaders getHeaders() {
return msg.getHeaders();
}
@Override
public String getData() {
return msg.getData();
}
});
}
@Override
public List<TbQueueConsumer<TbQueueMsg>> createListMsgConsumer(List<String> vhosts) {
List<TbQueueConsumer<TbQueueMsg>> list =vhosts.stream().map(vhost->{
rabbitMqSettings.setVHost(vhost);
return new TbRabbitMqConsumerTemplate<TbQueueMsg>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
@Override
public String getKey() {
return msg.getKey();
}
@Override
public TbQueueMsgHeaders getHeaders() {
return msg.getHeaders();
}
@Override
public String getData() {
return msg.getData();
}
});
}).collect(Collectors.toList());
return list;
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
import java.util.List;
public interface TbCoreQueueFactory {
/**
* 消息生产者
* @return
*/
TbQueueProducer<TbQueueMsg> createMsgProducer();
/**
* 消息消费服务
* @return
*/
TbQueueConsumer<TbQueueMsg> createMsgConsumer();
/* *//**
* 消息消费服务
* @return
*//*
TbQueueConsumer<TbQueueMsg> createMsgConsumer();*/
/**
*
* @return
*/
List<TbQueueConsumer<TbQueueMsg>> createListMsgConsumer(List<String> vhosts);
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* 初始化消息生产者服务
*/
@CommonsLog
//@Service
public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
/**
* 消息队列提供
*/
@Autowired
private TbCoreQueueFactory tbQueueProvider;
/**
* 消息队列生产者
*/
private TbQueueProducer<TbQueueMsg> queueProducer;
//@PostConstruct
public void init() {
log.info("消息队列生产服务开始...");
this.queueProducer = tbQueueProvider.createMsgProducer();
}
@Override
public TbQueueProducer<TbQueueMsg> getTbCoreMsgProducer() {
return queueProducer;
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
/**
* 消息队列提供接口
*
* @author: zxfei
* @date: 2021/11/22 14:59
*/
public interface TbQueueProducerProvider {
/**
* 消息生产者
* @return
*/
TbQueueProducer<TbQueueMsg> getTbCoreMsgProducer();
}
package com.mortals.xhx.queue.rabbitmq;
import com.mortals.xhx.queue.TbQueueMsgMetadata;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 队列元数据
*
* @author: zxfei
* @date: 2021/11/22 14:40
*/
@Data
@AllArgsConstructor
public class RabbitQueueMsgMetadata implements TbQueueMsgMetadata {
private String messageId;
}
package com.mortals.xhx.queue.rabbitmq;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.util.JsonUtil;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate;
import com.rabbitmq.client.*;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* 消费连接池
*
* @author: zxfei
* @date: 2022/4/25 13:49
*/
@Slf4j
public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
private final TbQueueMsgDecoder<T> decoder;
@Getter
private Channel channel;
private Connection connection;
private TbRabbitMqSettings rabbitMqSettings;
private volatile Set<String> queues=new HashSet<>();
public TbRabbitMqConsumerTemplate(TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
super(topic);
this.decoder = decoder;
this.rabbitMqSettings = rabbitMqSettings;
try {
connection = rabbitMqSettings.getConnectionFactory().newConnection();
channel = connection.createChannel();
log.info("channelNumber:" + channel.getChannelNumber());
} catch (IOException | TimeoutException e) {
log.error("Failed to create connection.", e);
}
stopped = false;
}
@Override
protected List<GetResponse> doPoll(long durationInMillis) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<GetResponse> result = queues.stream()
.map(queue -> {
try {
GetResponse getResponse = channel.basicGet(queue, true);
return getResponse;
} catch (IOException e) {
log.error("Failed to get messages from queue: {},{}", queue, e);
try {
channel = connection.createChannel();
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", Integer.parseInt(rabbitMqSettings.getMessageTtl()));
channel.queueDeclare(queue, true, false, false, args);
} catch (IOException ioException) {
ioException.printStackTrace();
}
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (result.size() > 0) {
return result;
} else {
return Collections.emptyList();
}
}
@Override
protected void doSubscribe(List<String> topicNames) {
//新增的topkcnames
topicNames.stream().forEach(topic->{
queues.add(topic);
});
log.info("doSubscribe:{}", JSON.toJSONString(queues));
}
@Override
protected void doCommit() {
try {
channel.basicAck(0, true);
} catch (IOException e) {
log.error("Failed to ack messages.", e);
}
}
@Override
protected void doUnsubscribe() {
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
log.error("Failed to close the channel.");
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
log.error("Failed to close the connection.");
}
}
}
@Override
public void unsubscribe(Set<TopicPartitionInfo> partitions) {
Set<String> collect = partitions.stream().map(item -> item.getFullTopicName()).collect(Collectors.toSet());
queues=queues.stream().filter(f->collect.contains(f)).collect(Collectors.toSet());
}
@Override
public String getChannelNumber() {
return channel.getChannelNumber() + "";
}
public T decode(GetResponse message) {
try {
DefaultTbQueueMsg msg = new DefaultTbQueueMsg();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
String messageBody = new String(message.getBody());
if(JSONUtil.isJson(messageBody)){
Map<String, Object> map = JSON.parseObject(messageBody, HashMap.class);
msg.setKey((String) map.get("key"));
String payloadStr = (String) map.get("data");
msg.setData(payloadStr);
String headerStr = ((JSONObject) map.get("headers")).getString("data");
HashMap<String, String> hashMap = JSON.parseObject(headerStr, HashMap.class);
headers.setData(hashMap);
msg.setHeaders(headers);
return decoder.decode(msg);
}else{
throw new AppException("消息内容异常");
}
} catch (Exception e) {
log.error("message:"+new String(message.getBody()),"反序列化异常!", e);
return null;
}
}
}
package com.mortals.xhx.queue.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.mortals.xhx.queue.*;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* rabbmit 消息生产模板
*
* @author: zxfei
* @date: 2022/4/25 16:10
*/
@Slf4j
public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
/**
* defaultTopic
*/
private String defaultTopic;
/**
* rabbmit设置
*/
private TbRabbitMqSettings rabbitMqSettings;
/**
* 线程执行器
*/
private ListeningExecutorService producerExecutor;
/**
* 通道
*/
private Channel channel;
/**
* 连接器
*/
private Connection connection;
/**
* topic组
*/
private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
public TbRabbitMqProducerTemplate(TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
this.defaultTopic = defaultTopic;
this.rabbitMqSettings = rabbitMqSettings;
producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
try {
connection = rabbitMqSettings.getConnectionFactory().newConnection();
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
log.error("rabbmit创建连接失败!", e);
}
}
@Override
public void init() {
}
@Override
public String getDefaultTopic() {
return defaultTopic;
}
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
Boolean topicIfNotExist = createTopicIfNotExist(tpi);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(rabbitMqSettings.getMessageTtl()).build();
try {
if (!topicIfNotExist) {
//topic不存在创建通道队列
log.info("sendQueueDeclareAdd==>exchangeName:{} topic:{}",tpi.getExchangeName(),tpi.getTopic());
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", Integer.parseInt(rabbitMqSettings.getMessageTtl()));
channel.queueDeclare(tpi.getTopic(), true, false, false, args);
}
if (!innerExists(tpi.getExchangeName(), channel)) {
//判断交换机是否存在,如果不存在则创建后与新加入的队列绑定
channel = connection.createChannel();
channel.exchangeDeclare(tpi.getExchangeName(), BuiltinExchangeType.DIRECT, true, false, null);
}
//channel.queueBind(tpi.getTopic(), tpi.getExchangeName(), tpi.getTopic());
channel.basicPublish(tpi.getExchangeName(), tpi.getTopic(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
if (callback != null) {
callback.onSuccess(new RabbitQueueMsgMetadata(msg.getKey()));
}
} catch (IOException e) {
log.error("Failed publish message: [{}].", msg, e);
if (callback != null) {
callback.onFailure(e);
}
}
}
@Override
public void stop() {
if (producerExecutor != null) {
producerExecutor.shutdownNow();
}
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
log.error("Failed to close the channel.");
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
log.error("Failed to close the connection.");
}
}
}
@Override
public void queueDeclare(TopicPartitionInfo tpi, TbQueueCallback callback) {
Boolean topicIfNotExist = createTopicIfNotExist(tpi);
try {
if (!innerExists(tpi.getExchangeName(), channel)) {
//判断交换机是否存在,如果不存在则创建后与新加入的队列绑定
channel = connection.createChannel();
channel.exchangeDeclare(tpi.getExchangeName(), BuiltinExchangeType.DIRECT, true, false, null);
}
if (!topicIfNotExist) {
//topic不存在创建通道队列
log.info("queueDeclareAdd==>exchangeName:{} topic:{}",tpi.getExchangeName(),tpi.getTopic());
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", Integer.parseInt(rabbitMqSettings.getMessageTtl()));
channel.queueDeclare(tpi.getTopic(), true, false, false, args);
channel.queueBind(tpi.getTopic(), tpi.getExchangeName(), tpi.getTopic());
}
// callback.onSuccess(new RabbitQueueMsgMetadata());
} catch (IOException e) {
log.error("Failed publish message: {}.", e);
if (callback != null) {
callback.onFailure(e);
}
}
}
@Override
public void queueDel(String queue, TbQueueCallback callback) {
try {
channel.queueDelete(queue);
//删除topic
TopicPartitionInfo topicPartitionInfo = this.topics.stream().filter(f -> f.getTopic().equals(queue)).findFirst().orElseGet(() -> null);
if (!ObjectUtils.isEmpty(topicPartitionInfo)) {
this.topics.remove(topicPartitionInfo);
}
} catch (IOException e) {
log.error("Failed publish message: {}.", e);
}
}
private Boolean createTopicIfNotExist(TopicPartitionInfo tpi) {
//判断绑定的交换机与队列是否相同,如果不同返回false
TopicPartitionInfo topicPartitionInfo = topics.stream()
.filter(f -> f.getTopic().equals(tpi.getTopic())
&& f.getExchangeName().equals(tpi.getExchangeName()))
.findAny().orElseGet(() -> null);
if (ObjectUtils.isEmpty(topicPartitionInfo)) {
//exchange queue都不同
topics.add(tpi);
return false;
}else{
//queue相同 exchange不同
TopicPartitionInfo temp = topics.stream()
.filter(f -> f.getTopic().equals(tpi.getTopic()))
.findAny().orElseGet(() -> null);
if(!ObjectUtils.isEmpty(temp)){
//queue相同 exchange不同 需重新绑定
topics.remove(temp);
topics.add(tpi);
return false;
}else{
return true;
}
}
//
// if (topics.contains(tpi)) {
// //topic相同,exchange可能不同
// TopicPartitionInfo topicPartitionInfo = topics.stream().filter(f -> f.getTopic().equals(tpi.getTopic()) && f.getExchangeName().equals(tpi.getExchangeName())).findAny().orElseGet(() -> null);
// if (ObjectUtils.isEmpty(topicPartitionInfo)) {
// //exchange不同
// topics.add(tpi);
// //topics.remove()
// return true;
// }
//
// log.debug("contains topc:{}", tpi.getTopic());
// return true;
// }
// topics.add(tpi);
// return false;
}
private boolean innerExists(String exchangeName, Channel outerChannel) {
boolean result = true;
try {
outerChannel.exchangeDeclarePassive(exchangeName);
} catch (IOException e) {
result = false;
}
return result;
}
}
package com.mortals.xhx.queue.rabbitmq;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
//@Component
//@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
public class TbRabbitMqQueueArguments {
@Value("${queue.rabbitmq.queue-properties.core}")
private String coreProperties;
@Value("${queue.rabbitmq.queue-properties.rule-engine}")
private String ruleEngineProperties;
@Value("${queue.rabbitmq.queue-properties.transport-api}")
private String transportApiProperties;
@Value("${queue.rabbitmq.queue-properties.notifications}")
private String notificationsProperties;
@Value("${queue.rabbitmq.queue-properties.js-executor}")
private String jsExecutorProperties;
@Getter
private Map<String, Object> coreArgs;
@Getter
private Map<String, Object> ruleEngineArgs;
@Getter
private Map<String, Object> transportApiArgs;
@Getter
private Map<String, Object> notificationsArgs;
@Getter
private Map<String, Object> jsExecutorArgs;
@PostConstruct
private void init() {
coreArgs = getArgs(coreProperties);
ruleEngineArgs = getArgs(ruleEngineProperties);
transportApiArgs = getArgs(transportApiProperties);
notificationsArgs = getArgs(notificationsProperties);
jsExecutorArgs = getArgs(jsExecutorProperties);
}
private Map<String, Object> getArgs(String properties) {
Map<String, Object> configs = new HashMap<>();
for (String property : properties.split(";")) {
int delimiterPosition = property.indexOf(":");
String key = property.substring(0, delimiterPosition);
String strValue = property.substring(delimiterPosition + 1);
configs.put(key, getObjectValue(strValue));
}
return configs;
}
private Object getObjectValue(String str) {
if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) {
return Boolean.valueOf(str);
} else if (isNumeric(str)) {
return getNumericValue(str);
}
return str;
}
private Object getNumericValue(String str) {
if (str.contains(".")) {
return Double.valueOf(str);
} else {
return Long.valueOf(str);
}
}
private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?");
public boolean isNumeric(String strNum) {
if (strNum == null) {
return false;
}
return PATTERN.matcher(strNum).matches();
}
}
package com.mortals.xhx.queue.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import javax.annotation.PostConstruct;
@Slf4j
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
//@Component
@Data
public class TbRabbitMqSettings {
@Value("${queue.rabbitmq.exchange_name:}")
private String exchangeName;
@Value("${queue.rabbitmq.host:}")
private String host;
@Value("${queue.rabbitmq.port:0}")
private int port;
@Value("${queue.rabbitmq.virtual_host:}")
private String virtualHost;
@Value("${queue.rabbitmq.username:}")
private String username;
@Value("${queue.rabbitmq.password:}")
private String password;
@Value("${queue.rabbitmq.automatic_recovery_enabled:}")
private boolean automaticRecoveryEnabled;
@Value("${queue.rabbitmq.connection_timeout:}")
private int connectionTimeout;
@Value("${queue.rabbitmq.handshake_timeout:}")
private int handshakeTimeout;
@Value("${queue.rabbitmq.queue-properties.x-message-ttl:86400000}")
@Getter
private String messageTtl;
private ConnectionFactory connectionFactory;
public void setVHost(String virtualHost) {
connectionFactory.setVirtualHost(virtualHost);
}
@PostConstruct
private void init() {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setAutomaticRecoveryEnabled(automaticRecoveryEnabled);
connectionFactory.setConnectionTimeout(connectionTimeout);
connectionFactory.setHandshakeTimeout(handshakeTimeout);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment