Commit 468ff768 authored by 赵啸非's avatar 赵啸非

ccccc

parent 53a350d8
......@@ -37,6 +37,12 @@
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
......@@ -89,6 +95,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
......
......@@ -3,17 +3,16 @@ package com.mortals.xhx.queue;
import com.mortals.xhx.queue.processing.AbstractConsumerService;
import com.mortals.xhx.queue.provider.TbCoreQueueFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
@Service
@Slf4j
......@@ -23,11 +22,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
private long pollDuration;
@Value("${queue.core.pack-processing-timeout}")
private long packProcessingTimeout;
@Getter
private LinkedBlockingQueue<TbQueueMsg> comsureQueue = new LinkedBlockingQueue<>();
private TbQueueConsumer<TbQueueMsg> mainConsumer;
@Getter
private TbQueueConsumer<TbQueueMsg> mainConsumer;
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory) {
this.mainConsumer = tbCoreQueueFactory.createMsgConsumer();
//Object deviceService = SpringUtil.getBean("deviceService");
}
@PostConstruct
......@@ -42,12 +45,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
super.destroy();
}
@EventListener(ApplicationReadyEvent.class)
@Order(value = 2)
public void onApplicationEvent(ApplicationReadyEvent event) {
super.onApplicationEvent(event);
}
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
}
@Override
protected void launchMainConsumers() {
......@@ -59,18 +60,18 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
if (msgs.isEmpty()) {
continue;
}
for (TbQueueMsg item:msgs){
log.info("msg:"+item.toString());
for (TbQueueMsg item : msgs) {
comsureQueue.offer(item);
}
mainConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain messages from queue.", e);
// log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
// log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
......@@ -79,18 +80,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueu
});
}
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
}
private static class PendingMsgHolder {
// @Getter
// @Setter
// private volatile ToCoreMsg toCoreMsg;
}
@Override
protected void stopMainConsumers() {
if (mainConsumer != null) {
......
package com.mortals.xhx.queue;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.json.JSONObject;
import java.util.UUID;
......@@ -14,10 +17,11 @@ import java.util.UUID;
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DefaultTbQueueMsg implements TbQueueMsg {
private final UUID key;
private final byte[] data;
private final TbQueueMsgHeaders headers;
private UUID key;
private byte[] data;
private TbQueueMsgHeaders headers;
public DefaultTbQueueMsg(TbQueueMsg msg) {
this.key = msg.getKey();
......@@ -25,7 +29,18 @@ public class DefaultTbQueueMsg implements TbQueueMsg {
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
msg.getHeaders().getData().forEach(headers::put);
this.headers = headers;
}
public static void main(String[] args) {
// DefaultTbQueueMsg defaultTbQueueMsg = new DefaultTbQueueMsg(UUID.randomUUID(),"string".getBytes(),null);
//
// String ret = JSON.toJSONString(defaultTbQueueMsg);
// System.out.println("pro:"+ret);
//
// DefaultTbQueueMsg qu = JSON.parseObject(ret, DefaultTbQueueMsg.class);
//
// System.out.println("after:"+qu.toString());
}
......
......@@ -19,7 +19,6 @@ public class TopicPartitionInfo {
this.topic = topic;
this.partition = partition;
String tmp = topic;
if (partition != null) {
tmp += "." + partition;
}
......
......@@ -76,7 +76,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
consumerLock.lock();
try {
//更新订阅的主题
while (!subscribeQueue.isEmpty()) {
subscribed = false;
......
......@@ -19,14 +19,10 @@ public abstract class AbstractConsumerService<N extends TbQueueMsg> {
public void init(String mainConsumerThreadName) {
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName(mainConsumerThreadName));
}
@EventListener(ApplicationReadyEvent.class)
@Order(value = 2)
public void onApplicationEvent(ApplicationReadyEvent event) {
launchMainConsumers();
}
/**
* 启动消费主线程服务
*/
......
......@@ -39,7 +39,6 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueMsgHeaders getHeaders() {
return msg.getHeaders();
}
......
......@@ -2,21 +2,19 @@
package com.mortals.xhx.queue.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.queue.DefaultTbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgDecoder;
import com.mortals.xhx.queue.TopicPartitionInfo;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
......@@ -24,8 +22,10 @@ import java.util.stream.Collectors;
public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
private final TbQueueMsgDecoder<T> decoder;
private Channel channel;
private Connection connection;
private Channel channel;
private Connection connection;
private final Gson gson = new Gson();
private volatile Set<String> queues;
......@@ -35,9 +35,9 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
try {
connection = rabbitMqSettings.getConnectionFactory().newConnection();
channel = connection.createChannel();
log.info("channelNumber:" + channel.getChannelNumber());
} catch (IOException | TimeoutException e) {
log.error("Failed to create connection." , e);
// throw new RuntimeException("Failed to create connection." , e);
log.error("Failed to create connection.", e);
}
stopped = false;
......@@ -48,11 +48,12 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
List<GetResponse> result = queues.stream()
.map(queue -> {
try {
return channel.basicGet(queue, false);
GetResponse getResponse = channel.basicGet(queue, true);
return getResponse;
} catch (IOException e) {
log.error("Failed to get messages from queue: [{}]" , queue);
return null;
// throw new RuntimeException("Failed to get messages from queue." , e);
// log.error("Failed to get messages from queue: [{}]" , queue);
return null;
// throw new RuntimeException("Failed to get messages from queue." , e);
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (result.size() > 0) {
......@@ -75,7 +76,7 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
try {
channel.basicAck(0, true);
} catch (IOException e) {
log.error("Failed to ack messages." , e);
log.error("Failed to ack messages.", e);
}
}
......@@ -99,8 +100,42 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
public T decode(GetResponse message) {
DefaultTbQueueMsg msg = JSON.parseObject(new String(message.getBody()), DefaultTbQueueMsg.class);
// log.info("getRespBody:" + new String(message.getBody()));
try {
DefaultTbQueueMsg msg = new DefaultTbQueueMsg();
// DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class);
msg.setKey(UUID.fromString((String)map.get("key")));
String payloadStr = (String)map.get("data");
byte[] payloadByte = payloadStr.getBytes();
System.out.println("receivedPayLoadStr:" + payloadStr);
byte[] payloadDecodeByte = Base64.getDecoder().decode(payloadStr);
msg.setData(payloadDecodeByte);
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
String clientIdStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("clientId");
String qosStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("qos");
String timestampStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("timestamp");
if(!ObjectUtils.isEmpty(clientIdStr)){
headers.put("clientId", Base64.getDecoder().decode(clientIdStr));
}
if(!ObjectUtils.isEmpty(qosStr)){
headers.put("qos", Base64.getDecoder().decode(qosStr));
}
if(!ObjectUtils.isEmpty(timestampStr)) {
headers.put("timestamp", Base64.getDecoder().decode(timestampStr));
}
msg.setHeaders(headers);
// log.info("msg:" + msg.toString());
return decoder.decode(msg);
} catch (Exception e) {
log.error("反序列化异常!", e);
return null;
}
return decoder.decode(msg);
}
}
......@@ -3,6 +3,7 @@ package com.mortals.xhx.queue.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.mortals.xhx.queue.*;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
......@@ -17,11 +18,12 @@ import java.util.concurrent.TimeoutException;
@Slf4j
public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
private String defaultTopic;
private TbRabbitMqSettings rabbitMqSettings;
private ListeningExecutorService producerExecutor;
private Channel channel;
private Connection connection;
private String defaultTopic;
private TbRabbitMqSettings rabbitMqSettings;
private ListeningExecutorService producerExecutor;
private Channel channel;
private Connection connection;
private final Gson gson = new Gson();
private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
......@@ -33,8 +35,8 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
connection = rabbitMqSettings.getConnectionFactory().newConnection();
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
log.error("Failed to create connection." , e);
// throw new RuntimeException("Failed to create connection." , e);
log.error("Failed to create connection.", e);
// throw new RuntimeException("Failed to create connection." , e);
}
}
......@@ -51,19 +53,31 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
createTopicIfNotExist(tpi);
Boolean topicIfNotExist = createTopicIfNotExist(tpi);
AMQP.BasicProperties properties = new AMQP.BasicProperties();
// .builder()
// .contentType("application/json")
// .deliveryMode(2) // 消息是否持久化,1未持久,2持久
// .contentEncoding("UTF-8") // 编码方式
// .expiration("100000") // 过期时间单位毫秒
// .build();
try {
if (!topicIfNotExist) {
channel.queueDeclare(tpi.getTopic(), true, false, false, null);
}
//channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes());
channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
if (callback != null) {
callback.onSuccess(null);
}
} catch (IOException e) {
log.error("Failed publish message: [{}]." , msg, e);
} catch (
IOException e) {
log.error("Failed publish message: [{}].", msg, e);
if (callback != null) {
callback.onFailure(e);
}
}
}
@Override
......@@ -87,11 +101,11 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
}
}
private void createTopicIfNotExist(TopicPartitionInfo tpi) {
private Boolean createTopicIfNotExist(TopicPartitionInfo tpi) {
if (topics.contains(tpi)) {
return;
return true;
}
// admin.createTopicIfNotExists(tpi.getFullTopicName());
topics.add(tpi);
return false;
}
}
......@@ -5,8 +5,6 @@
<el-tag slot="table-body-head" style="margin:5px" type="danger">当前离线设备总计:{{tableData.offlineCount}}</el-tag>
<el-tag v-for='($label, $value) in tableData.offlineDeviceType'
:key='$value'
:label="$value" slot="table-body-head" style="margin:5px" type="danger">{{$value}}离线设备:{{$label}}</el-tag>
......@@ -26,38 +24,7 @@ export default {
components: { dialogShow },
mixins: [table],
created() {
// let _this = this;
// const getsocketData = (e) => {
// // 创建接收消息函数
// const data = e && e.detail.data;
// let obj = JSON.parse(data);
// if (obj.type == "SEND_TO_ALL_REQUEST") {
// let msg = "";
// let content = JSON.parse(obj.body.content);
// if (content.deviceOnlineStatus == 1) {
// console.log(_this.tableData.dict)
// msg = _this.tableData.dict.deviceType[content.deviceType]+ "设备:" + content.deviceCode + " 上线!";
// } else {
// msg = _this.tableData.dict.deviceType[content.deviceType]+"设备:" + content.deviceCode + " 离线!";
// }
// _this.$notify({
// title: "警告",
// message: msg,
// type: "warning",
// duration: 8000,
// });
// _this.getData();
// }
// console.log(data);
// };
// this.getsocketData = getsocketData;
// // 注册监听事件
// window.addEventListener("onmessageWS", getsocketData,false);
},
methods: {
/** 重写新增方法 */
......@@ -72,6 +39,23 @@ export default {
toView(row) {
this.$refs.dialogform.view(row);
},
activeDevice(row) {
this.$post("/device/save", {
"entity.id": row.id,
"entity.active": 1,
})
.then((res) => {
if (res.code == 1) {
this.$message.success("激活设备成功!");
this.getData()
}
})
.catch((error) => {
this.$message.error(error.message);
});
},
},
data() {
return {
......@@ -115,11 +99,14 @@ export default {
prop: "onlineTime",
formatter: this.formatterDate,
},
{ label: "激活状态", prop: "active", formatter: this.formatterYES },
{
label: "操作",
width: 240,
formatter: (row) => {
return (
<div>
<table-buttons
noAdd
row={row}
......@@ -127,6 +114,21 @@ export default {
onView={this.toView}
onDel={this.toDel}
/>
{row.active === 0 ? (
<el-button
size="mini"
type="text"
icon="el-icon-open"
onClick={() => {
this.activeDevice(row);
}}
>
激活
</el-button>
) : (
""
)}
</div>
);
},
},
......
......@@ -17,7 +17,7 @@ module.exports = {
hot: true,//自动保存
proxy: {
'/m': {
target: 'http://127.0.0.1:18211',
target: 'http://127.0.0.1:18222',
changeOrigin: true,
secure: false,
cookieDomainRewrite: 'plm.testnew.com',
......
......@@ -45,6 +45,8 @@
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
......
package com.mortals.xhx.base.framework.config;
import com.mortals.framework.springcloud.config.web.BaseWebMvcConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: zxfei
* @date: 2021/8/13 0:24
* @description:
**/
@Configuration
public class AccountConfig {
// @Bean
// public Contract feignContract() {
// return new HierarchicalContract();
// }
@Bean
public BaseWebMvcConfigurer getBaseWebMvc(){
return new BaseWebMvcConfigurer(false,null);
}
}
......@@ -18,6 +18,10 @@ public class DeviceReq implements Serializable {
* 设备编码(暂定mac地址)
*/
private String deviceNum;
/**
* mac地址
*/
private String deviceMac;
private String ip;
......
......@@ -21,10 +21,4 @@ public class ApiResp<T> {
*/
private T data;
public boolean isSuccess() {
if (YesNoEnum.YES.getValue() == code) {
return true;
}
return false;
}
}
......@@ -6,6 +6,7 @@ import com.mortals.framework.util.StringUtils;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.module.device.model.DeviceEntity;
import io.jsonwebtoken.Claims;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
......@@ -23,6 +24,7 @@ import io.jsonwebtoken.SignatureAlgorithm;
* @author zxfei
*/
@Component
@CommonsLog
public class DeviceTokenService {
// 令牌自定义标识
@Value("${token.header}")
......@@ -54,11 +56,16 @@ public class DeviceTokenService {
// 获取请求携带的令牌
String token = getToken(request);
if (StringUtils.isNotEmpty(token)) {
Claims claims = parseToken(token);
String uuid = (String) claims.get(Constant.LOGIN_DEVICE_KEY);
String deviceKey = getTokenKey(uuid);
DeviceEntity device = cacheService.get(deviceKey, DeviceEntity.class);
return device;
try {
Claims claims = parseToken(token);
String uuid = (String) claims.get(Constant.LOGIN_DEVICE_KEY);
String deviceKey = getTokenKey(uuid);
DeviceEntity device = cacheService.get(deviceKey, DeviceEntity.class);
return device;
} catch (Exception e) {
log.error("解析jwt token异常!",e);
return null;
}
}
return null;
}
......
......@@ -31,7 +31,7 @@ import java.util.Date;
*/
@RestController
@CommonsLog
@RequestMapping("/api/device")
@RequestMapping("/api")
public class DeviceApiController {
@Autowired
......@@ -62,7 +62,6 @@ public class DeviceApiController {
//判断设备是否已经激活,如果已激活返回token信息
if (deviceEntity.getActive() == ActiveEnum.已激活.getValue() && deviceEntity.getStatus() == StatusEnum.启用.getValue()) {
String token = deviceTokenService.createToken(deviceEntity);
deviceResp.setToken(token);
}
deviceEntity.setOnlineTime(new Date());
......@@ -77,8 +76,7 @@ public class DeviceApiController {
deviceEntity = new DeviceEntity();
deviceEntity.initAttrValue();
deviceEntity.setDeviceCode(req.getDeviceNum());
deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.在线.getValue());
deviceEntity.setDeviceMac(req.getDeviceMac());
deviceEntity.setIp(req.getIp());
deviceEntity.setPort(req.getPort());
deviceEntity.setSiteNum(req.getSitenum());
......@@ -86,7 +84,6 @@ public class DeviceApiController {
deviceEntity.setCreateUserId(1L);
deviceEntity.setCreateTime(new Date());
deviceService.save(deviceEntity);
rsp.setMsg("当前设备未激活,激活后再上线。");
WebSocketUtil.broadcast(SendToAllRequest.TYPE, new SendToAllRequest().setContent(JSON.toJSONString(deviceEntity)));
}
......@@ -111,7 +108,7 @@ public class DeviceApiController {
@PostMapping("upload")
@DeviceAuth
public String upload(@RequestBody DeviceReq req) {
log.debug("【设备数据上报】【请求体】--> " + JSONObject.toJSONString(req));
log.info("【设备数据上报】【请求体】--> " + JSONObject.toJSONString(req));
ApiResp<String> rsp = new ApiResp<>();
rsp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
rsp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
......@@ -120,7 +117,8 @@ public class DeviceApiController {
DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum());
if (!ObjectUtils.isEmpty(deviceEntity)) {
//将上报信息转发到mq中
deviceService.sendDeviceMessage(deviceEntity.getId(), JSON.toJSONString(req), null);
ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity.getId(), JSON.toJSONString(req), null);
log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
}
} catch (Exception e) {
log.error("接收数据失败", e);
......
package com.mortals.xhx.daemon.applicationservice;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.base.framework.ws.message.SendToAllRequest;
import com.mortals.xhx.base.framework.ws.util.WebSocketUtil;
import com.mortals.xhx.busiz.req.DeviceReq;
import com.mortals.xhx.common.code.ActiveEnum;
import com.mortals.xhx.common.code.DeviceOnlineStatusEnum;
import com.mortals.xhx.common.code.StatusEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.model.DeviceLogEntity;
import com.mortals.xhx.module.device.model.DeviceQuery;
import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.queue.DefaultTbCoreConsumerService;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TopicPartitionInfo;
import com.mortals.xhx.utils.IotThreadFactory;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Component
//@ConditionalOnProperty(name="com.mortal",prefix = "",havingValue = "xxx")
@CommonsLog
public class DemoStartedService implements IApplicationStartedService {
private static Log logger = LogFactory.getLog(DemoStartedService.class);
@Autowired
private DefaultTbCoreConsumerService consumerService;
@Autowired
private DeviceLogService deviceLogService;
@Autowired
private DeviceService deviceService;
protected volatile ExecutorService consumersExecutor;
protected Boolean stopped = false;
@Override
public void start() {
logger.info("开始服务..[配置已加载完成,并且所有框架都已经初始化]");
log.info("开始服务..[配置已加载完成,并且所有框架都已经初始化]");
TbQueueConsumer<TbQueueMsg> mainConsumer = consumerService.getMainConsumer();
if (!ObjectUtils.isEmpty(mainConsumer)) {
//订阅所有已几快活设备
Set<TopicPartitionInfo> topicPartitionInfoSet = deviceService.find(new DeviceQuery().active(ActiveEnum.已激活.getValue()).status(StatusEnum.启用.getValue())).stream()
.map(item ->
new TopicPartitionInfo(Constant.UPLOAD_TOPIC + item.getDeviceMac(), null)
).collect(Collectors.toSet());
mainConsumer.subscribe(topicPartitionInfoSet);
log.info("消费线程订阅设备上报消息成功!:" + JSON.toJSONString(topicPartitionInfoSet));
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName("消费queue线程"));
consumersExecutor.submit(() -> {
while (!stopped) {
try {
TbQueueMsg queueMsg = consumerService.getComsureQueue().poll();
if (!ObjectUtils.isEmpty(queueMsg)) {
//做相应业务,做日志操作
DeviceReq deviceReq = JSON.parseObject(new String(queueMsg.getData()), DeviceReq.class);
boolean bool = false;
DeviceEntity deviceEntity = deviceService.getExtCache(deviceReq.getDeviceNum());
if (!ObjectUtils.isEmpty(deviceEntity)) {
if (deviceEntity.getDeviceOnlineStatus() == DeviceOnlineStatusEnum.离线.getValue()) {
bool = true;
}
deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.在线.getValue());
deviceEntity.setIp(deviceReq.getIp());
deviceEntity.setPort(deviceReq.getPort());
deviceEntity.setSiteNum(deviceReq.getSitenum());
deviceEntity.setCenternum(deviceReq.getCenternum());
deviceService.update(deviceEntity);
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceNum(deviceEntity.getDeviceCode());
deviceLogEntity.setContent(JSONObject.toJSONString(deviceReq));
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity);
if (bool) {
WebSocketUtil.broadcast(SendToAllRequest.TYPE, new SendToAllRequest().setContent(JSON.toJSONString(deviceEntity)));
}
}
}
} catch (Exception e) {
if (!stopped) {
try {
Thread.sleep(1000);
} catch (InterruptedException e2) {
}
}
}
}
mainConsumer.commit();
});
}
}
@Override
public void stop() {
logger.info("停止服务..");
log.info("停止服务..");
// if (mainConsumer != null) {
// mainConsumer.unsubscribe();
// }
}
@Override
......
......@@ -376,7 +376,7 @@ public class DeviceEntity extends DeviceVo {
this.deviceOnlineStatus = 0;
this.status = 0;
this.status = 1;
this.active = 0;
......
......@@ -43,13 +43,11 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
@Override
public ApiResp<String> sendDeviceMessage(Long deviceId, String message, Context context) {
ApiResp<String> resp = new ApiResp<>();
resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
//发送消息
send(message, context, resp, deviceId);
return resp;
}
......@@ -69,21 +67,22 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
DeviceEntity deviceEntity = this.get(deviceId, context);
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.UPLOAD_TOPIC+deviceEntity.getDeviceMac()).build();
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.CLIENTID, "device".getBytes());
Map<String, Object> headers = new HashMap<>();
headers.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime().getBytes());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID(), message == null ? "".getBytes() : message.getBytes(), header);
producer.send(info, queueMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("消息投递成功,设备通道编码:" + deviceEntity.getDeviceMac());
log.info("消息投递成功,设备通道编码:" + deviceEntity.getDeviceMac());
}
@Override
public void onFailure(Throwable t) {
log.error("消息投递成功,设备通道编码:" + deviceEntity.getDeviceMac(), t);
}
});
......
......@@ -49,5 +49,32 @@ GET {{baseUrl}}/device/delete?id={{Device_id}}
Accept: application/json
###激活设备
POST {{baseUrl}}/api/active
Content-Type: application/json
{
"deviceNum":"a12345678",
"deviceMac":"AB:DD:DF:FD:AD:FA:DA:SS",
"action":"active"
}
> {%
client.global.set("authToken", JSON.parse(response.body).data.token);
%}
###设备上报
POST {{baseUrl}}/api/upload
Content-Type: application/json
Authorization: Bearer {{authToken}}
{
"deviceNum":"a12345678",
"action":"upload"
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment