Commit 964f0d19 authored by 赵啸非's avatar 赵啸非

业务日志添加

parent 694048e3
......@@ -32,10 +32,10 @@
<profiles.redis.username></profiles.redis.username>
<profiles.redis.password></profiles.redis.password>
<profiles.redis.database>8</profiles.redis.database>
<profiles.rabbitmq.host>127.0.0.1</profiles.rabbitmq.host>
<profiles.rabbitmq.host>192.168.0.98</profiles.rabbitmq.host>
<profiles.rabbitmq.port>5672</profiles.rabbitmq.port>
<profiles.rabbitmq.username>guest</profiles.rabbitmq.username>
<profiles.rabbitmq.password>guest</profiles.rabbitmq.password>
<profiles.rabbitmq.username>taxi_mq</profiles.rabbitmq.username>
<profiles.rabbitmq.password>admin@2020</profiles.rabbitmq.password>
<profiles.rabbitmq.virtualhost>/</profiles.rabbitmq.virtualhost>
<profiles.rabbitmq.exchange></profiles.rabbitmq.exchange>
<profiles.filepath>/mortals/data</profiles.filepath>
......
package com.mortals.xhx.base.framework.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mortals.xhx.common.key.QueueKey;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import com.mortals.xhx.module.access.model.AccessLogEntity;
import com.mortals.xhx.module.biz.model.BizLogEntity;
import com.mortals.xhx.module.error.model.ErrorLogEntity;
import com.mortals.xhx.module.operate.model.OperateLogEntity;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.*;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
......@@ -90,18 +94,16 @@ public class RabbitConfig {
return BindingBuilder.bind(operationLogQueue()).to(exchange()).with(QueueKey.OPERATION_LOG_QUEUE);
}
@Bean(name = "consumerBatchContainerFactory")
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
// 创建 SimpleRabbitListenerContainerFactory 对象
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAfterReceivePostProcessors(m -> {
m.getMessageProperties().setContentType("application/json");
// m.getMessageProperties().setHeader("__TypeId__","java.lang.String");
return m;
});
configurer.configure(factory, connectionFactory);
// 额外添加批量消费的属性
factory.setBatchListener(true);
......@@ -116,12 +118,6 @@ public class RabbitConfig {
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
// 创建 SimpleRabbitListenerContainerFactory 对象
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAfterReceivePostProcessors(m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
configurer.configure(factory, connectionFactory);
return factory;
}
......@@ -129,7 +125,45 @@ public class RabbitConfig {
//修改系列和与反序列化转换器
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
//return null;
ObjectMapper objectMapper = new ObjectMapper();
//设置转换的实体类可见性[避免属性访问权问题所导致的缺少字段]
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//因为实体类的包名不同的问题;重写 classMapper映射,不然接收消息异常报错
jackson2JsonMessageConverter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> aClass, MessageProperties messageProperties) {
//反序列化指定类路径
if (messageProperties.getConsumerQueue().equalsIgnoreCase(QueueKey.ACCESS_LOG_QUEUE)) {
// messageProperties.setHeader("__TypeId__", "com.mortals.xhx.module.access.model.AccessLogEntity");
}else if(messageProperties.getConsumerQueue().equalsIgnoreCase(QueueKey.BIZ_LOG_QUEUE)){
messageProperties.setHeader("__TypeId__", "com.mortals.xhx.module.biz.model.BizLogEntity");
}else if(messageProperties.getConsumerQueue().equalsIgnoreCase(QueueKey.ERROR_LOG_QUEUE)){
messageProperties.setHeader("__TypeId__", "com.mortals.xhx.module.error.model.ErrorLogEntity");
}else if(messageProperties.getConsumerQueue().equalsIgnoreCase(QueueKey.OPERATION_LOG_QUEUE)){
messageProperties.setHeader("__TypeId__", "com.mortals.xhx.module.operate.model.OperateLogEntity");
}
}
@Override
public Class<?> toClass(MessageProperties properties) {
if (properties.getConsumerQueue().equalsIgnoreCase(QueueKey.ACCESS_LOG_QUEUE)) {
return String.class;
//return AccessLogEntity.class;
}else if (properties.getConsumerQueue().equalsIgnoreCase(QueueKey.BIZ_LOG_QUEUE)) {
return BizLogEntity.class;
}else if (properties.getConsumerQueue().equalsIgnoreCase(QueueKey.ERROR_LOG_QUEUE)) {
return ErrorLogEntity.class;
}else if (properties.getConsumerQueue().equalsIgnoreCase(QueueKey.OPERATION_LOG_QUEUE)) {
return OperateLogEntity.class;
}
return String.class;
}
});
return jackson2JsonMessageConverter;
}
}
......@@ -19,8 +19,8 @@ import org.springframework.context.annotation.Configuration;
* @author: zxfei
* @date: 2022/8/8 16:02
*/
//@Configuration
//@ConfigurationProperties(prefix = "spring.redis")
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Slf4j
@Data
public class RedissonConfig {
......@@ -38,6 +38,7 @@ public class RedissonConfig {
RedissonClient redissonClient;
Config config = new Config();
String url = "redis://" + host + ":" + port;
if("".equalsIgnoreCase(password)) password=null;
// 单节点配置
config.useSingleServer().setAddress(url).setDatabase(database).setPassword(password);
//使用json序列化方式
......
......@@ -24,6 +24,7 @@ import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
......@@ -39,21 +40,30 @@ import java.util.stream.Collectors;
@Component
@RabbitListener(queues = QueueKey.ACCESS_LOG_QUEUE,
containerFactory = "consumerBatchContainerFactory")
//@RabbitListener(queues = QueueKey.ACCESS_LOG_QUEUE)
public class AccessMessageConsumerListener {
@Autowired
private AccessLogService accessLogService;
//@RabbitHandler
public void onMessage(AccessLogEntity entity) throws Exception {
log.info("[Access onMessage single]");
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
accessLogService.save(entity,null);
}
/* @RabbitHandler
public void onMessage(List<AccessLogEntity> messages) {
log.info("[Access onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getName() + Thread.currentThread().getId(), messages.size());
List<AccessLogEntity> collect = messages.stream().map(entity -> {
try {
// System.out.println(new String(message, Charset.defaultCharset()));
// AccessLogEntity entity = null;
// entity = JSON.parseObject(new String(message, Charset.defaultCharset()), AccessLogEntity.class);
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
return entity;
} catch (Exception e) {
log.info("反序列化异常", e);
return null;
}
}).filter(f -> f != null).collect(Collectors.toList());
accessLogService.save(collect);
}*/
@RabbitHandler
public void onMessage(List<String> messages) throws Exception {
......@@ -66,7 +76,6 @@ public class AccessMessageConsumerListener {
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
//String json = JSONUtil.formatJsonStr(new String(message, StandardCharsets.UTF_8));
return entity;
} catch (Exception e) {
log.info("反序列化异常", e);
......@@ -84,13 +93,13 @@ public class AccessMessageConsumerListener {
ArrayList<?> messages = new ArrayList<>();
// messages.add("111");
// messages.add("111");
if ( messages.stream().noneMatch((o -> !(o instanceof String)))) {
if (messages.stream().noneMatch((o -> !(o instanceof String)))) {
log.info("都匹配");
}else{
} else {
log.info("存在不匹配");
}
......
......@@ -13,6 +13,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
......@@ -34,10 +35,10 @@ public class BizMessageConsumerListener {
private BizLogService bizLogService;
@RabbitHandler
public void onMessage(List<String> messages) {
public void onMessage(List<BizLogEntity> messages) {
log.info("[Biz onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getName()+Thread.currentThread().getId(), messages.size());
List<BizLogEntity> collect = messages.stream().map(str -> {
BizLogEntity entity = JSON.parseObject(str, BizLogEntity.class);
List<BizLogEntity> collect = messages.stream().map(entity -> {
// BizLogEntity entity = JSON.parseObject(new String(str, Charset.defaultCharset()), BizLogEntity.class);
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
......
......@@ -18,6 +18,7 @@ import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.List;
import java.util.Map;
......@@ -40,10 +41,10 @@ public class ErrorMessageConsumerListener {
private ErrorLogService errorLogService;
@RabbitHandler
public void onMessage(List<String> messages) throws Exception {
public void onMessage(List<ErrorLogEntity> messages) {
log.info("[Error onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getName()+Thread.currentThread().getId(), messages.size());
messages.stream().peek(str -> {
ErrorLogEntity entity = JSON.parseObject(str, ErrorLogEntity.class);
messages.stream().peek(entity -> {
//ErrorLogEntity entity = JSON.parseObject(new String(str, Charset.defaultCharset()), ErrorLogEntity.class);
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
if (!ObjectUtils.isEmpty(entity.getFingerprint())) {
......
......@@ -2,6 +2,7 @@ package com.mortals.xhx.base.framework.listener;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.base.framework.ws.message.Message;
import com.mortals.xhx.common.key.QueueKey;
import com.mortals.xhx.module.access.model.AccessLogEntity;
import com.mortals.xhx.module.error.model.ErrorLogEntity;
......@@ -19,10 +20,7 @@ import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
/**
......@@ -33,21 +31,19 @@ import java.util.stream.Collectors;
* @description:
**/
@Slf4j
@Component
@RabbitListener(queues = QueueKey.OPERATION_LOG_QUEUE,containerFactory = "consumerContainerFactory")
//@RabbitListener(queues = QueueKey.OPERATION_LOG_QUEUE,
// containerFactory = "consumerBatchContainerFactory")
//@Component
@RabbitListener(queues = QueueKey.OPERATION_LOG_QUEUE,
containerFactory = "consumerBatchContainerFactory")
public class OperateMessageConsumerListener {
@Autowired
private OperateLogService operateLogService;
/* @RabbitHandler
public void onMessage(@Payload List<String> messages, Channel channel) throws Exception {
/* @RabbitHandler
public void onMessage(List<byte[]> messages) {
log.info("[Oper onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getName() + Thread.currentThread().getId(), messages.size());
List<OperateLogEntity> collect = messages.stream().map(str -> {
OperateLogEntity entity = JSON.parseObject(str, OperateLogEntity.class);
OperateLogEntity entity = JSON.parseObject(new String(str), OperateLogEntity.class);
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
......@@ -57,49 +53,15 @@ public class OperateMessageConsumerListener {
}*/
@RabbitHandler
public void onMessage(@Payload String messages) {
log.info("[operate onMessage][线程编号:{} 消息:{}]", Thread.currentThread().getName() + Thread.currentThread().getId(), messages);
}
@RabbitHandler
public void onMessage(@Payload List<String> messages) {
log.info("[operate onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getName() + Thread.currentThread().getId(), messages.size());
ArrayList<OperateLogEntity> list = new ArrayList<>();
for (String message : messages) {
OperateLogEntity entity = null;
try {
log.info("message:{}", message);
entity = JSON.parseObject(message, OperateLogEntity.class);
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
} catch (Exception e) {
log.info("反序列化异常", e);
}
if (entity != null) {
list.add(entity);
}
}
if(!ObjectUtils.isEmpty(list)){
operateLogService.save(list);
}
/* List<OperateLogEntity> collect = messages.stream().map(message -> {
try {
log.info("message:{}", message);
OperateLogEntity entity = JSON.parseObject(message, OperateLogEntity.class);
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
return entity;
} catch (Exception e) {
log.info("反序列化异常", e);
return null;
}
}).filter(f -> f != null).collect(Collectors.toList());
log.info("save collect size==>{}", collect.size());
operateLogService.save(collect);*/
public void onMessage(List<OperateLogEntity> messages) {
log.info("[Oper onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getName() + Thread.currentThread().getId(), messages.size());
List<OperateLogEntity> collect = messages.stream().map(entity -> {
//OperateLogEntity entity = JSON.parseObject(new String(str), OperateLogEntity.class);
entity.setId(IdUtil.getSnowflake(0, 1).nextId());
entity.setCreateUserId(1L);
entity.setCreateTime(new Date());
return entity;
}).collect(Collectors.toList());
operateLogService.save(collect);
}
}
......@@ -160,7 +160,8 @@ public class TableIndexServiceImpl extends AbstractCRUDServiceImpl<TableIndexDao
return;
}
Date currDate = new Date();
Set<String> existsTables = this.findLastMonthTables();
//Set<String> existsTables = this.findLastMonthTables();
Set<String> existsTables = this.findThreeMonthTables();
TableParam param = new TableParam();
param.unionTime = currDate;
param.filterTables = existsTables;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment