Commit a3bcf020 authored by 赵啸非's avatar 赵啸非

修改消息组件

parent 6f294d8a
......@@ -16,4 +16,7 @@ public interface TbQueueProducer<T extends TbQueueMsg> {
void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback);
void stop();
void queueDeclare(TopicPartitionInfo tpi, TbQueueCallback callback);
}
......@@ -44,7 +44,7 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
private Set<TopicPartitionInfo> topics;
@Builder
private TbKafkaProducerTemplate(TbKafkaSettings settings,String defaultTopic) {
private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic) {
this.settings = settings;
//初始化生产者参数
this.producer = new KafkaProducer<>(settings.toProducerProps());
......@@ -90,5 +90,10 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
}
}
@Override
public void queueDeclare(TopicPartitionInfo tpi, TbQueueCallback callback) {
// TODO: 2022/4/29 创建kafka队列
}
}
......@@ -20,20 +20,13 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
* 消息队列提供
*/
@Autowired
private TbCoreQueueFactory tbQueueProvider;
private TbCoreQueueFactory tbQueueProvider;
/**
* 消息队列生产者
*/
private TbQueueProducer<TbQueueMsg> queueProducer;
// public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) {
// this.tbQueueProvider = tbQueueProvider;
// }
//
@PostConstruct
public void init() {
log.info("消息队列生产服务开始...");
......
......@@ -56,7 +56,7 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
GetResponse getResponse = channel.basicGet(queue, true);
return getResponse;
} catch (IOException e) {
log.error("Failed to get messages from queue: [{}]" , queue,e);
log.error("Failed to get messages from queue: {},{}" , queue,e);
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toList());
......
......@@ -79,13 +79,17 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
AMQP.BasicProperties properties = new AMQP.BasicProperties();
try {
if (!topicIfNotExist) {
//topic不存在创建通道队列
channel.queueDeclare(tpi.getTopic(), true, false, false, null);
}
if (!innerExists(tpi.getExchangeName(), channel)) {
//判断交换机是否存在,如果不存在则创建后与新加入的队列绑定
channel = connection.createChannel();
channel.exchangeDeclare(tpi.getExchangeName(), BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(tpi.getExchangeName(), BuiltinExchangeType.DIRECT, true, false, null);
}
channel.basicPublish(tpi.getExchangeName(), tpi.getFullTopicName(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
channel.queueBind(tpi.getTopic(), tpi.getExchangeName(), tpi.getTopic());
//channel.queueBind()
channel.basicPublish(tpi.getExchangeName(), tpi.getTopic(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
if (callback != null) {
callback.onSuccess(null);
}
......@@ -119,6 +123,28 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
}
}
@Override
public void queueDeclare(TopicPartitionInfo tpi, TbQueueCallback callback) {
Boolean topicIfNotExist = createTopicIfNotExist(tpi);
try {
if (!topicIfNotExist) {
//topic不存在创建通道队列
channel.queueDeclare(tpi.getTopic(), true, false, false, null);
}
if (!innerExists(tpi.getExchangeName(), channel)) {
//判断交换机是否存在,如果不存在则创建后与新加入的队列绑定
channel = connection.createChannel();
channel.exchangeDeclare(tpi.getExchangeName(), BuiltinExchangeType.DIRECT, true, false, null);
}
callback.onSuccess(null);
} catch (IOException e) {
log.error("Failed publish message: {}.", e);
if (callback != null) {
callback.onFailure(e);
}
}
}
private Boolean createTopicIfNotExist(TopicPartitionInfo tpi) {
if (topics.contains(tpi)) {
return true;
......
......@@ -148,9 +148,6 @@ export default {
formatterYES(row, column, val) {
const content = formatter(this.tableData, column, val);
console.log("content:"+content)
//return content;
if (content) {
if (val == '0') {
return <el-tag type={'danger'} size='mini'>{content}</el-tag>
......
......@@ -30,7 +30,10 @@ const router = new Router({
...restBuilder('area', 'system/area'), // 系统管理-区域管理
builder('site/list', 'system/site/index'),//站点
...restBuilder('device', 'device'),//
...restBuilder('platform', 'platform'),//平台
...restBuilder('product', 'product'),//产品
...restBuilder('device', 'device'),//设备
...restBuilder('device/log', 'device/log'),//
......
......@@ -4,8 +4,9 @@
<el-form ref="form" :model="form" :rules="rules" label-width="120px">
<el-row>
<!-- <Field :span="20" label="设备名称" prop="deviceName" v-model="form.deviceName" placeholder="请输入设备名称"/> -->
<Field :span="20" label="设备编码" prop="deviceCode" v-model="form.deviceCode" type="textarea" placeholder="请输入设备编码"/>
<Field :span="20" label="设备类型" prop="deviceType" v-model="form.deviceType" type="select" :enumData="dict.deviceType" placeholder="请选择设备类型"/>
<Field :span="20" label="设备编码" prop="deviceCode" v-model="form.deviceCode" placeholder="请输入设备编码"/>
<Field :span="20" label="平台类型" prop="platformId" v-model="form.platformId" type="select" :enumData="dict.platformId" placeholder="请选择平台类型"/>
<Field :span="20" label="产品类型" prop="productId" v-model="form.productId" type="select" :enumData="dict.productId" placeholder="请选择产品类型"/>
<Field :span="20" label="设备的MAC地址" prop="deviceMac" v-model="form.deviceMac" placeholder="请输入设备的MAC地址"/>
<Field :span="20" label="中心设备编码" v-model="form.centernum" placeholder="请输入中心设备编码"/>
......@@ -23,8 +24,7 @@
/>
<Field :span="20" label="备注" prop="deviceRemark" v-model="form.deviceRemark" type="textarea" placeholder="请输入备注"/>
<Field :span="20" label="启用状态 " prop="status" v-model="form.status" type="radio" :enumData="dict.status" placeholder="请选择启用状态 "/>
<Field :span="20" label="启用状态 " prop="status" v-model="form.status" type="radio" :enumData="dict.status" placeholder="请选择启用状态 "/>
</el-row>
......
<template>
<layout-form>
<el-form :model="form" :loading="loading" :rules="rules" size='small' style="width:100%" label-width='120px' ref="form">
<el-row>
<Field label="设备名称" prop="deviceName" v-model="form.deviceName" placeholder="请输入设备名称"/>
<Field label="设备编码" prop="deviceCode" v-model="form.deviceCode" type="textarea" placeholder="请输入设备编码"/>
<Field label="设备类型" prop="deviceType" v-model="form.deviceType" type="select" :enumData="dict.deviceType" placeholder="请选择设备类型"/>
<Field label="设备的MAC地址" prop="deviceMac" v-model="form.deviceMac" placeholder="请输入设备的MAC地址"/>
<Field label="设备访问ip" prop="ip" v-model="form.ip" placeholder="请输入设备访问ip"/>
<Field label="中心设备编码" prop="centernum" v-model="form.centernum" placeholder="请输入中心设备编码"/>
<Field label="端口" prop="port" v-model="form.port" placeholder="请输入端口"/>
<Field label="站点编号" prop="siteNum" v-model="form.siteNum" placeholder="请输入站点编号"/>
<Field label="设备生产厂商ID" prop="deviceFirmId" v-model="form.deviceFirmId" placeholder="请输入设备生产厂商ID"/>
<Field label="设备生产厂商名称" prop="deviceFirmname" v-model="form.deviceFirmname" placeholder="请输入设备生产厂商名称"/>
<Field label="在线状态 " prop="deviceOnlineStatus" v-model="form.deviceOnlineStatus" type="select" :enumData="dict.deviceOnlineStatus" placeholder="请选择在线状态 "/>
<Field label="启用状态 " prop="status" v-model="form.status" type="select" :enumData="dict.status" placeholder="请选择启用状态 "/>
<Field label="备注" prop="deviceRemark" v-model="form.deviceRemark" type="textarea" placeholder="请输入备注"/>
<Field label="最近上线时间" prop="onlineTime" v-model="form.onlineTime" type="date" />
<Field label="最近离线时间" prop="offlineTime" v-model="form.offlineTime" type="date" />
</el-row>
<form-buttons @submit='submitForm' :noSaveBtn="pageInfo.type === 'view'"/>
</el-form>
</layout-form>
</template>
<script>
import form from "@/assets/mixins/form";
export default {
mixins: [form],
components: {
},
methods: {
},
data() {
return {
toString:[
"deviceType",
"deviceOnlineStatus",
"status",
],
rules: {
deviceName: [
{required: true,message: "请输入设备名称", trigger: "blur" },
{max: 20,message: "最多只能录入20个字符",trigger: "blur",},
],
deviceType: [
{required: true,message: "请输入设备类型", trigger: "blur" },
],
deviceOnlineStatus: [
{required: true,message: "请输入在线状态 ", trigger: "blur" },
],
status: [
{required: true,message: "请输入启用状态 ", trigger: "blur" },
],
createTime: [
{required: true,message: "请选择创建时间" },
],
}
};
}
};
</script>
\ No newline at end of file
<template>
<layout-view>
<el-descriptions :title="title" :column="column" :size="size" :colon="false" border>
<template slot="title">
<i class="el-icon-tickets"></i>
基本详细信息
</template>
<template slot="extra">
<el-button type="primary" @click="$router.go(-1)" size="small">返回</el-button>
</template>
<el-descriptions-item label="设备名称" label-class-name="labelClass" content-class-name="contentClass">
{{form.deviceName}}
</el-descriptions-item>
<el-descriptions-item label="设备编码" label-class-name="labelClass" content-class-name="contentClass">
{{form.deviceCode}}
</el-descriptions-item>
<el-descriptions-item label="设备类型" label-class-name="labelClass" content-class-name="contentClass">
{{ util_formatters("deviceType", form.deviceType) }}
</el-descriptions-item>
<el-descriptions-item label="设备的MAC地址" label-class-name="labelClass" content-class-name="contentClass">
{{form.deviceMac}}
</el-descriptions-item>
<el-descriptions-item label="设备访问ip" label-class-name="labelClass" content-class-name="contentClass">
{{form.ip}}
</el-descriptions-item>
<el-descriptions-item label="中心设备编码" label-class-name="labelClass" content-class-name="contentClass">
{{form.centernum}}
</el-descriptions-item>
<el-descriptions-item label="端口" label-class-name="labelClass" content-class-name="contentClass">
{{form.port}}
</el-descriptions-item>
<el-descriptions-item label="站点编号" label-class-name="labelClass" content-class-name="contentClass">
{{form.siteNum}}
</el-descriptions-item>
<el-descriptions-item label="设备生产厂商ID" label-class-name="labelClass" content-class-name="contentClass">
{{form.deviceFirmId}}
</el-descriptions-item>
<el-descriptions-item label="设备生产厂商名称" label-class-name="labelClass" content-class-name="contentClass">
{{form.deviceFirmname}}
</el-descriptions-item>
<el-descriptions-item label="在线状态 " label-class-name="labelClass" content-class-name="contentClass">
{{ util_formatters("deviceOnlineStatus", form.deviceOnlineStatus) }}
</el-descriptions-item>
<el-descriptions-item label="启用状态 " label-class-name="labelClass" content-class-name="contentClass">
{{ util_formatters("status", form.status) }}
</el-descriptions-item>
<el-descriptions-item label="备注" label-class-name="labelClass" content-class-name="contentClass">
{{form.deviceRemark}}
</el-descriptions-item>
<el-descriptions-item label="最近上线时间" label-class-name="labelClass" content-class-name="contentClass">
{{ util_formatterDate(form.onlineTime)}}
</el-descriptions-item>
<el-descriptions-item label="最近离线时间" label-class-name="labelClass" content-class-name="contentClass">
{{ util_formatterDate(form.offlineTime)}}
</el-descriptions-item>
</el-descriptions>
</layout-view>
</template>
<script>
import view from "@/assets/mixins/view";
export default {
mixins: [view],
components: {
},
methods: {
},
data() {
return {
size:"small",
column:2,
toString:[
"deviceType",
"deviceOnlineStatus",
"status",
],
toArrays: [
],
toDate: [
]
}
}
}
</script>
<style lang="less">
.labelClass{
width: 200px;
}
.el-descriptions__body{
margin-left: 5px;
margin-right: 5px;
color: #606266;
background-color: #FFF;
}
.contentClass{
width: 600px;
}
</style>
\ No newline at end of file
package com.mortals.xhx;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.service.impl.LocalCacheServiceImpl;
import com.mortals.framework.service.impl.RedisCacheServiceImpl;
import com.mortals.framework.springcloud.boot.BaseWebApplication;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ImportResource;
@SpringBootApplication(scanBasePackages = {"com.mortals"})
......@@ -16,10 +11,10 @@ import org.springframework.context.annotation.ImportResource;
@ImportResource(locations = {"classpath:config/spring-config.xml"})
public class ManagerApplication extends BaseWebApplication {
@Bean
public ICacheService cacheService() {
return new LocalCacheServiceImpl();
}
// @Bean
// public ICacheService cacheService() {
// return new LocalCacheServiceImpl();
// }
public static void main(String[] args) {
SpringApplication.run(ManagerApplication.class, args);
......
......@@ -18,4 +18,25 @@ public interface MessageService {
*/
void send(TopicPartitionInfo info, TbQueueMsgHeaders header, String message, TbQueueCallback callback);
/**
* 请求队列
* @param info
* @param callback
*/
void queueDeclare(TopicPartitionInfo info, TbQueueCallback callback);
/**
* 获取鉴权token
*/
String getBasePlatformToken();
/**
* 获取站点树
* @return
*/
String siteTree();
}
\ No newline at end of file
package com.mortals.xhx.base.system.message.impl;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.Context;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.service.impl.AbstractCRUDCacheServiceImpl;
import com.mortals.framework.util.DateUtils;
import com.mortals.framework.util.HttpUtil;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.ApiRespCodeEnum;
import com.mortals.xhx.common.code.YesNoEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.dao.DeviceDao;
......@@ -15,11 +22,20 @@ import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
import lombok.extern.apachecommons.CommonsLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.entity.ContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.mortals.framework.util.HttpUtil.HEADER_CONTENT_TYPE;
import static com.mortals.xhx.common.key.Constant.PATH_LOGIN;
import static com.mortals.xhx.common.key.Constant.PATH_SITETREE;
/**
* DeviceService
......@@ -29,9 +45,19 @@ import java.util.List;
* @date 2022-03-09
*/
@Service("messageService")
@CommonsLog
@Slf4j
public class MessageServiceImpl implements MessageService {
@Value("${baseplatform.httpUrl:''}")
private String httpUrl;
@Value("${baseplatform.loginName:''}")
private String loginName;
@Value("${baseplatform.password:''}")
private String password;
@Autowired
private ICacheService cacheService;
@Autowired
private TbCoreQueueProducerProvider producerProvider;
......@@ -40,6 +66,75 @@ public class MessageServiceImpl implements MessageService {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), message == null ? "".getBytes() : message.getBytes(), header);
producer.send(info, queueMsg, callback);
}
@Override
public void queueDeclare(TopicPartitionInfo info, TbQueueCallback callback) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
producer.queueDeclare(info, callback);
}
@Override
public String getBasePlatformToken() {
String token = cacheService.get(Constant.BASEPLATFORM_AUTHTOKEN);
if (ObjectUtils.isEmpty(token)) {
JSONObject obj = new JSONObject();
obj.put("loginName", loginName);
obj.put("password", password);
obj.put("securityCode", "8888");
String resp = null;
try {
Map<String, String> header = new HashMap<>();
header.put(HEADER_CONTENT_TYPE, "application/json");
resp = HttpUtil.doPost(httpUrl + PATH_LOGIN, header, JSON.toJSONString(obj));
JSONObject jsonObject = JSON.parseObject(resp);
Integer code = jsonObject.getInteger("code");
if (code == YesNoEnum.YES.getValue()) {
JSONObject dataObj = jsonObject.getJSONObject("data");
String authtoken = dataObj.getString("token");
cacheService.setnx(Constant.BASEPLATFORM_AUTHTOKEN,authtoken,7*24*60*60*1000);
return authtoken;
} else {
throw new AppException("异常");
}
} catch (Exception e) {
log.error("异常:", e);
}
log.info("resp:{}", resp);
}
return token;
}
@Override
public String siteTree() {
String authToken = this.getBasePlatformToken();
String resp = null;
try {
Map<String, String> header = new HashMap<>();
header.put(HEADER_CONTENT_TYPE, "application/json");
header.put("Authorization", Constant.TOKEN_PREFIX + authToken);
resp = HttpUtil.doGet(httpUrl + PATH_SITETREE, header,new HashMap<>());
JSONObject jsonObject = JSON.parseObject(resp);
Integer code = jsonObject.getInteger("code");
if (code == YesNoEnum.YES.getValue()) {
JSONObject dataObj = jsonObject.getJSONObject("data");
return resp;
} else {
throw new AppException("异常");
}
} catch (Exception e) {
log.error("异常:", e);
}
log.info("resp:{}", resp);
return resp;
}
}
\ No newline at end of file
......@@ -111,15 +111,16 @@ public class DeviceApiController {
if (ObjectUtils.isEmpty(platformEntity)) {
throw new AppException("当前设备未配置所属系统平台,请在后台配置后再激活!");
}
// authInfo.setHost(platformEntity.getPlatformSn());
// authInfo.setHost(platformEntity.getPlatformSn());
ProductEntity productEntity = productService.get(deviceEntity.getProductId());
if (ObjectUtils.isEmpty(productEntity)) {
throw new AppException("当前设备未配置所属产品,请在后台配置后再激活!");
}
authInfo.setExchangeName(platformEntity.getPlatformSn()+Constant.EXCHANGE_SPLIT+productEntity.getProductCode());
authInfo.setExchangeName(platformEntity.getPlatformSn() + Constant.EXCHANGE_SPLIT + productEntity.getProductCode());
authInfo.setUploadTopicFilter(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceCode());
authInfo.setDownTopicFilter(Constant.DOWN_TOPIC + deviceEntity.getDeviceCode());
String token = AESUtil.encryptForApp(JSON.toJSONString(authInfo), secret);
String token = JSON.toJSONString(authInfo);
//String token = AESUtil.encryptForApp(JSON.toJSONString(authInfo), secret);
deviceResp.setToken(token);
}
deviceEntity.setOnlineTime(new Date());
......@@ -174,8 +175,11 @@ public class DeviceApiController {
TopicPartitionInfo info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_HEARTBEAT);
ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(req), null);
log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
for (int i = 0; i < 1000; i++) {
deviceService.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(req), null);
// Thread.sleep(50);
}
//log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
}
} catch (Exception e) {
log.error("接收数据失败", e);
......
......@@ -42,6 +42,11 @@ public final class Constant {
*/
public static final String TOKEN = "token";
/**
* 基础服务平台鉴权token
*/
public static final String BASEPLATFORM_AUTHTOKEN = "baseplatform_authtoken";
/**
* 令牌前缀
*/
......@@ -81,4 +86,17 @@ public final class Constant {
*/
public static final String MESSAGETYPE_UPGREAD = "UPGREAD";
/**
* 登录path
*/
public static final String PATH_LOGIN = "login/login";
/**
* 树path
*/
public static final String PATH_SITETREE = "site/siteTree";
}
......@@ -9,12 +9,8 @@ import org.dom4j.Element;
@Data
public abstract class GateProtConfig {
/**
* 协议ID
*/
private Integer protocol;
/**提交URL*/
private String sendUrl;
/**
* 流量数
*/
......@@ -30,9 +26,14 @@ public abstract class GateProtConfig {
*/
private String paramConfig;
/**
* 发送延迟
*/
private Integer SendDelayTime = 1 * 1000;
/**
*
*/
private Integer maxDelayNum = 50;
......
package com.mortals.xhx.common.utils;
import com.mortals.framework.util.HttpUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
......@@ -11,17 +13,18 @@ import org.springframework.beans.BeanUtils;
* @description:
**/
@Slf4j
@AllArgsConstructor
public class SendTask implements Runnable {
private String sendUrl;
private String content;
@Override
public void run() {
// TODO: 2022/4/28
try {
// log.debug("启动发送"+smsGateQueueEntity);
String resp = HttpUtil.doPost(sendUrl, content);
log.debug("http resp:{}", resp);
} catch (Exception e) {
log.error("发送异常:" + e);
}
......
package com.mortals.xhx.daemon;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.Context;
import com.mortals.framework.model.PageInfo;
import com.mortals.framework.model.Result;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.base.framework.config.InterceptorConfig;
import com.mortals.xhx.base.system.upload.service.UploadService;
import com.mortals.xhx.base.system.user.model.UserEntity;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.req.UploadDeviceReq;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.ApiRespCodeEnum;
import com.mortals.xhx.common.code.OneThingRespCodeEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.model.PlatformEntity;
import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.model.ProductEntity;
import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import com.mortals.xhx.queue.TbQueueProducer;
import com.mortals.xhx.queue.TopicPartitionInfo;
import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.BeanUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.HashMap;
import java.util.Map;
/**
* 一件事微信对外服务接口
......@@ -46,21 +39,39 @@ import java.util.stream.IntStream;
*/
@CrossOrigin(origins = "*")
@RestController
@CommonsLog
@Slf4j
@RequestMapping("/test/")
public class DemoApiController {
@Autowired
private TbCoreQueueProducerProvider producerProvider;
@Autowired
private DeviceService deviceService;
@Autowired
private ProductService productService;
@Autowired
private PlatformService platformService;
@Autowired
private MessageService messageService;
@GetMapping("/sentry")
public String sentry() {
log.error("[main][我就是展示下异常!]");
@PostMapping("/sentry")
public String sentry(@RequestBody String body) {
log.info("body==>{}",body);
return "success";
}
@PostMapping("/getToken")
public String getToken() {
return messageService.getBasePlatformToken();
}
@PostMapping("/siteTree")
public String siteTree() {
return messageService.siteTree();
}
@GetMapping("/exception")
public String exception() {
......@@ -122,4 +133,56 @@ public class DemoApiController {
return JSON.toJSONString(rsp);
}
/**
* 设备数据上报
*
* @param
* @return
*/
@PostMapping("upload")
public String upload(@RequestParam(value = "deviceId") Long deviceId ) {
ApiResp<String> rsp = new ApiResp<>();
rsp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
rsp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
try {
//根据设备编码查询设备
DeviceEntity deviceEntity = deviceService.get(deviceId);
if (!ObjectUtils.isEmpty(deviceEntity)) {
//将上报信息转发到mq中
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
if (ObjectUtils.isEmpty(platformEntity)) {
throw new AppException("当前设备未配置所属系统平台,请在后台配置后再激活!");
}
// authInfo.setHost(platformEntity.getPlatformSn());
ProductEntity productEntity = productService.get(deviceEntity.getProductId());
if (ObjectUtils.isEmpty(productEntity)) {
throw new AppException("当前设备未配置所属产品,请在后台配置后再激活!");
}
String exchangeName = platformEntity.getPlatformSn() + Constant.EXCHANGE_SPLIT + productEntity.getProductCode();
TopicPartitionInfo info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_HEARTBEAT);
UploadDeviceReq uploadDeviceReq = new UploadDeviceReq();
uploadDeviceReq.setDeviceMac(deviceEntity.getDeviceMac());
uploadDeviceReq.setDeviceNum(deviceEntity.getDeviceCode());
for (int i = 0; i < 100; i++) {
deviceService.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(deviceEntity), null);
}
}
} catch (Exception e) {
log.error("接收数据失败", e);
rsp.setCode(ApiRespCodeEnum.FAILED.getValue());
rsp.setMsg(e.getMessage());
return JSON.toJSONString(rsp);
}
log.debug("响应【设备数据上报】【响应体】--> " + JSONObject.toJSONString(rsp));
return JSON.toJSONString(rsp);
}
}
......@@ -48,10 +48,10 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
@Override
public void excuteTask(ITask task) throws AppException {
log.info("设备状态统计,开始执行");
log.debug("设备状态统计,开始执行");
doDeviceUpOrDown();
doDeviceLogDel();
log.info("设备状态统计,结束执行");
log.debug("设备状态统计,结束执行");
}
/**
......@@ -81,7 +81,7 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
} catch (Exception e) {
log.error("更新设备状态任务异常,结束执行", e);
}
log.info("更新设备状态任务,结束执行");
log.debug("更新设备状态任务,结束执行");
}
......@@ -101,7 +101,7 @@ public class DeviceStatTaskImpl implements ITaskExcuteService {
} catch (Exception e) {
log.error("设备日志删除任务异常,结束执行", e);
}
log.info("设备日志删除任务,结束执行");
log.debug("设备日志删除任务,结束执行");
}
......
......@@ -21,11 +21,10 @@ public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
ApiResp<String> sendDeviceMessage(DeviceEntity deviceEntity, TopicPartitionInfo info, TbQueueMsgHeaders header,String message , Context context);
ApiResp<String> sendDeviceMessage(List<Long> deviceIds, TopicPartitionInfo info,TbQueueMsgHeaders header, String message , Context context);
ApiResp<String> sendDeviceMessage(List<Long> deviceIds, TopicPartitionInfo info,TbQueueMsgHeaders header, String message , Context context);
}
\ No newline at end of file
package com.mortals.xhx.module.device.service.impl;
import cn.hutool.core.util.IdUtil;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.Context;
import com.mortals.framework.util.DateUtils;
import com.mortals.framework.service.impl.AbstractCRUDCacheServiceImpl;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.ApiRespCodeEnum;
import com.mortals.xhx.common.code.MessageProtocolEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.dao.DeviceDao;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.model.DeviceQuery;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.module.platform.model.PlatformEntity;
import com.mortals.xhx.module.platform.service.PlatformService;
import com.mortals.xhx.module.product.model.ProductEntity;
import com.mortals.xhx.module.product.service.ProductService;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mortals.framework.service.impl.AbstractCRUDCacheServiceImpl;
import com.mortals.xhx.module.device.dao.DeviceDao;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Set;
/**
* DeviceService
......@@ -34,8 +33,16 @@ import java.util.UUID;
* @date 2022-03-09
*/
@Service("deviceService")
@Slf4j
public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, DeviceEntity, Long> implements DeviceService {
@Autowired
private ProductService productService;
@Autowired
private PlatformService platformService;
@Autowired
private DefaultTbCoreConsumerService consumerService;
@Override
protected String getExtKey(DeviceEntity data) {
return data.getDeviceCode();
......@@ -49,7 +56,6 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
ApiResp<String> resp = new ApiResp<>();
resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
......@@ -63,7 +69,6 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
}
};
messageService.send(info, header, message, callback);
return resp;
}
......@@ -80,4 +85,35 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
}
@Override
protected void saveAfter(DeviceEntity entity, Context context) throws AppException {
PlatformEntity platformEntity = platformService.get(entity.getPlatformId());
ProductEntity productEntity = productService.get(entity.getProductId());
if (!ObjectUtils.isEmpty(platformEntity) && ObjectUtils.isEmpty(productEntity)) {
//注册rabbmit相关队列与绑定
String exchangeName = platformEntity.getPlatformSn() + Constant.EXCHANGE_SPLIT + productEntity.getProductCode();
TopicPartitionInfo info = TopicPartitionInfo.builder().exchangeName(exchangeName).topic(Constant.UPLOAD_TOPIC + entity.getDeviceCode()).build();
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.info("队列创建成功,设备编码:{}", entity.getDeviceCode());
}
@Override
public void onFailure(Throwable t) {
log.error("队列创建失败,设备通道编码:{},{}", entity.getDeviceCode(), t);
}
};
messageService.queueDeclare(info, callback);
//消费监听线程添加当前声明队列
Set<TopicPartitionInfo> partitions = new HashSet<>();
partitions.add(info);
consumerService.getMainConsumer().subscribe(partitions);
}
super.saveAfter(entity, context);
}
}
\ No newline at end of file
......@@ -3,8 +3,10 @@ package com.mortals.xhx.module.device.web;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.SignAlgorithm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.OrderCol;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.base.system.param.service.ParamService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.ActiveEnum;
......@@ -66,6 +68,8 @@ public class DeviceController extends BaseCRUDJsonMappingController<DeviceServic
private ProductService productService;
@Autowired
private PlatformService platformService;
@Autowired
private MessageService messageService;
public DeviceController() {
super.setFormClass(DeviceForm.class);
......@@ -99,12 +103,10 @@ public class DeviceController extends BaseCRUDJsonMappingController<DeviceServic
model.put("onlineCount", collect.get(true));
model.put("offlineCount", collect.get(false));
//离线设备 按类型分组
// if (collect.get(false) > 0) {
// Map<String, Long> collectTwo = this.service.find(new DeviceQuery().deviceOnlineStatus(DeviceOnlineStatusEnum.离线.getValue()).active(ActiveEnum.已激活.getValue())).stream().collect(Collectors.groupingBy(x -> deviceTypeMap.get(x.getDeviceType().toString()), Collectors.counting()));
// model.put("offlineDeviceType", collectTwo);
// }
String resp = messageService.siteTree();
JSONObject jsonObject = JSON.parseObject(resp);
JSONArray siteTreeArray = jsonObject.getJSONObject("data").getJSONArray("siteTree");
model.put("siteTree", siteTreeArray);
super.init(request, response, form, model, context);
}
......
......@@ -36,8 +36,8 @@ spring:
enabled: false
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.p6spy.engine.spy.P6SpyDriver
#driver-class-name: com.mysql.cj.jdbc.Driver
#driver-class-name: com.p6spy.engine.spy.P6SpyDriver
driver-class-name: com.mysql.cj.jdbc.Driver
url: @profiles.datasource.uri@
username: @profiles.datasource.username@
password: @profiles.datasource.password@
......@@ -110,3 +110,8 @@ token:
expireTime: 60
# 令牌前缀
prefix: Bearer
# 基础服务平台用户名与密码
baseplatform:
httpUrl: http://192.168.0.98:11071/zwfw/
loginName: admin
password: admin
......@@ -28,7 +28,7 @@ Content-Type: application/json
"status":0,
"deviceRemark":"plxklr",
"onlineTime":"1646755200000",
"offlineTime":"1646755200000",
"offlineTime":"1646755200000"
}
> {%
......@@ -76,5 +76,38 @@ Authorization: Bearer {{authToken}}
}
###设备上报
POST {{baseUrl}}/api/upload
Content-Type: application/json
Authorization: Bearer {{authToken}}
{
"deviceNum":"AB:DD:DF:FD:AD:FA:DA:bb",
"action":"upload"
}
###设备上报
POST {{baseUrl}}/test/upload?deviceId=2
Content-Type: application/json
{
"req": 1
}
###getToken
POST {{baseUrl}}/test/getToken
Content-Type: application/json
{
"req": 1
}
###siteTree
POST {{baseUrl}}/test/siteTree
Content-Type: application/json
{}
......@@ -33,7 +33,7 @@
<profiles.server.port>18222</profiles.server.port>
<profiles.queue.type>rabbitmq</profiles.queue.type>
<profiles.datasource.uri>
<![CDATA[jdbc:p6spy:mysql://localhost:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri>
<![CDATA[jdbc:mysql://localhost:3306/device-new-platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Hongkong]]></profiles.datasource.uri>
<profiles.datasource.username>root</profiles.datasource.username>
<profiles.datasource.password>12345678</profiles.datasource.password>
<profiles.redis.uri>127.0.0.1</profiles.redis.uri>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment