Commit d98b7399 authored by 赵啸非's avatar 赵啸非

修改消息组件

parent e41f8533
package com.mortals.xhx.common.code;
import java.util.LinkedHashMap;
import java.util.Map;
/**
*
*
* @author zxfei
*/
public enum MessageProtocolEnum {
JSON("json", "未激活");
private String value;
private String desc;
MessageProtocolEnum(String value, String desc) {
this.value = value;
this.desc = desc;
}
public String getValue() {
return this.value;
}
public String getDesc() {
return this.desc;
}
public static MessageProtocolEnum getByValue(String value) {
for (MessageProtocolEnum activeEnum : MessageProtocolEnum.values()) {
if (value.equals(activeEnum.getValue())) {
return activeEnum;
}
}
return null;
}
/**
* 获取Map集合
*
* @param eItem 不包含项
* @return
*/
public static Map<String, String> getEnumMap(String... eItem) {
Map<String, String> resultMap = new LinkedHashMap<>();
for (MessageProtocolEnum item : MessageProtocolEnum.values()) {
try {
boolean hasE = false;
for (String e : eItem) {
if (item.getValue() == e) {
hasE = true;
break;
}
}
if (!hasE) {
resultMap.put(item.getValue(), item.getDesc());
}
} catch (Exception ex) {
}
}
return resultMap;
}
}
\ No newline at end of file
package com.mortals.xhx.common.model;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.SignAlgorithm;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.common.code.MessageProtocolEnum;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.Setter;
......@@ -18,6 +22,9 @@ public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
protected Map<String, String> data = new HashMap<>();
public DefaultTbQueueMsgHeaders() {
data.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
data.put(MessageHeader.MESSAGESIGN, new String(SecureUtil.sign(SignAlgorithm.MD5withRSA).sign(data.get(MessageHeader.TIMESTAMP).getBytes())));
data.put(MessageHeader.MESSAGEPROTOCOL, MessageProtocolEnum.JSON.getValue());
data.put(MessageHeader.TOPIC, "");
data.put(MessageHeader.QOS, "0");
......
......@@ -12,15 +12,29 @@ public class MessageHeader {
* 客户id
*/
public static final String CLIENTID = "clientId";
/**
* 协议
*/
public static final String MESSAGEPROTOCOL = "protocol";
/**
* 时间戳
*/
public static final String TIMESTAMP = "timestamp";
/**
* 消息签名
*/
public static final String MESSAGESIGN = "sign";
/**
* 消息类型
*/
public static final String MESSAGETYPE = "messageType";
/**
* topic
*/
public static final String TOPIC = "topic";
/**
* 消息等级
*/
public static final String QOS = "qos";
public static final String RETAIN = "retain";
......@@ -29,10 +43,4 @@ public class MessageHeader {
public static final String DUP = "dup";
public static final String productKey = "dup";
// public static final String productKey = "dup";
}
package com.mortals.xhx.queue;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.UUID;
/**
......@@ -27,6 +31,8 @@ public class DefaultTbQueueMsg implements TbQueueMsg {
* 数据载体
*/
private byte[] data;
/**
* 消息头信息
*/
......@@ -36,15 +42,22 @@ public class DefaultTbQueueMsg implements TbQueueMsg {
this.key = msg.getKey();
this.data = msg.getData();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
msg.getHeaders().getData().forEach(headers::put);
msg.getHeaders().getData().entrySet().stream().forEach(item->
headers.put(item.getKey(),item.getValue()));
this.headers = headers;
}
public static void main(String[] args) {
// DefaultTbQueueMsg defaultTbQueueMsg = new DefaultTbQueueMsg(UUID.randomUUID(),"string".getBytes(),null);
// TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
// header.put(MessageHeader.CLIENTID, "abcd1234");
// header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
// header.put(MessageHeader.MESSAGESIGN,"ssdafasdfasdfasfd");
// TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), "abcd1234".getBytes() , header);
//
// String ret = JSON.toJSONString(defaultTbQueueMsg);
// String ret = JSON.toJSONString(queueMsg);
// System.out.println("pro:"+ret);
//
// DefaultTbQueueMsg qu = JSON.parseObject(ret, DefaultTbQueueMsg.class);
......
package com.mortals.xhx.base.system.message;
import com.mortals.xhx.queue.TbQueueCallback;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import com.mortals.xhx.queue.TopicPartitionInfo;
public interface MessageService {
/**
* 发送消息
*
* @param info
* @param header
* @param message
* @param callback
*/
void send(TopicPartitionInfo info, TbQueueMsgHeaders header, String message, TbQueueCallback callback);
}
\ No newline at end of file
package com.mortals.xhx.base.system.message.impl;
import cn.hutool.core.util.IdUtil;
import com.mortals.framework.model.Context;
import com.mortals.framework.service.impl.AbstractCRUDCacheServiceImpl;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.ApiRespCodeEnum;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.dao.DeviceDao;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.List;
/**
* DeviceService
* 设备 service实现
*
* @author zxfei
* @date 2022-03-09
*/
@Service("messageService")
@CommonsLog
public class MessageServiceImpl implements MessageService {
@Autowired
private TbCoreQueueProducerProvider producerProvider;
@Override
public void send(TopicPartitionInfo info, TbQueueMsgHeaders header, String message, TbQueueCallback callback) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), message == null ? "".getBytes() : message.getBytes(), header);
producer.send(info, queueMsg, callback);
}
}
\ No newline at end of file
package com.mortals.xhx.busiz.web;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.Sign;
import cn.hutool.crypto.asymmetric.SignAlgorithm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.base.DeviceAuth;
import com.mortals.xhx.base.framework.ws.message.SendToAllRequest;
import com.mortals.xhx.base.framework.ws.util.WebSocketUtil;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.req.DeviceReq;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.busiz.rsp.DeviceResp;
import com.mortals.xhx.busiz.security.DeviceTokenService;
import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import com.mortals.xhx.queue.TopicPartitionInfo;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
......@@ -118,8 +128,12 @@ public class DeviceApiController {
DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum());
if (!ObjectUtils.isEmpty(deviceEntity)) {
//将上报信息转发到mq中
ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity.getId(), Constant.UPLOAD_TOPIC, JSON.toJSONString(req), null);
log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_HEARTBEAT);
header.put(MessageHeader.MESSAGESIGN, new String(SecureUtil.sign(SignAlgorithm.MD5withRSA).sign(header.get(MessageHeader.TIMESTAMP).getBytes())));
ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(req), null);
log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
}
} catch (Exception e) {
log.error("接收数据失败", e);
......
###流程根据id启动
POST {{baseUrl}}/m/api/flow/process/start
Content-Type: application/json
{
"processDefinitionKey": "process_xw8yhk6g",
"businessKey": "12321",
"creator": "admin",
"formName": "会签流程测试",
"platformSn": "government-manager",
"userCode": "admin",
"variables": {
"assignee1": "admin",
"assignee2": "admin",
"assigneeList":[
"zhang1",
"zhang2",
"zhang3"
]
}
}
###流程 动态表达式
POST {{baseUrl}}/m/api/flow/process/start
Content-Type: application/json
{
"processDefinitionKey": "process_4zrketpm",
"businessKey": "12321",
"creator": "admin",
"formName": "动态选人流程测试",
"platformSn": "government-manager",
"userCode": "admin",
"variables": {
"approval": "admin"
}
}
###流程根据id 获取所有el表达式
POST {{baseUrl}}/m/api/flow/process/getAllProcessInstanceEl
Content-Type: application/json
{
"processDefinitionKey": "process_xw8yhk6g",
"businessKey": "12321",
"creator": "admin",
"formName": "会签流程测试",
"platformSn": "government-manager",
"userCode": "admin"
}
###流程实例 激活与挂起
POST {{baseUrl}}/m/api/flow/process/updateState
Content-Type: application/json
{
"processInstanceId": "46eb2817-0951-11ec-8011-c25bd865180b",
"suspensionState": 1
}
###流程根据实例id获取当前进程图片
POST {{baseUrl}}/m/api/flow/process/getImage
Content-Type: application/json
{
"processInstanceId": "b6eea2f5-0957-11ec-9b0c-c25bd865180b"
}
###终止流程
POST {{baseUrl}}/m/api/flow/process/stop
Content-Type: application/json
{
"processInstanceId": "b6eea2f5-0957-11ec-9b0c-c25bd865180b"
}
###删除流程
POST {{baseUrl}}/m/api/flow/process/delete
Content-Type: application/json
{
"processInstanceId": "46eb2817-0951-11ec-8011-c25bd865180b",
"deleteReason": "测试删除流程"
}
......@@ -55,8 +55,26 @@ public final class Constant {
/** 基础代码版本 Z-BASE.MANAGER-S1.0.0 */
public final static String BASEMANAGER_VERSION = "Z-BASE.MANAGER-S1.0.0";
/**
* 服务器http
*/
public final static String PARAM_SERVER_HTTP_URL = "server_http_url";
/**
* 消息类型(心跳)
*/
public static final String MESSAGETYPE_HEARTBEAT = "HEART_BEAT";
/**
* 消息类型(激活)
*/
public static final String MESSAGETYPE_ACTIVE = "ACTIVE";
/**
* 消息类型(upgread)
*/
public static final String MESSAGETYPE_UPGREAD = "UPGREAD";
}
package com.mortals.xhx.common.key;
/**
* 代码生成通用常量
*
* @author: zxfei
* @date: 2021/9/28 15:43
*/
public class GenConstants {
/**
* 单表(增删改查)
*/
public static final String TPL_CRUD = "crud";
/**
* 树表(增删改查)
*/
public static final String TPL_TREE = "tree";
/**
* 主子表(增删改查)
*/
public static final String TPL_SUB = "sub";
/**
* 主子表(一对一)
*/
public static final String TPL_SUB_ONE = "subone";
/**
* 树编码字段
*/
public static final String TREE_CODE = "treeCode";
/**
* 树父编码字段
*/
public static final String TREE_PARENT_CODE = "treeParentCode";
/**
* 树名称字段
*/
public static final String TREE_NAME = "treeName";
/**
* 上级菜单ID字段
*/
public static final String PARENT_MENU_ID = "parentMenuId";
/**
* 上级菜单名称字段
*/
public static final String PARENT_MENU_NAME = "parentMenuName";
/**
* 数据库字符串类型
*/
public static final String[] COLUMNTYPE_STR = {"char", "varchar", "nvarchar", "varchar2"};
/**
* 数据库文本类型
*/
public static final String[] COLUMNTYPE_TEXT = {"tinytext", "text", "mediumtext", "longtext"};
/**
* 数据库时间类型
*/
public static final String[] COLUMNTYPE_TIME = {"datetime", "time", "date", "timestamp"};
/**
* 数据库数字类型
*/
public static final String[] COLUMNTYPE_NUMBER = {"tinyint", "smallint", "mediumint", "int", "number", "integer",
"bit", "bigint", "float", "double", "decimal"};
/**
* 页面不需要编辑字段
*/
public static final String[] COLUMNNAME_NOT_EDIT = {"id", "createUser", "createTime", "delFlag","updateUser",
"updateTime"};
/**
* 页面不需要显示的列表字段
*/
public static final String[] COLUMNNAME_NOT_LIST = {"id", "createUser", "createTime", "delFlag", "updateUser",
"updateTime"};
/**
* 页面不需要查询字段
*/
public static final String[] COLUMNNAME_NOT_QUERY = {"id", "createUser", "createTime", "delFlag", "updateUser",
"updateTime", "remark"};
/**
* Entity基类字段
*/
public static final String[] BASE_ENTITY = {"createUserId", "createUser", "createTime", "updateUserId", "updateUser", "updateTime", "id"};
/**
* Tree基类字段
*/
public static final String[] TREE_ENTITY = {"parentName", "parentId", "orderNum", "ancestors", "children"};
/**
* 文本框
*/
public static final String HTML_INPUT = "input";
/**
* 文本域
*/
public static final String HTML_TEXTAREA = "textarea";
/**
* 下拉框
*/
public static final String HTML_SELECT = "select";
/**
* 单选框
*/
public static final String HTML_RADIO = "radio";
/**
* 复选框
*/
public static final String HTML_CHECKBOX = "checkbox";
/**
* 日期控件
*/
public static final String HTML_DATETIME = "datetime";
/**
* 图片上传控件
*/
public static final String HTML_IMAGE_UPLOAD = "imageUpload";
/**
* 文件上传控件
*/
public static final String HTML_FILE_UPLOAD = "fileUpload";
/**
* 富文本控件
*/
public static final String HTML_EDITOR = "editor";
/**
* 字符串类型
*/
public static final String TYPE_STRING = "String";
/**
* 整型
*/
public static final String TYPE_INTEGER = "Integer";
/**
* 长整型
*/
public static final String TYPE_LONG = "Long";
/**
* 浮点型
*/
public static final String TYPE_DOUBLE = "Double";
/**
* 高精度计算类型
*/
public static final String TYPE_BIGDECIMAL = "BigDecimal";
/**
* 时间类型
*/
public static final String TYPE_DATE = "Date";
/**
* 模糊查询
*/
public static final String QUERY_LIKE = "LIKE";
/**
* 需要
*/
public static final Integer REQUIRE = 1;
/**
* 不需要
*/
public static final Integer NOREQUIRE = 0;
}
......@@ -94,22 +94,22 @@ public class DemoApiController {
headers.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime().getBytes());
String payLoad = "12313";
TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID().toString(), payLoad.getBytes(), header);
for (int i = 0; i < 10; i++) {
producer.send(info, queueMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
// logger.info("消息发送成功");
}
@Override
public void onFailure(Throwable t) {
}
});
}
// TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID().toString(), payLoad.getBytes(), header);
//
//
// for (int i = 0; i < 10; i++) {
// producer.send(info, queueMsg, new TbQueueCallback() {
// @Override
// public void onSuccess(TbQueueMsgMetadata metadata) {
// // logger.info("消息发送成功");
// }
//
// @Override
// public void onFailure(Throwable t) {
//
// }
// });
// }
} catch (Exception e) {
log.error("消息提交失败", e);
......
......@@ -3,6 +3,8 @@ import com.mortals.framework.model.Context;
import com.mortals.framework.service.ICRUDCacheService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import com.mortals.xhx.queue.TopicPartitionInfo;
import java.util.List;
......@@ -17,9 +19,12 @@ import java.util.List;
public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
ApiResp<String> sendDeviceMessage(Long deviceId, String topic,String message , Context context);
ApiResp<String> sendDeviceMessage(DeviceEntity deviceEntity, TopicPartitionInfo info, TbQueueMsgHeaders header,String message , Context context);
ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message , Context context);
ApiResp<String> sendDeviceMessage(List<Long> deviceIds, TopicPartitionInfo info,TbQueueMsgHeaders header, String message , Context context);
......
......@@ -3,11 +3,14 @@ package com.mortals.xhx.module.device.service.impl;
import cn.hutool.core.util.IdUtil;
import com.mortals.framework.model.Context;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.ApiRespCodeEnum;
import com.mortals.xhx.common.code.MessageProtocolEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.device.model.DeviceQuery;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -33,65 +36,48 @@ import java.util.UUID;
@Service("deviceService")
public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, DeviceEntity, Long> implements DeviceService {
@Autowired
private TbCoreQueueProducerProvider producerProvider;
@Override
protected String getExtKey(DeviceEntity data) {
return data.getDeviceCode();
}
@Autowired
private MessageService messageService;
@Override
public ApiResp<String> sendDeviceMessage(Long deviceId, String topic, String message, Context context) {
public ApiResp<String> sendDeviceMessage(DeviceEntity deviceEntity, TopicPartitionInfo info, TbQueueMsgHeaders header, String message, Context context) {
ApiResp<String> resp = new ApiResp<>();
resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
//发送消息
send(topic, message, context, resp, deviceId);
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.info("消息投递成功,设备通道编码:" + deviceEntity.getDeviceMac());
}
@Override
public void onFailure(Throwable t) {
log.error("消息投递成功,设备通道编码:" + deviceEntity.getDeviceMac(), t);
}
};
messageService.send(info, header, message, callback);
return resp;
}
@Override
public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message, Context context) {
public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, TopicPartitionInfo info, TbQueueMsgHeaders header, String message, Context context) {
ApiResp<String> resp = new ApiResp<>();
resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
for (Long deviceId : deviceIds) {
send(topic, message, context, resp, deviceId);
DeviceQuery deviceQuery = new DeviceQuery();
deviceQuery.setIdList(deviceIds);
List<DeviceEntity> deviceEntityList = this.find(deviceQuery);
for (DeviceEntity deviceEntity : deviceEntityList) {
resp = sendDeviceMessage(deviceEntity, info, header, message, context);
}
return resp;
}
private void send(String topic, String message, Context context, ApiResp<String> resp, Long deviceId) {
DeviceEntity deviceEntity = this.get(deviceId, context);
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(topic + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.CLIENTID, deviceEntity.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), message == null ? "".getBytes() : message.getBytes(), header);
producer.send(info, queueMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.info("消息投递成功,设备通道编码:" + deviceEntity.getDeviceMac());
}
@Override
public void onFailure(Throwable t) {
log.error("消息投递成功,设备通道编码:" + deviceEntity.getDeviceMac(), t);
}
});
} else {
log.error(String.format("设备Id查询不到设备,deviceId:%s", deviceId));
resp.setCode(ApiRespCodeEnum.FAILED.getValue());
resp.setMsg(String.format("设备Id查询不到设备,deviceId:%s", deviceId));
}
}
}
\ No newline at end of file
package com.mortals.xhx.module.device.web;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.asymmetric.SignAlgorithm;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.OrderCol;
import com.mortals.xhx.base.system.param.service.ParamService;
......@@ -9,8 +12,12 @@ import com.mortals.xhx.common.code.ApiRespCodeEnum;
import com.mortals.xhx.common.code.DeviceOnlineStatusEnum;
import com.mortals.xhx.common.code.DeviceTypeEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.module.firm.model.FirmQuery;
import com.mortals.xhx.module.firm.service.FirmService;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import com.mortals.xhx.queue.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.RequestMapping;
......@@ -93,14 +100,27 @@ public class DeviceController extends BaseCRUDJsonMappingController<DeviceServic
* 下发信息
*/
@PostMapping(value = "downMsg")
public String downMsg(@RequestParam(value = "deviceId") Long deviceId,@RequestParam(value = "content") String content) {
public String downMsg(@RequestParam(value = "deviceId") Long deviceId, @RequestParam(value = "content") String content) {
JSONObject jsonObject = new JSONObject();
Map<String, Object> model = new HashMap<>();
String busiDesc = this.getModuleDesc()+"下发设备消息" ;
String busiDesc = this.getModuleDesc() + "下发设备消息";
try {
ApiResp<String> apiResp = this.service.sendDeviceMessage(deviceId, Constant.DOWN_TOPIC, content, getContext());
if(ApiRespCodeEnum.SUCCESS.getValue()!=apiResp.getCode()){
throw new AppException("下发消息失败!");
//根据设备编码查询设备
DeviceEntity deviceEntity = this.service.get(deviceId, getContext());
if (!ObjectUtils.isEmpty(deviceEntity)) {
//将上报信息转发到mq中
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.DOWN_TOPIC + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_UPGREAD);
JSONObject obj = new JSONObject();
obj.put("content", content);
ApiResp<String> sendDeviceMessageResp = this.service.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(obj), null);
log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
if (ApiRespCodeEnum.SUCCESS.getValue() != sendDeviceMessageResp.getCode()) {
throw new AppException("下发消息失败!");
}
} else {
throw new AppException("设备不存在!deviceId:" + deviceId);
}
this.init(request, response, null, model, getContext());
recordSysLog(request, busiDesc + " 【成功】");
......
package com.mortals.httpclient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
public class UDPClientApp {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workGroup = new NioEventLoopGroup();
bootstrap.group(workGroup).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
// TODO Auto-generated method stub
ch.pipeline().addLast(new UDPClientHandler());
}
});
try {
Channel channel = bootstrap.bind(0).sync().channel();
channel.closeFuture().sync().await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
package com.mortals.httpclient;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
public class UDPClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)
throws Exception {
ByteBuf buf = msg.content();
int len = buf.readableBytes();
byte[] data = new byte[len];
buf.readBytes(data);
String receive = new String(data, "UTF-8");
System.out.println("client->" + receive);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("hello,server", Charset.forName("UTF-8")),
new InetSocketAddress("192.168.0.98", 54321)));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment