Commit 01b49a93 authored by 赵啸非's avatar 赵啸非

修改消息组件

parent 3707e4a5
...@@ -27,10 +27,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu ...@@ -27,10 +27,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
@Getter @Getter
private TbQueueConsumer<TbQueueMsg> mainConsumer; private TbQueueConsumer<TbQueueMsg> mainConsumer;
@Getter
private List<TbQueueConsumer<TbQueueMsg>> consumerList;
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory) { public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory) {
this.mainConsumer = tbCoreQueueFactory.createMsgConsumer(); this.mainConsumer = tbCoreQueueFactory.createMsgConsumer();
//Object deviceService = SpringUtil.getBean("deviceService");
//tbCoreQueueFactory.createListMsgConsumer()
} }
@PostConstruct @PostConstruct
...@@ -88,4 +92,35 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu ...@@ -88,4 +92,35 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
} }
@Override
protected void launchConsumersList() {
log.info("启动消费线程组!");
consumerList.stream().forEach(consumer -> {
log.info("channel number:{}"+consumer.getChannelNumber());
consumersExecutor.submit(() -> {
while (!stopped) {
try {
List<TbQueueMsg> msgs = consumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
for (TbQueueMsg item : msgs) {
comsureQueue.offer(item);
}
consumer.commit();
} catch (Exception e) {
if (!stopped) {
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
});
});
}
} }
...@@ -44,4 +44,6 @@ public interface TbQueueConsumer<T extends TbQueueMsg> { ...@@ -44,4 +44,6 @@ public interface TbQueueConsumer<T extends TbQueueMsg> {
*/ */
void commit(); void commit();
String getChannelNumber();
} }
...@@ -168,4 +168,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -168,4 +168,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
abstract protected void doUnsubscribe(); abstract protected void doUnsubscribe();
} }
...@@ -99,4 +99,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue ...@@ -99,4 +99,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
public static void main(String[] args) { public static void main(String[] args) {
// TbKafkaConsumerTemplate.builder(). // TbKafkaConsumerTemplate.builder().
} }
@Override
public String getChannelNumber() {
return "1231";
}
} }
...@@ -14,15 +14,6 @@ import java.util.concurrent.Executors; ...@@ -14,15 +14,6 @@ import java.util.concurrent.Executors;
@Slf4j @Slf4j
public abstract class AbstractConsumerService<N extends TbQueueMsg> { public abstract class AbstractConsumerService<N extends TbQueueMsg> {
protected volatile ExecutorService consumersExecutor;
protected volatile boolean stopped = false;
public void init(String mainConsumerThreadName) {
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName(mainConsumerThreadName));
launchMainConsumers();
}
/** /**
* 启动消费主线程服务 * 启动消费主线程服务
*/ */
...@@ -31,6 +22,22 @@ public abstract class AbstractConsumerService<N extends TbQueueMsg> { ...@@ -31,6 +22,22 @@ public abstract class AbstractConsumerService<N extends TbQueueMsg> {
* 停止消费主线程服务 * 停止消费主线程服务
*/ */
protected abstract void stopMainConsumers(); protected abstract void stopMainConsumers();
/**
* 启动一组消费线程
*/
protected abstract void launchConsumersList();
protected volatile ExecutorService consumersExecutor;
protected volatile boolean stopped = false;
public void init(String mainConsumerThreadName) {
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName(mainConsumerThreadName));
launchMainConsumers();
}
@PreDestroy @PreDestroy
......
...@@ -10,6 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -10,6 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List;
/** /**
* kafka 消息工厂类 * kafka 消息工厂类
* *
...@@ -46,4 +48,9 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { ...@@ -46,4 +48,9 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
comsumerBuilder.settings(kafkaSettings); comsumerBuilder.settings(kafkaSettings);
return comsumerBuilder.build(); return comsumerBuilder.build();
} }
@Override
public List<TbQueueConsumer<TbQueueMsg>> createListMsgConsumer(List<String> vhosts) {
return null;
}
} }
...@@ -8,7 +8,10 @@ import com.mortals.xhx.queue.rabbitmq.TbRabbitMqSettings; ...@@ -8,7 +8,10 @@ import com.mortals.xhx.queue.rabbitmq.TbRabbitMqSettings;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'") @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
...@@ -31,6 +34,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { ...@@ -31,6 +34,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override @Override
public TbQueueConsumer<TbQueueMsg> createMsgConsumer() { public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() { return new TbRabbitMqConsumerTemplate<>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
@Override @Override
public String getKey() { public String getKey() {
...@@ -48,4 +52,29 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { ...@@ -48,4 +52,29 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
} }
}); });
} }
@Override
public List<TbQueueConsumer<TbQueueMsg>> createListMsgConsumer(List<String> vhosts) {
List<TbQueueConsumer<TbQueueMsg>> list =vhosts.stream().map(vhost->{
rabbitMqSettings.setVHost(vhost);
return new TbRabbitMqConsumerTemplate<TbQueueMsg>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
@Override
public String getKey() {
return msg.getKey();
}
@Override
public TbQueueMsgHeaders getHeaders() {
return msg.getHeaders();
}
@Override
public byte[] getData() {
return msg.getData();
}
});
}).collect(Collectors.toList());
return list;
}
} }
...@@ -5,6 +5,8 @@ import com.mortals.xhx.queue.TbQueueConsumer; ...@@ -5,6 +5,8 @@ import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg; import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer; import com.mortals.xhx.queue.TbQueueProducer;
import java.util.List;
public interface TbCoreQueueFactory { public interface TbCoreQueueFactory {
/** /**
...@@ -19,4 +21,21 @@ public interface TbCoreQueueFactory { ...@@ -19,4 +21,21 @@ public interface TbCoreQueueFactory {
*/ */
TbQueueConsumer<TbQueueMsg> createMsgConsumer(); TbQueueConsumer<TbQueueMsg> createMsgConsumer();
/* *//**
* 消息消费服务
* @return
*//*
TbQueueConsumer<TbQueueMsg> createMsgConsumer();*/
/**
*
* @return
*/
List<TbQueueConsumer<TbQueueMsg>> createListMsgConsumer(List<String> vhosts);
} }
...@@ -11,6 +11,7 @@ import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate; ...@@ -11,6 +11,7 @@ import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.GetResponse;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
...@@ -18,16 +19,20 @@ import java.io.IOException; ...@@ -18,16 +19,20 @@ import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* 消费连接池
*
* @author: zxfei
* @date: 2022/4/25 13:49
*/
@Slf4j @Slf4j
public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> { public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
private final TbQueueMsgDecoder<T> decoder; private final TbQueueMsgDecoder<T> decoder;
@Getter
private Channel channel; private Channel channel;
private Connection connection; private Connection connection;
private final Gson gson = new Gson();
private volatile Set<String> queues; private volatile Set<String> queues;
public TbRabbitMqConsumerTemplate(TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) { public TbRabbitMqConsumerTemplate(TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
...@@ -40,7 +45,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -40,7 +45,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
} catch (IOException | TimeoutException e) { } catch (IOException | TimeoutException e) {
log.error("Failed to create connection.", e); log.error("Failed to create connection.", e);
} }
stopped = false; stopped = false;
} }
...@@ -99,40 +103,27 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -99,40 +103,27 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
} }
} }
public T decode(GetResponse message) { @Override
public String getChannelNumber() {
return channel.getChannelNumber()+"";
}
// log.info("getRespBody:" + new String(message.getBody())); public T decode(GetResponse message) {
try { try {
DefaultTbQueueMsg msg = new DefaultTbQueueMsg(); DefaultTbQueueMsg msg = new DefaultTbQueueMsg();
// DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class); // DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class); Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class);
msg.setKey((String)map.get("key")); msg.setKey((String)map.get("key"));
String payloadStr = (String)map.get("data"); String payloadStr = (String)map.get("data");
System.out.println("receivedPayLoadStr:" + payloadStr);
byte[] payloadDecodeByte = Base64.getDecoder().decode(payloadStr); byte[] payloadDecodeByte = Base64.getDecoder().decode(payloadStr);
msg.setData(payloadDecodeByte); msg.setData(payloadDecodeByte);
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
String headerStr = ((JSONObject) map.get("headers")).getString("data"); String headerStr = ((JSONObject) map.get("headers")).getString("data");
HashMap<String, String> hashMap = JSON.parseObject(headerStr, HashMap.class); HashMap<String, String> hashMap = JSON.parseObject(headerStr, HashMap.class);
headers.setData(hashMap); headers.setData(hashMap);
// String clientIdStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("clientId");
// String qosStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("qos");
// String timestampStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("timestamp");
// if(!ObjectUtils.isEmpty(clientIdStr)){
// headers.put(MessageHeader.CLIENTID, clientIdStr);
// }
// if(!ObjectUtils.isEmpty(qosStr)){
// headers.put(MessageHeader.TOPIC,qosStr);
// }
// if(!ObjectUtils.isEmpty(timestampStr)) {
// headers.put(MessageHeader.TIMESTAMP, timestampStr);
// }
msg.setHeaders(headers); msg.setHeaders(headers);
// log.info("msg:" + msg.toString()); log.debug("msg:" + msg.toString());
return decoder.decode(msg); return decoder.decode(msg);
} catch (Exception e) { } catch (Exception e) {
log.error("反序列化异常!", e); log.error("反序列化异常!", e);
......
...@@ -36,6 +36,10 @@ public class TbRabbitMqSettings { ...@@ -36,6 +36,10 @@ public class TbRabbitMqSettings {
private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactory;
public void setVHost(String virtualHost) {
connectionFactory.setVirtualHost(virtualHost);
}
@PostConstruct @PostConstruct
private void init() { private void init() {
connectionFactory = new ConnectionFactory(); connectionFactory = new ConnectionFactory();
...@@ -47,5 +51,6 @@ public class TbRabbitMqSettings { ...@@ -47,5 +51,6 @@ public class TbRabbitMqSettings {
connectionFactory.setAutomaticRecoveryEnabled(automaticRecoveryEnabled); connectionFactory.setAutomaticRecoveryEnabled(automaticRecoveryEnabled);
connectionFactory.setConnectionTimeout(connectionTimeout); connectionFactory.setConnectionTimeout(connectionTimeout);
connectionFactory.setHandshakeTimeout(handshakeTimeout); connectionFactory.setHandshakeTimeout(handshakeTimeout);
} }
} }
...@@ -62,7 +62,11 @@ ...@@ -62,7 +62,11 @@
<groupId>io.jsonwebtoken</groupId> <groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId> <artifactId>jjwt</artifactId>
</dependency> </dependency>
<dependency>
<groupId>p6spy</groupId>
<artifactId>p6spy</artifactId>
<version>3.9.1</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
......
...@@ -11,10 +11,6 @@ import org.springframework.context.annotation.Configuration; ...@@ -11,10 +11,6 @@ import org.springframework.context.annotation.Configuration;
**/ **/
@Configuration @Configuration
public class AccountConfig { public class AccountConfig {
// @Bean
// public Contract feignContract() {
// return new HierarchicalContract();
// }
@Bean @Bean
public BaseWebMvcConfigurer getBaseWebMvc(){ public BaseWebMvcConfigurer getBaseWebMvc(){
......
package com.mortals.xhx.base.framework.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @author: zxfei
* @date: 2022/2/15 13:16
* @description:
**/
@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowCredentials(true)
.allowedOrigins("*")
.allowedMethods(new String[] { "GET", "POST","PUT","DELETE"})
.allowedHeaders("*")
.exposedHeaders("*");
}
}
package com.mortals.xhx.base.framework.config;
import com.mortals.framework.filter.RepeatableFilter;
import com.mortals.framework.filter.XssFilter;
import com.mortals.framework.util.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import javax.servlet.DispatcherType;
import java.util.HashMap;
import java.util.Map;
/**
* Filter配置
*
* @author zxfei
*/
//@Configuration
public class FilterConfig {
@Value("${xss.enabled}")
private String enabled;
@Value("${xss.excludes}")
private String excludes;
@Value("${xss.urlPatterns}")
private String urlPatterns;
@SuppressWarnings({"rawtypes", "unchecked"})
@Bean
public FilterRegistrationBean xssFilterRegistration() {
FilterRegistrationBean registration = new FilterRegistrationBean();
registration.setDispatcherTypes(DispatcherType.REQUEST);
registration.setFilter(new XssFilter());
registration.addUrlPatterns(StringUtils.split(urlPatterns, ","));
registration.setName("xssFilter");
registration.setOrder(FilterRegistrationBean.HIGHEST_PRECEDENCE);
Map<String, String> initParameters = new HashMap<String, String>();
initParameters.put("excludes", excludes);
initParameters.put("enabled", enabled);
registration.setInitParameters(initParameters);
return registration;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Bean
public FilterRegistrationBean someFilterRegistration() {
FilterRegistrationBean registration = new FilterRegistrationBean();
registration.setFilter(new RepeatableFilter());
registration.addUrlPatterns("/*");
registration.setName("repeatableFilter");
registration.setOrder(FilterRegistrationBean.LOWEST_PRECEDENCE);
return registration;
}
}
package com.mortals.xhx.base.framework.config;
import com.mortals.framework.util.DateUtils;
import com.p6spy.engine.spy.appender.MessageFormattingStrategy;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.commons.lang3.StringUtils;
/**
* 自定义 p6spy sql输出格式
*
* @author: zxfei
* @date: 2022/4/19 9:01
*/
@CommonsLog
public class P6spySqlFormatConfig implements MessageFormattingStrategy {
@Override
public String formatMessage(int connectionId, String now, long elapsed, String category, String prepared, String sql, String url) {
return StringUtils.isNotBlank(sql) ? DateUtils.getCurrStrDateTime()
+ " | 耗时 " + elapsed + " ms | SQL:" + StringUtils.LF + sql.replaceAll("[\\s]+", StringUtils.SPACE) + ";" : "";
}
}
package com.mortals.xhx.base.framework.filter;
import cn.hutool.core.util.IdUtil;
import com.mortals.framework.service.IAuthTokenService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.web.filter.OncePerRequestFilter;
import org.springframework.web.util.ContentCachingRequestWrapper;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
*
* 请求过滤链
* @author: zxfei
* @date: 2022/4/20 14:52
*/
@Component
@Slf4j
public class RequestFilter extends OncePerRequestFilter implements Filter {
@Autowired
private IAuthTokenService authTokenService;
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
try {
//每个请求记录一个traceId,可以根据traceId搜索出本次请求的全部相关日志
MDC.put("traceId", IdUtil.fastSimpleUUID().substring(0,12));
setUsername(request);
request = new ContentCachingRequestWrapper(request);
filterChain.doFilter(request, response);
} catch (Exception e) {
throw e;
} finally {
//清理ThreadLocal
MDC.clear();
}
}
private void setUsername(HttpServletRequest request) {
//通过token解析出username
String token = authTokenService.getToken(request);
//String token = request.getHeader("token");
if (!ObjectUtils.isEmpty(token)) {
MDC.put("token",token);
// MDC.put("token", token);
// try {
// SessionUserInfo info = tokenService.getUserInfo();
// if (info != null) {
// String username = info.getUsername();
// MDC.put("username", username);
// }
// } catch (CommonJsonException e) {
// log.info("无效的token:{}", token);
// }
}
}
}
package com.mortals.xhx.busiz.rsp; package com.mortals.xhx.busiz.rsp;
import com.mortals.xhx.common.code.YesNoEnum;
import lombok.Data; import lombok.Data;
/** /**
......
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
import java.io.Serializable;
@Data
public class DeviceQueueAuthInfo implements Serializable {
/**
* 地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 虚拟机名称
*/
private String virtualHost;
/**
* 交换机名称
*/
private String exchangeName;
/**
* 设备上行topic队列
*/
private String uploadTopicFilter;
/**
* 设备下行topic队列
*/
private String downTopicFilter;
}
...@@ -8,12 +8,10 @@ import java.io.Serializable; ...@@ -8,12 +8,10 @@ import java.io.Serializable;
@Data @Data
public class DeviceResp implements Serializable { public class DeviceResp implements Serializable {
/** /**
* token * token(AES加密)
*/ */
private String token; private String token;
} }
package com.mortals.xhx.busiz.web; package com.mortals.xhx.busiz.web;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.SignAlgorithm;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.util.AESUtil;
import com.mortals.xhx.base.DeviceAuth; import com.mortals.xhx.base.DeviceAuth;
import com.mortals.xhx.base.framework.ws.message.SendToAllRequest; import com.mortals.xhx.base.framework.ws.message.SendToAllRequest;
import com.mortals.xhx.base.framework.ws.util.WebSocketUtil; import com.mortals.xhx.base.framework.ws.util.WebSocketUtil;
import com.mortals.xhx.busiz.req.DeviceReq; import com.mortals.xhx.busiz.req.DeviceReq;
import com.mortals.xhx.busiz.req.UploadDeviceReq; import com.mortals.xhx.busiz.req.UploadDeviceReq;
import com.mortals.xhx.busiz.rsp.ApiResp; import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.busiz.rsp.DeviceQueueAuthInfo;
import com.mortals.xhx.busiz.rsp.DeviceResp; import com.mortals.xhx.busiz.rsp.DeviceResp;
import com.mortals.xhx.busiz.security.DeviceTokenService; import com.mortals.xhx.busiz.security.DeviceTokenService;
import com.mortals.xhx.common.code.ActiveEnum; import com.mortals.xhx.common.code.ActiveEnum;
...@@ -22,10 +23,15 @@ import com.mortals.xhx.common.model.MessageHeader; ...@@ -22,10 +23,15 @@ import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.model.DeviceEntity; import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceLogService; import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService; import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.model.PlatformEntity;
import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.model.ProductEntity;
import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.queue.TbQueueMsgHeaders; import com.mortals.xhx.queue.TbQueueMsgHeaders;
import com.mortals.xhx.queue.TopicPartitionInfo; import com.mortals.xhx.queue.TopicPartitionInfo;
import lombok.extern.apachecommons.CommonsLog; import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
...@@ -45,13 +51,24 @@ import java.util.Date; ...@@ -45,13 +51,24 @@ import java.util.Date;
@RequestMapping("/api") @RequestMapping("/api")
public class DeviceApiController { public class DeviceApiController {
@Autowired
private DeviceLogService deviceLogService;
@Autowired @Autowired
private DeviceService deviceService; private DeviceService deviceService;
@Autowired @Autowired
private DeviceTokenService deviceTokenService; private ProductService productService;
@Autowired
private PlatformService platformService;
@Value("${queue.rabbitmq.virtual_host:}")
private String virtualHost;
@Value("${tok.rabbitmq.password:}")
private String password;
@Value("${queue.rabbitmq.host:}")
private String host;
@Value("${queue.rabbitmq.port:}")
private int port;
@Value("${queue.rabbitmq.username:}")
private String username;
@Value("${token.secret}")
private String secret;
/** /**
...@@ -69,34 +86,48 @@ public class DeviceApiController { ...@@ -69,34 +86,48 @@ public class DeviceApiController {
try { try {
//根据设备编码查询设备是否存在,如果存在判断是否已经激活 //根据设备编码查询设备是否存在,如果存在判断是否已经激活
DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum()); DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum());
if (!ObjectUtils.isEmpty(deviceEntity)) { if (ObjectUtils.isEmpty(deviceEntity)) {
deviceEntity = new DeviceEntity();
deviceEntity.initAttrValue();
deviceEntity.setDeviceMac(req.getDeviceMac());
deviceEntity.setDeviceCode(req.getDeviceNum());
deviceEntity.setCreateUserId(1L);
deviceEntity.setCreateTime(new Date());
deviceService.save(deviceEntity);
}
if (deviceEntity.getActive() != ActiveEnum.已激活.getValue()) {
rsp.setCode(ApiRespCodeEnum.FAILED.getValue());
rsp.setMsg("当前设备未激活,请在后台配置后再激活!");
} else {
//判断设备是否已经激活,如果已激活返回token信息 //判断设备是否已经激活,如果已激活返回token信息
if (deviceEntity.getActive() == ActiveEnum.已激活.getValue() && deviceEntity.getStatus() == StatusEnum.启用.getValue()) { if (deviceEntity.getActive() == ActiveEnum.已激活.getValue() && deviceEntity.getStatus() == StatusEnum.启用.getValue()) {
String token = deviceTokenService.createToken(deviceEntity); DeviceQueueAuthInfo authInfo = new DeviceQueueAuthInfo();
authInfo.setHost(host);
authInfo.setPort(port);
authInfo.setUsername(username);
authInfo.setPassword(password);
authInfo.setVirtualHost(virtualHost);
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
if (ObjectUtils.isEmpty(platformEntity)) {
throw new AppException("当前设备未配置所属系统平台,请在后台配置后再激活!");
}
// authInfo.setHost(platformEntity.getPlatformSn());
ProductEntity productEntity = productService.get(deviceEntity.getProductId());
if (ObjectUtils.isEmpty(productEntity)) {
throw new AppException("当前设备未配置所属产品,请在后台配置后再激活!");
}
authInfo.setExchangeName(platformEntity.getPlatformSn()+Constant.EXCHANGE_SPLIT+productEntity.getProductCode());
authInfo.setUploadTopicFilter(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceCode());
authInfo.setDownTopicFilter(Constant.DOWN_TOPIC + deviceEntity.getDeviceCode());
String token = AESUtil.encryptForApp(JSON.toJSONString(authInfo), secret);
deviceResp.setToken(token); deviceResp.setToken(token);
} }
deviceEntity.setOnlineTime(new Date()); deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.在线.getValue()); deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.在线.getValue());
deviceEntity.setIp(req.getIp()); deviceEntity.setIp(req.getIp());
deviceEntity.setPort(req.getPort()); deviceEntity.setPort(req.getPort());
//deviceEntity.setSiteNum(req.getSitenum());
deviceEntity.setCenternum(req.getCenternum()); deviceEntity.setCenternum(req.getCenternum());
deviceService.update(deviceEntity); deviceService.update(deviceEntity);
} else {
//新增设备
deviceEntity = new DeviceEntity();
deviceEntity.initAttrValue();
deviceEntity.setDeviceCode(req.getDeviceNum());
deviceEntity.setDeviceMac(req.getDeviceMac());
deviceEntity.setIp(req.getIp());
deviceEntity.setPort(req.getPort());
//deviceEntity.set(req.getSitenum());
deviceEntity.setCenternum(req.getCenternum());
deviceEntity.setCreateUserId(1L);
deviceEntity.setCreateTime(new Date());
deviceService.save(deviceEntity);
rsp.setMsg("当前设备未激活,激活后再上线。");
WebSocketUtil.broadcast(SendToAllRequest.TYPE, new SendToAllRequest().setContent(JSON.toJSONString(deviceEntity)));
} }
rsp.setData(deviceResp); rsp.setData(deviceResp);
} catch (Exception e) { } catch (Exception e) {
......
...@@ -66,6 +66,10 @@ public final class Constant { ...@@ -66,6 +66,10 @@ public final class Constant {
*/ */
public static final String MESSAGETYPE_HEARTBEAT = "HEART_BEAT"; public static final String MESSAGETYPE_HEARTBEAT = "HEART_BEAT";
/**
* rabbmit exchange分隔符
*/
public static final String EXCHANGE_SPLIT = ".";
/** /**
* 消息类型(激活) * 消息类型(激活)
......
...@@ -31,9 +31,13 @@ spring: ...@@ -31,9 +31,13 @@ spring:
min-idle: 0 min-idle: 0
max-active: 100 max-active: 100
max-wait: 1000 max-wait: 1000
dao:
exceptiontranslation:
enabled: false
datasource: datasource:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.p6spy.engine.spy.P6SpyDriver
#driver-class-name: com.mysql.cj.jdbc.Driver
url: @profiles.datasource.uri@ url: @profiles.datasource.uri@
username: @profiles.datasource.username@ username: @profiles.datasource.username@
password: @profiles.datasource.password@ password: @profiles.datasource.password@
...@@ -104,3 +108,5 @@ token: ...@@ -104,3 +108,5 @@ token:
secret: abcd1234 secret: abcd1234
# 令牌有效期(默认60分钟) # 令牌有效期(默认60分钟)
expireTime: 60 expireTime: 60
# 令牌前缀
prefix: Bearer
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
<profiles.server.port>18222</profiles.server.port> <profiles.server.port>18222</profiles.server.port>
<profiles.queue.type>rabbitmq</profiles.queue.type> <profiles.queue.type>rabbitmq</profiles.queue.type>
<profiles.datasource.uri> <profiles.datasource.uri>
<![CDATA[jdbc:mysql://localhost:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri> <![CDATA[jdbc:p6spy:mysql://localhost:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri>
<profiles.datasource.username>root</profiles.datasource.username> <profiles.datasource.username>root</profiles.datasource.username>
<profiles.datasource.password>12345678</profiles.datasource.password> <profiles.datasource.password>12345678</profiles.datasource.password>
<profiles.redis.uri>127.0.0.1</profiles.redis.uri> <profiles.redis.uri>127.0.0.1</profiles.redis.uri>
...@@ -47,7 +47,7 @@ ...@@ -47,7 +47,7 @@
<profiles.rabbitmq.port>5672</profiles.rabbitmq.port> <profiles.rabbitmq.port>5672</profiles.rabbitmq.port>
<profiles.rabbitmq.username>taxi_mq</profiles.rabbitmq.username> <profiles.rabbitmq.username>taxi_mq</profiles.rabbitmq.username>
<profiles.rabbitmq.password>admin@2020</profiles.rabbitmq.password> <profiles.rabbitmq.password>admin@2020</profiles.rabbitmq.password>
<profiles.rabbitmq.virtualhost>/</profiles.rabbitmq.virtualhost> <profiles.rabbitmq.virtualhost>/test</profiles.rabbitmq.virtualhost>
<profiles.rabbitmq.exchange></profiles.rabbitmq.exchange> <profiles.rabbitmq.exchange></profiles.rabbitmq.exchange>
<profiles.filepath>/mortals/data</profiles.filepath> <profiles.filepath>/mortals/data</profiles.filepath>
<profiles.log.level>INFO</profiles.log.level> <profiles.log.level>INFO</profiles.log.level>
...@@ -62,7 +62,7 @@ ...@@ -62,7 +62,7 @@
<profiles.server.port>18222</profiles.server.port> <profiles.server.port>18222</profiles.server.port>
<profiles.queue.type>rabbitmq</profiles.queue.type> <profiles.queue.type>rabbitmq</profiles.queue.type>
<profiles.datasource.uri> <profiles.datasource.uri>
<![CDATA[jdbc:mysql://192.168.0.98:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri> <![CDATA[jdbc:p6spy:mysql://192.168.0.98:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri>
<profiles.datasource.username>root</profiles.datasource.username> <profiles.datasource.username>root</profiles.datasource.username>
<profiles.datasource.password>nacos@2020</profiles.datasource.password> <profiles.datasource.password>nacos@2020</profiles.datasource.password>
<profiles.redis.uri>192.168.0.252</profiles.redis.uri> <profiles.redis.uri>192.168.0.252</profiles.redis.uri>
...@@ -90,7 +90,7 @@ ...@@ -90,7 +90,7 @@
<profiles.server.port>18222</profiles.server.port> <profiles.server.port>18222</profiles.server.port>
<profiles.queue.type>rabbitmq</profiles.queue.type> <profiles.queue.type>rabbitmq</profiles.queue.type>
<profiles.datasource.uri> <profiles.datasource.uri>
<![CDATA[jdbc:mysql://192.168.0.26:8183/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri> <![CDATA[jdbc:p6spy:mysql://192.168.0.26:8183/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri>
<profiles.datasource.username>root</profiles.datasource.username> <profiles.datasource.username>root</profiles.datasource.username>
<profiles.datasource.password>root123</profiles.datasource.password> <profiles.datasource.password>root123</profiles.datasource.password>
<profiles.redis.uri>192.168.0.26</profiles.redis.uri> <profiles.redis.uri>192.168.0.26</profiles.redis.uri>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment