Commit 513ea6e1 authored by 赵啸非's avatar 赵啸非

修改redis 过期事件通知

parent 413563ef
......@@ -79,11 +79,6 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.zxing</groupId>
<artifactId>javase</artifactId>
......
......@@ -62,11 +62,11 @@
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
</dependency>
<dependency>
<!-- <dependency>
<groupId>p6spy</groupId>
<artifactId>p6spy</artifactId>
<version>3.9.1</version>
</dependency>
</dependency>-->
<dependency>
<groupId>junit</groupId>
......
package com.mortals.xhx.base.framework.listener;
import cn.hutool.core.util.StrUtil;
import com.mortals.xhx.common.code.DeviceOnlineStatusEnum;
import com.mortals.xhx.common.code.DeviceStatusEnum;
import com.mortals.xhx.common.key.RedisKey;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.model.PlatformEntity;
import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.model.ProductEntity;
import com.mortals.xhx.module.product.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.Date;
/**
* @author: zxfei
......@@ -13,9 +27,39 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CustomerKeyExpirationListener implements MessageListener {
@Autowired
private DeviceService deviceService;
@Autowired
private ProductService productService;
@Autowired
private PlatformService platformService;
@Override
public void onMessage(Message message, byte[] bytes) {
String key = message.toString();
String subStr = StrUtil.removePrefix(key, RedisKey.KEY_DEVICE_ONLINE_CACHE);
if(!subStr.equals(key)){
DeviceEntity deviceEntity = deviceService.getExtCache(subStr);
if (!ObjectUtils.isEmpty(deviceEntity)) {
if (deviceEntity.getDeviceOnlineStatus() == DeviceOnlineStatusEnum.在线.getValue()) {
deviceEntity.setOfflineTime(new Date());
deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.离线.getValue());
deviceService.update(deviceEntity);
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
ProductEntity productEntity = productService.get(deviceEntity.getProductId());
if (!ObjectUtils.isEmpty(platformEntity) && !ObjectUtils.isEmpty(productEntity)) {
//通知第三方平台
deviceService.sendThirdParty(deviceEntity, productEntity, platformEntity, DeviceStatusEnum.OFFLINE);
}
}
// TODO: 2022/6/23 告警信息保存与发送
}
}
log.info("监听到key:" + key + "过期");
}
}
\ No newline at end of file
......@@ -3,7 +3,6 @@ package com.mortals.xhx.busiz;
import com.mortals.xhx.common.code.CommentTypeEnum;
import com.mortals.xhx.common.code.ProcessStatusEnum;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
......@@ -20,44 +19,36 @@ public abstract class BaseReq implements Serializable {
/**
* 任务id 必填
*/
@ApiModelProperty(value = "任务id", required = true)
private String taskId;
/**
* 流程实例的id
*/
@ApiModelProperty(value = "流程实例的id", required = true)
private String processInstanceId;
/**
* 节点id 选填
*/
@ApiModelProperty(value = "节点id")
private String activityId;
/**
* 节点名称 选填
*/
@ApiModelProperty(value = "节点名称")
private String activityName;
/**
* 流程实例状态 必填
*/
@ApiModelProperty(value = "流程实例状态", required = true)
private ProcessStatusEnum processStatusEnum;
/**********************审批意见的参数**********************/
/**
* 操作人code 必填
*/
@ApiModelProperty(value = "操作人code", required = true)
private String userCode;
/**
* 审批意见 必填
*/
@ApiModelProperty(value = "审批意见", required = true)
private String message;
/**
* 审批意见类型 必填
*/
@ApiModelProperty(value = "审批意见类型", required = true)
private CommentTypeEnum commentTypeEnum;
......
......@@ -475,7 +475,15 @@ public class DeviceApiController {
}
DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceCode());
if (ObjectUtils.isEmpty(deviceEntity)) {
throw new AppException("当前设备不存在!");
//初始化新增基础设备,后续在线完善信息后再行注册添加
deviceEntity = new DeviceEntity();
deviceEntity.initAttrValue();
deviceEntity.setDeviceMac(req.getDeviceCode());
deviceEntity.setDeviceCode(req.getDeviceCode());
deviceEntity.setCreateTime(new Date());
deviceEntity.setCreateUserId(1L);
deviceService.getDeviceDao().insert(deviceEntity);
}
return deviceEntity;
}
......
......@@ -46,6 +46,10 @@ public final class Constant {
public static final Integer SERVER_PORT = 8074;
public static final Integer CLIENT_PORT = 8073;
/**
* 设备心跳检查时间(秒)
*/
public static final String HEARTBEAT_TIMEOUT = "heartbeat_timeout";
/**
......
......@@ -6,8 +6,12 @@ package com.mortals.xhx.common.key;
public class RedisKey {
/**
* 登录 cookies key
* 登录 key
*/
public static final String KEY_MENU_CACHE = "iot:base:MenuCacheKey:";
/**
* 设备心跳上报
*/
public static final String KEY_DEVICE_ONLINE_CACHE = "device:online:";
}
......@@ -2,12 +2,15 @@ package com.mortals.xhx.daemon.applicationservice;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.base.framework.ws.message.SendToAllRequest;
import com.mortals.xhx.base.framework.ws.util.WebSocketUtil;
import com.mortals.xhx.busiz.req.DeviceReq;
import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.key.RedisKey;
import com.mortals.xhx.common.utils.SendTask;
import com.mortals.xhx.common.utils.SendTaskThreadPool;
import com.mortals.xhx.module.device.model.DeviceEntity;
......@@ -39,6 +42,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.mortals.xhx.common.model.MessageHeader.MESSAGETYPE;
@Component
@Slf4j
......@@ -56,6 +61,8 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
private PlatformService platformService;
@Autowired
private SendTaskThreadPool sendTaskThreadPool;
@Autowired
private ICacheService cacheService;
protected volatile ExecutorService consumersExecutor;
......@@ -69,8 +76,8 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
if (!ObjectUtils.isEmpty(mainConsumer)) {
//订阅所有已几快活设备
Set<TopicPartitionInfo> topicPartitionInfoSet = deviceService.find(new DeviceQuery().active(ActiveEnum.已激活.getValue()).status(StatusEnum.启用.getValue())).stream()
.filter(f->!ObjectUtils.isEmpty(platformService.get(f.getPlatformId())))
.filter(f->!ObjectUtils.isEmpty(productService.get(f.getProductId())))
.filter(f -> !ObjectUtils.isEmpty(platformService.get(f.getPlatformId())))
.filter(f -> !ObjectUtils.isEmpty(productService.get(f.getProductId())))
.map(item -> {
PlatformEntity platformEntity = platformService.get(item.getPlatformId());
ProductEntity productEntity = productService.get(item.getProductId());
......@@ -86,7 +93,6 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
int waitTime = 1000;
while (!stopped) {
try {
List<TbQueueMsg> messageList = new ArrayList(32);
TbQueueMsg message;
for (int i = 0; i < 32; i++) {
......@@ -101,7 +107,6 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
break;
}
}
if (messageList.size() > 0) {
log.debug("poll messageQueue messages: {}", JSON.toJSONString(messageList));
//异步消息分发
......@@ -158,10 +163,11 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
boolean bool = false;
DeviceEntity deviceEntity = deviceService.getExtCache(deviceReq.getDeviceCode());
if (!ObjectUtils.isEmpty(deviceEntity)) {
cacheService.hsetnx(RedisKey.KEY_DEVICE_ONLINE_CACHE + deviceEntity.getDeviceCode(), "", GlobalSysInfo.getParamIntValue(Constant.HEARTBEAT_TIMEOUT, 180));
if (deviceEntity.getDeviceOnlineStatus() == DeviceOnlineStatusEnum.离线.getValue()) {
bool = true;
}
if(deviceEntity.getDeviceOnlineStatus()==DeviceOnlineStatusEnum.在线.getValue()){
if (deviceEntity.getDeviceOnlineStatus() == DeviceOnlineStatusEnum.在线.getValue()) {
deviceEntity.setOnlineTime(new Date());
deviceEntity.setDeviceOnlineStatus(DeviceOnlineStatusEnum.在线.getValue());
deviceService.update(deviceEntity);
......@@ -173,31 +179,26 @@ public class DeviceMsgComsumerStartedService implements IApplicationStartedServi
deviceService.sendThirdParty(deviceEntity, productEntity, platformEntity, DeviceStatusEnum.OFFLINE);
}
}
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceNum(deviceEntity.getDeviceCode());
deviceLogEntity.setContent(JSONObject.toJSONString(deviceReq));
deviceLogEntity.setLogType(LogTypeEnum.上报事件.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity);
if (!Constant.MESSAGETYPE_HEARTBEAT.equals(queueMsg.getHeaders().getData().get(MESSAGETYPE))) {
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.initAttrValue();
deviceLogEntity.setDeviceId(deviceEntity.getId());
deviceLogEntity.setDeviceName(deviceEntity.getDeviceName());
deviceLogEntity.setDeviceNum(deviceEntity.getDeviceCode());
deviceLogEntity.setContent(JSONObject.toJSONString(deviceReq));
deviceLogEntity.setLogType(LogTypeEnum.上报事件.getValue());
deviceLogEntity.setCreateUserId(1L);
deviceLogEntity.setCreateTime(new Date());
deviceLogService.save(deviceLogEntity);
}
//获取exchange,
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
if (!ObjectUtils.isEmpty(platformEntity) && platformEntity.getSendSwitch() == YesNoEnum.YES.getValue()) {
if (platformEntity.getSendMsgType() == SendMsgTypeEnum.http.getValue() && !ObjectUtils.isEmpty(platformEntity.getSendUrl())) {
//http方式
//通过线程池进行发送消息
SendTask sendTask = new SendTask(platformEntity.getSendUrl(), new String(queueMsg.getData()));
sendTaskThreadPool.execute(sendTask);
SendTask sendTask = new SendTask(platformEntity.getSendUrl(), new String(queueMsg.getData()));
sendTaskThreadPool.execute(sendTask);
}
}
// if (bool) {
......
package com.mortals.xhx.daemon.applicationservice;
import com.mortals.framework.service.ICacheService;
import com.mortals.xhx.base.framework.listener.CustomerKeyExpirationListener;
import com.mortals.xhx.common.utils.SendTaskThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import com.mortals.framework.springcloud.service.IApplicationService;
......@@ -15,7 +18,7 @@ import java.time.Duration;
@Component
@Slf4j
public class DemoStartService implements IApplicationService {
public class DeviceStartService implements IApplicationService {
@Autowired
private SendTaskThreadPool sendTaskThreadPool;
......@@ -26,8 +29,10 @@ public class DemoStartService implements IApplicationService {
public void start() {
log.info("初始化发送线程数量");
sendTaskThreadPool.init(20);
cacheService.setnx("test-expire","111", 100);
log.info("初始化过期key监听事件");
for(int i=0;i<100;i++){
cacheService.setnx("test-expire"+i,"111", 10+i);
}
log.info("开始服务..[配置已加载完成,但部分框架还未初始化,比如:Kafka]");
}
......
......@@ -55,8 +55,8 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
@Override
public void excuteTask(ITask task) throws AppException {
log.debug("设备状态统计,开始执行");
doDeviceUpOrDown();
doDeviceLogDel();
//doDeviceUpOrDown();
//doDeviceLogDel();
log.debug("设备状态统计,结束执行");
}
......
......@@ -3,6 +3,7 @@ import com.mortals.framework.model.Context;
import com.mortals.framework.service.ICRUDCacheService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.DeviceStatusEnum;
import com.mortals.xhx.module.device.dao.DeviceDao;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.platform.model.PlatformEntity;
import com.mortals.xhx.module.product.model.ProductEntity;
......@@ -44,5 +45,7 @@ public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
void sendThirdParty(DeviceEntity entity, ProductEntity productEntity, PlatformEntity platformEntity, DeviceStatusEnum update);
DeviceDao getDeviceDao();
}
\ No newline at end of file
......@@ -66,6 +66,7 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
throw new AppException("所属产品不能为空!");
}
super.validData(entity, context);
}
......@@ -196,6 +197,11 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
}
}
@Override
public DeviceDao getDeviceDao() {
return this.getDao();
}
@Override
protected void removeBefore(Long[] ids, Context context) throws AppException {
Arrays.asList(ids).stream().forEach(id -> {
......
......@@ -26,7 +26,7 @@ arrays|数组类型|[{"name":"zhang3"},{"name":"zhang2"}]
**内容类型:** application/json;charset=utf-8
**简要描述:** 设备激活
**简要描述:** 服务端地址获取
**广播参数:**
......@@ -71,7 +71,7 @@ data|String|数据对象|-
**内容类型:** application/json;charset=utf-8
**简要描述:** 设备注册,成功返回响应rabbmit连接参数
**简要描述:** 设备注册,成功返回响应rabbmit连接参数,如未找到该设备,系统会根据设备码创建新设备
**请求参数:**
......
......@@ -36,11 +36,11 @@
<![CDATA[jdbc:mysql://localhost:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri>
<profiles.datasource.username>root</profiles.datasource.username>
<profiles.datasource.password>12345678</profiles.datasource.password>
<profiles.redis.uri>127.0.0.1</profiles.redis.uri>
<profiles.redis.uri>192.168.0.98</profiles.redis.uri>
<profiles.redis.port>6379</profiles.redis.port>
<profiles.redis.username></profiles.redis.username>
<profiles.redis.password></profiles.redis.password>
<profiles.redis.database>7</profiles.redis.database>
<profiles.redis.database>1</profiles.redis.database>
<profiles.kafka.brokers>192.168.0.251:9092</profiles.kafka.brokers>
<profiles.queue.type>rabbitmq</profiles.queue.type>
<profiles.rabbitmq.host>192.168.0.98</profiles.rabbitmq.host>
......@@ -65,11 +65,11 @@
<![CDATA[jdbc:mysql://192.168.0.98:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri>
<profiles.datasource.username>root</profiles.datasource.username>
<profiles.datasource.password>nacos@2020</profiles.datasource.password>
<profiles.redis.uri>192.168.0.252</profiles.redis.uri>
<profiles.redis.uri>192.168.0.98</profiles.redis.uri>
<profiles.redis.port>6379</profiles.redis.port>
<profiles.redis.username></profiles.redis.username>
<profiles.redis.password>hotel@2020</profiles.redis.password>
<profiles.redis.database>7</profiles.redis.database>
<profiles.redis.password></profiles.redis.password>
<profiles.redis.database>2</profiles.redis.database>
<profiles.kafka.brokers>192.168.0.251:9092</profiles.kafka.brokers>
<profiles.rabbitmq.host>192.168.0.98</profiles.rabbitmq.host>
<profiles.rabbitmq.port>5672</profiles.rabbitmq.port>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment