Commit 7e60e6de authored by 赵啸非's avatar 赵啸非

修改文件存储路径

parent 6963add5
......@@ -54,12 +54,13 @@
<version>2.6.0</version>
</dependency>
<!-- 实现对 RabbitMQ 的自动化配置 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
......
......@@ -57,5 +57,6 @@ public class TbRabbitMqSettings {
connectionFactory.setConnectionTimeout(connectionTimeout);
connectionFactory.setHandshakeTimeout(handshakeTimeout);
}
}
......@@ -4,6 +4,9 @@ import java.util.Date;
import javax.servlet.http.HttpServletRequest;
import com.mortals.framework.model.OperateLogPdu;
import com.mortals.framework.service.IMessageProduceService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
......@@ -27,18 +30,32 @@ import com.mortals.xhx.base.system.oper.service.OperLogService;
* @date: 2021/11/5 13:28
*/
@Component
public class OperlogAspect extends FileLogServiceImpl implements ILogService {
private final static Logger logger = LoggerFactory.getLogger(OperlogAspect.class);
@Slf4j
public class OperlogAspect extends FileLogServiceImpl implements ILogService {
@Autowired
private OperLogService operLogService;
@Autowired
private IMessageProduceService messageProduceService;
@Override
public void doHandlerLog(String platformMark, Long userId, String userName, String loginName, String requestUrl,
String content, String ip, Date logDate) {
String content, String ip, Date logDate) {
super.doHandlerLog(platformMark, userId, userName, loginName, requestUrl, content, ip, logDate);
operLogService.insertOperLog(ip, requestUrl, userId, userName, loginName, content);
OperateLogPdu operateLogPdu = new OperateLogPdu();
operateLogPdu.initAttrValue();
operateLogPdu.setIp(ip);
operateLogPdu.setRequestUrl(requestUrl);
operateLogPdu.setUserId(userId);
operateLogPdu.setUserName(userName);
operateLogPdu.setLoginName(loginName);
operateLogPdu.setPlatformMark(platformMark);
operateLogPdu.setLogDate(logDate);
operateLogPdu.setContent(content);
operateLogPdu.setOperType(1);
messageProduceService.syncOperSend(operateLogPdu);
}
@Override
......@@ -46,9 +63,11 @@ public class OperlogAspect extends FileLogServiceImpl implements ILogService {
// operLogService.insertOperLog(ip, requestUrl, null, "", loginName,
// content);
this.doHandlerLog(platformMark, null, "", loginName, requestUrl, content, ip, new Date());
}
@Pointcut("execution(public * com.mortals.xhx..*Controller.*(..))")
/*@Pointcut("execution(public * com.mortals.xhx..*Controller.*(..))")
public void accessLog() {
}
......@@ -58,13 +77,13 @@ public class OperlogAspect extends FileLogServiceImpl implements ILogService {
HttpServletRequest request = attributes.getRequest();
// url
logger.info("ip[{}]url[{}]", request.getRemoteAddr(), request.getRequestURL());
log.info("ip[{}]url[{}]", request.getRemoteAddr(), request.getRequestURL());
// 参数第1和第2个参数为HttpServletRequest request, HttpServletResponse
// response
if (joinPoint.getArgs().length > 2) {
logger.info("args={}", joinPoint.getArgs()[2]);
log.info("args={}", joinPoint.getArgs()[2]);
} else {
logger.info("args={}", joinPoint.getArgs());
log.info("args={}", joinPoint.getArgs());
}
}
......@@ -72,7 +91,7 @@ public class OperlogAspect extends FileLogServiceImpl implements ILogService {
@AfterReturning(returning = "object", pointcut = "accessLog()")
public void doAfterReturning(Object object) {
if (null != object) {
logger.info("response={}", object.toString());
log.info("response={}", object.toString());
}
}
}*/
}
......@@ -30,11 +30,11 @@ import java.util.Map;
* @author: zxfei
* @date: 2022/4/20 9:24
*/
@Aspect
@Component
//@Aspect
//@Component
@Slf4j
@Order(1)
@Profile({"default", "develop", "test"})
//@Profile({"default", "develop", "test"})
public class WebLogAspect {
@Pointcut("execution(public * com.mortals..*Controller.*(..))")
public void webLog() {
......
package com.mortals.xhx.base.framework.config;
import com.mortals.xhx.base.framework.listener.SimpleDynamicListener;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
// @Autowired
// private SimpleDynamicListener simpleDynamicListener;
//
// //@Bean("simpleMessageListenerContainer")
// public SimpleMessageListenerContainer simpleMessageListenerContainer(CachingConnectionFactory cachingConnectionFactory) {
// SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
// container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// container.setConcurrentConsumers(10);
// container.setMaxConcurrentConsumers(100);
// container.setMessageListener(simpleDynamicListener);//配置队列监听器
// container.start();
// return container;
// }
// @Bean(name = "consumerBatchContainerFactory")
// public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(
// SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
// // 创建 SimpleRabbitListenerContainerFactory 对象
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// configurer.configure(factory, connectionFactory);
// // 额外添加批量消费的属性
// factory.setBatchListener(true);
// factory.setBatchSize(20);
// factory.setReceiveTimeout(5 * 1000L);
// factory.setConsumerBatchEnabled(true);
// return factory;
// }
//修改系列和与反序列化转换器
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// @Bean
// public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
// AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
// asyncRabbitTemplate.setReceiveTimeout(10000);
// return asyncRabbitTemplate;
// }
}
package com.mortals.xhx.base.framework.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RedissonConfig配置
*
* @author: zxfei
* @date: 2022/8/8 16:02
*/
//@Configuration
//@ConfigurationProperties(prefix = "spring.redis")
@Slf4j
@Data
public class RedissonConfig {
private String host;
private int port;
private String password;
private int database;
@Bean
public RedissonClient redissonClient() {
RedissonClient redissonClient;
Config config = new Config();
String url = "redis://" + host + ":" + port;
// 单节点配置
config.useSingleServer().setAddress(url).setDatabase(database).setPassword(password);
//使用json序列化方式
Codec codec = new JsonJacksonCodec();
config.setCodec(codec);
// 主从配置
/*config.useMasterSlaveServers()
// 设置redis主节点
.setMasterAddress("redis://192.168.1.120:6379")
// 设置redis从节点
.addSlaveAddress("redis://192.168.1.130:6379", "redis://192.168.1.140:6379");*/
// 哨兵部署方式,sentinel是采用Paxos拜占庭协议,一般sentinel至少3个节点
/*config.useSentinelServers()
.setMasterName("my-sentinel-name")
.addSentinelAddress("redis://192.168.1.120:6379")
.addSentinelAddress("redis://192.168.1.130:6379")
.addSentinelAddress("redis://192.168.1.140:6379");*/
// 集群部署方式,cluster方式至少6个节点,3主3从,3主做sharding,3从用来保证主宕机后可以高可用
/*config.useClusterServers()
// 集群状态扫描间隔时间,单位是毫秒
.setScanInterval(2000)
.addNodeAddress("redis://192.168.1.120:6379")
.addNodeAddress("redis://192.168.1.130:6379")
.addNodeAddress("redis://192.168.1.140:6379")
.addNodeAddress("redis://192.168.1.150:6379")
.addNodeAddress("redis://192.168.1.160:6379")
.addNodeAddress("redis://192.168.1.170:6379");*/
// 云托管部署方式,这种方式主要解决redis提供商为云服务的提供商的redis连接,比如亚马逊云、微软云
/*config.useReplicatedServers()
// 主节点变化扫描间隔时间
.setScanInterval(2000)
.addNodeAddress("redis://192.168.1.120:6379")
.addNodeAddress("redis://192.168.1.130:6379")
.addNodeAddress("redis://192.168.1.140:6379");*/
redissonClient = Redisson.create(config);
return redissonClient;
}
}
\ No newline at end of file
......@@ -23,7 +23,7 @@ import java.io.IOException;
* @author: zxfei
* @date: 2022/4/20 14:52
*/
@Component
//@Component
@Slf4j
public class RequestFilter extends OncePerRequestFilter implements Filter {
......
package com.mortals.xhx.base.framework.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;
/**
* @author: zxfei
* @date: 2022/8/23 0:32
* @description:
**/
@Slf4j
//@Service
public class SimpleDynamicListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String queue = message.getMessageProperties().getConsumerQueue();
byte[] body = message.getBody();
log.info("接收到:" + queue + ",消息内容为:" + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info(queue + "队列消息消费成功");
}
}
package com.mortals.xhx.base.system.message;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.model.AccessLogPdu;
import com.mortals.framework.model.BizLogPdu;
import com.mortals.framework.model.ErrorLogPdu;
import com.mortals.framework.model.OperateLogPdu;
import com.mortals.framework.service.IMessageProduceService;
import com.mortals.xhx.common.key.QueueKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
@Component
@Primary
@Slf4j
public class MessageProducer implements IMessageProduceService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void syncAccessSend(AccessLogPdu accessLogPdu) {
//log.info("accessinfo==>{}",JSON.toJSONString(accessLogPdu));
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.ACCESS_LOG_QUEUE, JSON.toJSONString(accessLogPdu));
}
@Override
public void syncBizSend(BizLogPdu bizLogPdu) {
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.BIZ_LOG_QUEUE, JSON.toJSONString(bizLogPdu));
}
@Override
public void syncErrorSend(ErrorLogPdu errorLogPdu) {
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.ERROR_LOG_QUEUE, JSON.toJSONString(errorLogPdu));
}
@Override
public void syncOperSend(OperateLogPdu operLogPdu) {
log.info("operLogInfo==>{}",JSON.toJSONString(operLogPdu));
rabbitTemplate.convertAndSend(QueueKey.EXCHANGE, QueueKey.OPERATION_LOG_QUEUE, JSON.toJSONString(operLogPdu));
}
}
......@@ -183,14 +183,18 @@ public class MessageServiceImpl implements MessageService {
public static void main(String[] args) {
try {
Integer a = 1;
Integer b = 0;
int i = a / b;
throw new Exception("1213");
} catch (Exception e) {
e.printStackTrace();
String stacktrace = ExceptionUtil.stacktraceToString(e.fillInStackTrace());
System.out.println(stacktrace);
JSONObject jsonObject = new JSONObject();
jsonObject.put("stacktrace",stacktrace);
System.out.println(JSON.toJSONString(jsonObject));
}
......
package com.mortals.xhx.common.key;
/**
* rabbit 队列key定义
*/
public class QueueKey {
public static final String ACCESS_LOG_QUEUE = "ACCESS_LOG_QUEUE";
public static final String BIZ_LOG_QUEUE = "BIZ_LOG_QUEUE";
public static final String ERROR_LOG_QUEUE = "ERROR_LOG_QUEUE";
public static final String OPERATION_LOG_QUEUE = "OPERATION_LOG_QUEUE";
public static final String EXCHANGE = "LOG";
public static final String ROUTING_KEY = "LOG_ROUTING_KEY";
}
......@@ -3,7 +3,9 @@ package com.mortals.xhx.daemon.applicationservice;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.model.BizLogPdu;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.service.IMessageProduceService;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.key.Constant;
......@@ -25,6 +27,7 @@ import com.mortals.xhx.queue.TopicPartitionInfo;
import com.mortals.xhx.utils.IotThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
......@@ -55,6 +58,10 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
private SendTaskThreadPool sendTaskThreadPool;
@Autowired
private ICacheService cacheService;
@Value("${spring.application.name:''}")
private String appName;
@Autowired
private IMessageProduceService messageProduceService;
protected volatile ExecutorService consumersExecutor;
......@@ -189,6 +196,21 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity);
//埋点业务事件,消息上报
BizLogPdu bizLogPdu = new BizLogPdu();
bizLogPdu.initAttrValue();
bizLogPdu.setAppName(appName);
bizLogPdu.setTraceID(IdUtil.objectId());
bizLogPdu.setUserCode("system");
bizLogPdu.setDeviceCode(deviceEntity.getDeviceCode());
bizLogPdu.setEventTopic(messageType);
bizLogPdu.setEventTopicName(LogTypeEnum.下发服务.name());
bizLogPdu.setMsg(queueMsg.getData());
bizLogPdu.setLogLevel("INFO");
bizLogPdu.setLogTime(new Date());
messageProduceService.syncBizSend(bizLogPdu);
}
//获取exchange,
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
......
package com.mortals.xhx.daemon.applicationservice;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.key.RedisKey;
import com.mortals.xhx.common.utils.SendTask;
import com.mortals.xhx.common.utils.SendTaskThreadPool;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.model.DeviceLogEntity;
import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.model.PlatformEntity;
import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.model.ProductEntity;
import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.queue.DefaultTbCoreConsumerService;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TopicPartitionInfo;
import com.mortals.xhx.utils.IotThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.mortals.xhx.common.model.MessageHeader.DEVICECODE;
import static com.mortals.xhx.common.model.MessageHeader.MESSAGETYPE;
//@Component
@Slf4j
public class RabbitMsgComsumerStartedService implements IApplicationStartedService {
@Autowired
private DeviceService deviceService;
@Autowired
private ProductService productService;
@Autowired
private PlatformService platformService;
@Autowired
@Qualifier("simpleMessageListenerContainer")
private SimpleMessageListenerContainer simpleContainer;
protected Boolean stopped = false;
@Override
public void start() {
log.info("服务端消费消息服务开始..");
String[] queues = deviceService.find(new DeviceEntity())
.stream()
.filter(f -> !ObjectUtils.isEmpty(platformService.get(f.getPlatformId())))
.filter(f -> !ObjectUtils.isEmpty(productService.get(f.getProductId())))
.map(item -> Constant.UPLOAD_TOPIC + item.getDeviceCode())
.distinct()
.toArray(String[]::new);
simpleContainer.addQueueNames(queues);
simpleContainer.start();
simpleContainer.addQueueNames("123");
}
@Override
public void stop() {
log.info("停止服务..");
}
@Override
public int getOrder() {
return 10;
}
}
package com.mortals.xhx.module.device.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.lang.PatternPool;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.net.url.UrlPath;
......@@ -338,6 +339,8 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
*/
@Override
public void deviceStat(Long siteId,Context context) {
//查询当天统计,如果有 则更新统计结果,否则新增
DeviceStatEntity deviceStatEntity = deviceStatService.selectOne(new DeviceStatQuery()
.siteId(siteId)
......@@ -681,6 +684,10 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
System.out.println(add.toString());
}
......
......@@ -64,7 +64,7 @@ public class DeviceStatController extends BaseCRUDJsonBodyMappingController<Devi
* 同步当前站点
*/
@PostMapping(value = "syncDeviceStat")
public Rest<Object> syncDeviceStat(@RequestBody DeviceQuery deviceQuery) {
public Rest<Object> syncDeviceStat(@RequestBody DeviceQuery deviceQuery) throws Exception {
try {
if(ObjectUtils.isEmpty(deviceQuery)||ObjectUtils.isEmpty(deviceQuery.getSiteId())) throw new AppException("请求站点Id不能为空!");
//刷新成功,返回列
......
......@@ -54,6 +54,7 @@ spring:
port: @profiles.rabbitmq.port@
username: @profiles.rabbitmq.username@
password: @profiles.rabbitmq.password@
virtual-host: @profiles.rabbitmq.virtualhost1@
dao:
exceptiontranslation:
enabled: false
......@@ -70,6 +71,8 @@ mybatis:
type-aliases-package: com.mortals.framework.model,com.mortals.xhx.common.**.model,com.mortals.xhx.**.model
mapper-locations: classpath*:sqlmap/**/*.xml
config-location: classpath*:config/mybatis-sqlmap-config.xml
log:
enable: true
application:
auth:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment