From d98b7399c7b7f69ece8c2ecaa7c6904d7450de7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B5=B5=E5=95=B8=E9=9D=9E?= <13281114856@qq.com>
Date: Thu, 14 Apr 2022 15:31:34 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B6=88=E6=81=AF=E7=BB=84?=
 =?UTF-8?q?=E4=BB=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../xhx/common/code/MessageProtocolEnum.java  |  64 ++++++
 .../model/DefaultTbQueueMsgHeaders.java       |   7 +
 .../xhx/common/model/MessageHeader.java       |  24 ++-
 .../mortals/xhx/queue/DefaultTbQueueMsg.java  |  21 +-
 .../base/system/message/MessageService.java   |  21 ++
 .../message/impl/MessageServiceImpl.java      |  45 ++++
 .../xhx/busiz/web/DeviceApiController.java    |  18 +-
 .../busiz/web/RequestProcessController.http   |  94 ---------
 .../com/mortals/xhx/common/key/Constant.java  |  18 ++
 .../mortals/xhx/common/key/GenConstants.java  | 193 ------------------
 .../mortals/xhx/daemon/DemoApiController.java |  32 +--
 .../module/device/service/DeviceService.java  |   9 +-
 .../service/impl/DeviceServiceImpl.java       |  68 +++---
 .../module/device/web/DeviceController.java   |  30 ++-
 .../com/mortals/httpclient/UDPClientApp.java  |  32 +++
 .../mortals/httpclient/UDPClientHandler.java  |  31 +++
 16 files changed, 342 insertions(+), 365 deletions(-)
 create mode 100644 common-lib/src/main/java/com/mortals/xhx/common/code/MessageProtocolEnum.java
 create mode 100644 device-manager/src/main/java/com/mortals/xhx/base/system/message/MessageService.java
 create mode 100644 device-manager/src/main/java/com/mortals/xhx/base/system/message/impl/MessageServiceImpl.java
 delete mode 100644 device-manager/src/main/java/com/mortals/xhx/busiz/web/RequestProcessController.http
 delete mode 100644 device-manager/src/main/java/com/mortals/xhx/common/key/GenConstants.java
 create mode 100644 device-manager/src/test/java/com/mortals/httpclient/UDPClientApp.java
 create mode 100644 device-manager/src/test/java/com/mortals/httpclient/UDPClientHandler.java

diff --git a/common-lib/src/main/java/com/mortals/xhx/common/code/MessageProtocolEnum.java b/common-lib/src/main/java/com/mortals/xhx/common/code/MessageProtocolEnum.java
new file mode 100644
index 00000000..57d61323
--- /dev/null
+++ b/common-lib/src/main/java/com/mortals/xhx/common/code/MessageProtocolEnum.java
@@ -0,0 +1,64 @@
+package com.mortals.xhx.common.code;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ *
+ *
+ * @author zxfei
+ */
+public enum MessageProtocolEnum {
+    JSON("json", "鏈縺娲�");
+    private String value;
+    private String desc;
+
+    MessageProtocolEnum(String value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+
+    public String getValue() {
+        return this.value;
+    }
+
+    public String getDesc() {
+        return this.desc;
+    }
+
+    public static MessageProtocolEnum getByValue(String value) {
+        for (MessageProtocolEnum activeEnum : MessageProtocolEnum.values()) {
+            if (value.equals(activeEnum.getValue())) {
+                return activeEnum;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * 鑾峰彇Map闆嗗悎
+     *
+     * @param eItem 涓嶅寘鍚」
+     * @return
+     */
+    public static Map<String, String> getEnumMap(String... eItem) {
+        Map<String, String> resultMap = new LinkedHashMap<>();
+        for (MessageProtocolEnum item : MessageProtocolEnum.values()) {
+            try {
+                boolean hasE = false;
+                for (String e : eItem) {
+                    if (item.getValue() == e) {
+                        hasE = true;
+                        break;
+                    }
+                }
+                if (!hasE) {
+                    resultMap.put(item.getValue(), item.getDesc());
+                }
+            } catch (Exception ex) {
+
+            }
+        }
+        return resultMap;
+    }
+}
\ No newline at end of file
diff --git a/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java b/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java
index bd2bbab5..3dce3cac 100644
--- a/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java
+++ b/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java
@@ -1,5 +1,9 @@
 package com.mortals.xhx.common.model;
 
+import cn.hutool.crypto.SecureUtil;
+import cn.hutool.crypto.asymmetric.SignAlgorithm;
+import com.mortals.framework.util.DateUtils;
+import com.mortals.xhx.common.code.MessageProtocolEnum;
 import com.mortals.xhx.queue.TbQueueMsgHeaders;
 import lombok.Setter;
 
@@ -18,6 +22,9 @@ public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
     protected  Map<String, String> data = new HashMap<>();
 
     public DefaultTbQueueMsgHeaders() {
+        data.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
+        data.put(MessageHeader.MESSAGESIGN, new String(SecureUtil.sign(SignAlgorithm.MD5withRSA).sign(data.get(MessageHeader.TIMESTAMP).getBytes())));
+        data.put(MessageHeader.MESSAGEPROTOCOL, MessageProtocolEnum.JSON.getValue());
         data.put(MessageHeader.TOPIC, "");
         data.put(MessageHeader.QOS, "0");
 
diff --git a/common-lib/src/main/java/com/mortals/xhx/common/model/MessageHeader.java b/common-lib/src/main/java/com/mortals/xhx/common/model/MessageHeader.java
index 435c0961..9e7b5b8b 100644
--- a/common-lib/src/main/java/com/mortals/xhx/common/model/MessageHeader.java
+++ b/common-lib/src/main/java/com/mortals/xhx/common/model/MessageHeader.java
@@ -12,15 +12,29 @@ public class MessageHeader {
      * 瀹㈡埛id
      */
     public static final String CLIENTID = "clientId";
+    /**
+     * 鍗忚
+     */
     public static final String MESSAGEPROTOCOL = "protocol";
+    /**
+     * 鏃堕棿鎴�
+     */
     public static final String TIMESTAMP = "timestamp";
-
+    /**
+     * 娑堟伅绛惧悕
+     */
+    public static final String MESSAGESIGN = "sign";
+    /**
+     * 娑堟伅绫诲瀷
+     */
     public static final String MESSAGETYPE = "messageType";
     /**
      * topic
      */
     public static final String TOPIC = "topic";
-
+    /**
+     * 娑堟伅绛夌骇
+     */
     public static final String QOS = "qos";
 
     public static final String RETAIN = "retain";
@@ -29,10 +43,4 @@ public class MessageHeader {
 
     public static final String DUP = "dup";
 
-
-    public static final String productKey = "dup";
-   // public static final String productKey = "dup";
-
-
-
 }
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java b/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java
index 38a88dff..f0a69b53 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java
@@ -1,12 +1,16 @@
 package com.mortals.xhx.queue;
 
+import cn.hutool.core.util.IdUtil;
 import com.alibaba.fastjson.JSON;
+import com.mortals.framework.util.DateUtils;
 import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
+import com.mortals.xhx.common.model.MessageHeader;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.json.JSONObject;
 
+import java.util.HashMap;
 import java.util.UUID;
 
 /**
@@ -27,6 +31,8 @@ public class DefaultTbQueueMsg implements TbQueueMsg {
      * 鏁版嵁杞戒綋
      */
     private  byte[] data;
+
+
     /**
      * 娑堟伅澶翠俊鎭�
      */
@@ -36,15 +42,22 @@ public class DefaultTbQueueMsg implements TbQueueMsg {
         this.key = msg.getKey();
         this.data = msg.getData();
         TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
-        msg.getHeaders().getData().forEach(headers::put);
+
+        msg.getHeaders().getData().entrySet().stream().forEach(item->
+            headers.put(item.getKey(),item.getValue()));
+
         this.headers = headers;
     }
 
-
     public static void main(String[] args) {
-//        DefaultTbQueueMsg defaultTbQueueMsg = new DefaultTbQueueMsg(UUID.randomUUID(),"string".getBytes(),null);
+
+//        TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
+//        header.put(MessageHeader.CLIENTID, "abcd1234");
+//        header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
+//        header.put(MessageHeader.MESSAGESIGN,"ssdafasdfasdfasfd");
+//        TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), "abcd1234".getBytes() , header);
 //
-//        String ret = JSON.toJSONString(defaultTbQueueMsg);
+//        String ret = JSON.toJSONString(queueMsg);
 //        System.out.println("pro:"+ret);
 //
 //        DefaultTbQueueMsg qu = JSON.parseObject(ret, DefaultTbQueueMsg.class);
diff --git a/device-manager/src/main/java/com/mortals/xhx/base/system/message/MessageService.java b/device-manager/src/main/java/com/mortals/xhx/base/system/message/MessageService.java
new file mode 100644
index 00000000..b9dbbabd
--- /dev/null
+++ b/device-manager/src/main/java/com/mortals/xhx/base/system/message/MessageService.java
@@ -0,0 +1,21 @@
+package com.mortals.xhx.base.system.message;
+
+import com.mortals.xhx.queue.TbQueueCallback;
+import com.mortals.xhx.queue.TbQueueMsgHeaders;
+import com.mortals.xhx.queue.TopicPartitionInfo;
+
+
+public interface MessageService {
+
+
+    /**
+     * 鍙戦€佹秷鎭�
+     *
+     * @param info
+     * @param header
+     * @param message
+     * @param callback
+     */
+    void send(TopicPartitionInfo info, TbQueueMsgHeaders header, String message, TbQueueCallback callback);
+
+}
\ No newline at end of file
diff --git a/device-manager/src/main/java/com/mortals/xhx/base/system/message/impl/MessageServiceImpl.java b/device-manager/src/main/java/com/mortals/xhx/base/system/message/impl/MessageServiceImpl.java
new file mode 100644
index 00000000..295ec9bb
--- /dev/null
+++ b/device-manager/src/main/java/com/mortals/xhx/base/system/message/impl/MessageServiceImpl.java
@@ -0,0 +1,45 @@
+package com.mortals.xhx.base.system.message.impl;
+
+import cn.hutool.core.util.IdUtil;
+import com.mortals.framework.model.Context;
+import com.mortals.framework.service.impl.AbstractCRUDCacheServiceImpl;
+import com.mortals.framework.util.DateUtils;
+import com.mortals.xhx.base.system.message.MessageService;
+import com.mortals.xhx.busiz.rsp.ApiResp;
+import com.mortals.xhx.common.code.ApiRespCodeEnum;
+import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
+import com.mortals.xhx.common.model.MessageHeader;
+import com.mortals.xhx.module.device.dao.DeviceDao;
+import com.mortals.xhx.module.device.model.DeviceEntity;
+import com.mortals.xhx.module.device.service.DeviceService;
+import com.mortals.xhx.queue.*;
+import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
+import lombok.extern.apachecommons.CommonsLog;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+
+import java.util.List;
+
+/**
+ * DeviceService
+ * 璁惧 service瀹炵幇
+ *
+ * @author zxfei
+ * @date 2022-03-09
+ */
+@Service("messageService")
+@CommonsLog
+public class MessageServiceImpl implements MessageService {
+
+    @Autowired
+    private TbCoreQueueProducerProvider producerProvider;
+
+    @Override
+    public void send(TopicPartitionInfo info, TbQueueMsgHeaders header, String message, TbQueueCallback callback) {
+        TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
+        TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), message == null ? "".getBytes() : message.getBytes(), header);
+        producer.send(info, queueMsg, callback);
+    }
+
+}
\ No newline at end of file
diff --git a/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java b/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java
index ea610971..7a41a1f6 100644
--- a/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java
+++ b/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java
@@ -1,19 +1,29 @@
 package com.mortals.xhx.busiz.web;
 
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.SecureUtil;
+import cn.hutool.crypto.asymmetric.Sign;
+import cn.hutool.crypto.asymmetric.SignAlgorithm;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.mortals.framework.util.DateUtils;
 import com.mortals.xhx.base.DeviceAuth;
 import com.mortals.xhx.base.framework.ws.message.SendToAllRequest;
 import com.mortals.xhx.base.framework.ws.util.WebSocketUtil;
+import com.mortals.xhx.base.system.message.MessageService;
 import com.mortals.xhx.busiz.req.DeviceReq;
 import com.mortals.xhx.busiz.rsp.ApiResp;
 import com.mortals.xhx.busiz.rsp.DeviceResp;
 import com.mortals.xhx.busiz.security.DeviceTokenService;
 import com.mortals.xhx.common.code.*;
 import com.mortals.xhx.common.key.Constant;
+import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
+import com.mortals.xhx.common.model.MessageHeader;
 import com.mortals.xhx.module.device.model.DeviceEntity;
 import com.mortals.xhx.module.device.service.DeviceLogService;
 import com.mortals.xhx.module.device.service.DeviceService;
+import com.mortals.xhx.queue.TbQueueMsgHeaders;
+import com.mortals.xhx.queue.TopicPartitionInfo;
 import lombok.extern.apachecommons.CommonsLog;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.ObjectUtils;
@@ -118,8 +128,12 @@ public class DeviceApiController {
             DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum());
             if (!ObjectUtils.isEmpty(deviceEntity)) {
                 //灏嗕笂鎶ヤ俊鎭浆鍙戝埌mq涓�
-                ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity.getId(), Constant.UPLOAD_TOPIC, JSON.toJSONString(req), null);
-            log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
+                TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
+                TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
+                header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_HEARTBEAT);
+                header.put(MessageHeader.MESSAGESIGN, new String(SecureUtil.sign(SignAlgorithm.MD5withRSA).sign(header.get(MessageHeader.TIMESTAMP).getBytes())));
+                ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(req), null);
+                log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
             }
         } catch (Exception e) {
             log.error("鎺ユ敹鏁版嵁澶辫触", e);
diff --git a/device-manager/src/main/java/com/mortals/xhx/busiz/web/RequestProcessController.http b/device-manager/src/main/java/com/mortals/xhx/busiz/web/RequestProcessController.http
deleted file mode 100644
index 51919815..00000000
--- a/device-manager/src/main/java/com/mortals/xhx/busiz/web/RequestProcessController.http
+++ /dev/null
@@ -1,94 +0,0 @@
-###娴佺▼鏍规嵁id鍚姩
-POST {{baseUrl}}/m/api/flow/process/start
-Content-Type: application/json
-
-{
-  "processDefinitionKey": "process_xw8yhk6g",
-  "businessKey": "12321",
-  "creator": "admin",
-  "formName": "浼氱娴佺▼娴嬭瘯",
-  "platformSn": "government-manager",
-  "userCode": "admin",
-  "variables": {
-    "assignee1": "admin",
-    "assignee2": "admin",
-    "assigneeList":[
-      "zhang1",
-      "zhang2",
-      "zhang3"
-    ]
-
-  }
-}
-
-
-###娴佺▼ 鍔ㄦ€佽〃杈惧紡
-POST {{baseUrl}}/m/api/flow/process/start
-Content-Type: application/json
-
-{
-  "processDefinitionKey": "process_4zrketpm",
-  "businessKey": "12321",
-  "creator": "admin",
-  "formName": "鍔ㄦ€侀€変汉娴佺▼娴嬭瘯",
-  "platformSn": "government-manager",
-  "userCode": "admin",
-  "variables": {
-    "approval": "admin"
-  }
-}
-
-
-###娴佺▼鏍规嵁id 鑾峰彇鎵€鏈塭l琛ㄨ揪寮�
-POST {{baseUrl}}/m/api/flow/process/getAllProcessInstanceEl
-Content-Type: application/json
-
-{
-  "processDefinitionKey": "process_xw8yhk6g",
-  "businessKey": "12321",
-  "creator": "admin",
-  "formName": "浼氱娴佺▼娴嬭瘯",
-  "platformSn": "government-manager",
-  "userCode": "admin"
-
-}
-
-
-###娴佺▼瀹炰緥 婵€娲讳笌鎸傝捣
-POST {{baseUrl}}/m/api/flow/process/updateState
-Content-Type: application/json
-
-{
-  "processInstanceId": "46eb2817-0951-11ec-8011-c25bd865180b",
-  "suspensionState": 1
-}
-
-
-###娴佺▼鏍规嵁瀹炰緥id鑾峰彇褰撳墠杩涚▼鍥剧墖
-POST {{baseUrl}}/m/api/flow/process/getImage
-Content-Type: application/json
-
-{
-  "processInstanceId": "b6eea2f5-0957-11ec-9b0c-c25bd865180b"
-}
-
-
-###缁堟娴佺▼
-POST {{baseUrl}}/m/api/flow/process/stop
-Content-Type: application/json
-
-{
-  "processInstanceId": "b6eea2f5-0957-11ec-9b0c-c25bd865180b"
-}
-
-###鍒犻櫎娴佺▼
-POST {{baseUrl}}/m/api/flow/process/delete
-Content-Type: application/json
-
-{
-  "processInstanceId": "46eb2817-0951-11ec-8011-c25bd865180b",
-  "deleteReason": "娴嬭瘯鍒犻櫎娴佺▼"
-}
-
-
-
diff --git a/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java b/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java
index ef9c4a53..1d4bf9b6 100644
--- a/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java
+++ b/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java
@@ -55,8 +55,26 @@ public final class Constant {
 	/** 鍩虹浠g爜鐗堟湰 Z-BASE.MANAGER-S1.0.0 */
 	public final static String BASEMANAGER_VERSION = "Z-BASE.MANAGER-S1.0.0";
 
+	/**
+	 * 鏈嶅姟鍣╤ttp
+	 */
 	public final static String PARAM_SERVER_HTTP_URL = "server_http_url";
 
 
+	/**
+	 * 娑堟伅绫诲瀷(蹇冭烦)
+	 */
+	public static final String MESSAGETYPE_HEARTBEAT = "HEART_BEAT";
+
+
+	/**
+	 * 娑堟伅绫诲瀷(婵€娲�)
+	 */
+	public static final String MESSAGETYPE_ACTIVE = "ACTIVE";
+
+	/**
+	 * 娑堟伅绫诲瀷(upgread)
+	 */
+	public static final String MESSAGETYPE_UPGREAD = "UPGREAD";
 
 }
diff --git a/device-manager/src/main/java/com/mortals/xhx/common/key/GenConstants.java b/device-manager/src/main/java/com/mortals/xhx/common/key/GenConstants.java
deleted file mode 100644
index 89d81dc0..00000000
--- a/device-manager/src/main/java/com/mortals/xhx/common/key/GenConstants.java
+++ /dev/null
@@ -1,193 +0,0 @@
-package com.mortals.xhx.common.key;
-
-/**
- * 浠g爜鐢熸垚閫氱敤甯搁噺
- *
- * @author: zxfei
- * @date: 2021/9/28 15:43
- */
-public class GenConstants {
-    /**
-     * 鍗曡〃锛堝鍒犳敼鏌ワ級
-     */
-    public static final String TPL_CRUD = "crud";
-
-    /**
-     * 鏍戣〃锛堝鍒犳敼鏌ワ級
-     */
-    public static final String TPL_TREE = "tree";
-
-    /**
-     * 涓诲瓙琛紙澧炲垹鏀规煡锛�
-     */
-    public static final String TPL_SUB = "sub";
-
-    /**
-     * 涓诲瓙琛紙涓€瀵逛竴锛�
-     */
-    public static final String TPL_SUB_ONE = "subone";
-
-    /**
-     * 鏍戠紪鐮佸瓧娈�
-     */
-    public static final String TREE_CODE = "treeCode";
-
-    /**
-     * 鏍戠埗缂栫爜瀛楁
-     */
-    public static final String TREE_PARENT_CODE = "treeParentCode";
-
-    /**
-     * 鏍戝悕绉板瓧娈�
-     */
-    public static final String TREE_NAME = "treeName";
-
-    /**
-     * 涓婄骇鑿滃崟ID瀛楁
-     */
-    public static final String PARENT_MENU_ID = "parentMenuId";
-
-    /**
-     * 涓婄骇鑿滃崟鍚嶇О瀛楁
-     */
-    public static final String PARENT_MENU_NAME = "parentMenuName";
-
-    /**
-     * 鏁版嵁搴撳瓧绗︿覆绫诲瀷
-     */
-    public static final String[] COLUMNTYPE_STR = {"char", "varchar", "nvarchar", "varchar2"};
-
-    /**
-     * 鏁版嵁搴撴枃鏈被鍨�
-     */
-    public static final String[] COLUMNTYPE_TEXT = {"tinytext", "text", "mediumtext", "longtext"};
-
-    /**
-     * 鏁版嵁搴撴椂闂寸被鍨�
-     */
-    public static final String[] COLUMNTYPE_TIME = {"datetime", "time", "date", "timestamp"};
-
-    /**
-     * 鏁版嵁搴撴暟瀛楃被鍨�
-     */
-    public static final String[] COLUMNTYPE_NUMBER = {"tinyint", "smallint", "mediumint", "int", "number", "integer",
-            "bit", "bigint", "float", "double", "decimal"};
-
-    /**
-     * 椤甸潰涓嶉渶瑕佺紪杈戝瓧娈�
-     */
-    public static final String[] COLUMNNAME_NOT_EDIT = {"id", "createUser", "createTime", "delFlag","updateUser",
-            "updateTime"};
-
-    /**
-     * 椤甸潰涓嶉渶瑕佹樉绀虹殑鍒楄〃瀛楁
-     */
-    public static final String[] COLUMNNAME_NOT_LIST = {"id", "createUser", "createTime", "delFlag", "updateUser",
-            "updateTime"};
-
-    /**
-     * 椤甸潰涓嶉渶瑕佹煡璇㈠瓧娈�
-     */
-    public static final String[] COLUMNNAME_NOT_QUERY = {"id", "createUser", "createTime", "delFlag", "updateUser",
-            "updateTime", "remark"};
-
-    /**
-     * Entity鍩虹被瀛楁
-     */
-    public static final String[] BASE_ENTITY = {"createUserId", "createUser", "createTime", "updateUserId", "updateUser", "updateTime", "id"};
-
-    /**
-     * Tree鍩虹被瀛楁
-     */
-    public static final String[] TREE_ENTITY = {"parentName", "parentId", "orderNum", "ancestors", "children"};
-
-    /**
-     * 鏂囨湰妗�
-     */
-    public static final String HTML_INPUT = "input";
-
-    /**
-     * 鏂囨湰鍩�
-     */
-    public static final String HTML_TEXTAREA = "textarea";
-
-    /**
-     * 涓嬫媺妗�
-     */
-    public static final String HTML_SELECT = "select";
-
-    /**
-     * 鍗曢€夋
-     */
-    public static final String HTML_RADIO = "radio";
-
-    /**
-     * 澶嶉€夋
-     */
-    public static final String HTML_CHECKBOX = "checkbox";
-
-    /**
-     * 鏃ユ湡鎺т欢
-     */
-    public static final String HTML_DATETIME = "datetime";
-
-    /**
-     * 鍥剧墖涓婁紶鎺т欢
-     */
-    public static final String HTML_IMAGE_UPLOAD = "imageUpload";
-
-    /**
-     * 鏂囦欢涓婁紶鎺т欢
-     */
-    public static final String HTML_FILE_UPLOAD = "fileUpload";
-
-    /**
-     * 瀵屾枃鏈帶浠�
-     */
-    public static final String HTML_EDITOR = "editor";
-
-    /**
-     * 瀛楃涓茬被鍨�
-     */
-    public static final String TYPE_STRING = "String";
-
-    /**
-     * 鏁村瀷
-     */
-    public static final String TYPE_INTEGER = "Integer";
-
-    /**
-     * 闀挎暣鍨�
-     */
-    public static final String TYPE_LONG = "Long";
-
-    /**
-     * 娴偣鍨�
-     */
-    public static final String TYPE_DOUBLE = "Double";
-
-    /**
-     * 楂樼簿搴﹁绠楃被鍨�
-     */
-    public static final String TYPE_BIGDECIMAL = "BigDecimal";
-
-    /**
-     * 鏃堕棿绫诲瀷
-     */
-    public static final String TYPE_DATE = "Date";
-
-    /**
-     * 妯$硦鏌ヨ
-     */
-    public static final String QUERY_LIKE = "LIKE";
-
-    /**
-     * 闇€瑕�
-     */
-    public static final Integer REQUIRE = 1;
-
-    /**
-     * 涓嶉渶瑕�
-     */
-    public static final Integer NOREQUIRE = 0;
-}
diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java b/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java
index 4cb30571..6ae845ae 100644
--- a/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java
+++ b/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java
@@ -94,22 +94,22 @@ public class DemoApiController {
             headers.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime().getBytes());
             String payLoad = "12313";
 
-            TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID().toString(), payLoad.getBytes(), header);
-
-
-            for (int i = 0; i < 10; i++) {
-                producer.send(info, queueMsg, new TbQueueCallback() {
-                    @Override
-                    public void onSuccess(TbQueueMsgMetadata metadata) {
-                        // logger.info("娑堟伅鍙戦€佹垚鍔�");
-                    }
-
-                    @Override
-                    public void onFailure(Throwable t) {
-
-                    }
-                });
-            }
+//            TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID().toString(), payLoad.getBytes(), header);
+//
+//
+//            for (int i = 0; i < 10; i++) {
+//                producer.send(info, queueMsg, new TbQueueCallback() {
+//                    @Override
+//                    public void onSuccess(TbQueueMsgMetadata metadata) {
+//                        // logger.info("娑堟伅鍙戦€佹垚鍔�");
+//                    }
+//
+//                    @Override
+//                    public void onFailure(Throwable t) {
+//
+//                    }
+//                });
+//            }
 
         } catch (Exception e) {
             log.error("娑堟伅鎻愪氦澶辫触", e);
diff --git a/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java b/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java
index 430e3e87..1fb32c57 100644
--- a/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java
+++ b/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java
@@ -3,6 +3,8 @@ import com.mortals.framework.model.Context;
 import com.mortals.framework.service.ICRUDCacheService;
 import com.mortals.xhx.busiz.rsp.ApiResp;
 import com.mortals.xhx.module.device.model.DeviceEntity;
+import com.mortals.xhx.queue.TbQueueMsgHeaders;
+import com.mortals.xhx.queue.TopicPartitionInfo;
 
 import java.util.List;
 
@@ -17,9 +19,12 @@ import java.util.List;
 public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
 
 
-    ApiResp<String> sendDeviceMessage(Long deviceId, String topic,String message , Context context);
+    ApiResp<String> sendDeviceMessage(DeviceEntity deviceEntity, TopicPartitionInfo info, TbQueueMsgHeaders header,String message , Context context);
 
-    ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message , Context context);
+
+
+
+    ApiResp<String> sendDeviceMessage(List<Long> deviceIds, TopicPartitionInfo info,TbQueueMsgHeaders header, String message , Context context);
 
 
 
diff --git a/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java b/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java
index de9a60ea..be0aa84c 100644
--- a/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java
+++ b/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java
@@ -3,11 +3,14 @@ package com.mortals.xhx.module.device.service.impl;
 import cn.hutool.core.util.IdUtil;
 import com.mortals.framework.model.Context;
 import com.mortals.framework.util.DateUtils;
+import com.mortals.xhx.base.system.message.MessageService;
 import com.mortals.xhx.busiz.rsp.ApiResp;
 import com.mortals.xhx.common.code.ApiRespCodeEnum;
+import com.mortals.xhx.common.code.MessageProtocolEnum;
 import com.mortals.xhx.common.key.Constant;
 import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
 import com.mortals.xhx.common.model.MessageHeader;
+import com.mortals.xhx.module.device.model.DeviceQuery;
 import com.mortals.xhx.queue.*;
 import com.mortals.xhx.queue.provider.TbCoreQueueProducerProvider;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -33,65 +36,48 @@ import java.util.UUID;
 @Service("deviceService")
 public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, DeviceEntity, Long> implements DeviceService {
 
-    @Autowired
-    private TbCoreQueueProducerProvider producerProvider;
-
     @Override
     protected String getExtKey(DeviceEntity data) {
         return data.getDeviceCode();
     }
 
+    @Autowired
+    private MessageService messageService;
 
     @Override
-    public ApiResp<String> sendDeviceMessage(Long deviceId, String topic, String message, Context context) {
+    public ApiResp<String> sendDeviceMessage(DeviceEntity deviceEntity, TopicPartitionInfo info, TbQueueMsgHeaders header, String message, Context context) {
         ApiResp<String> resp = new ApiResp<>();
         resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
         resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
-        //鍙戦€佹秷鎭�
-        send(topic, message, context, resp, deviceId);
+
+        TbQueueCallback callback = new TbQueueCallback() {
+            @Override
+            public void onSuccess(TbQueueMsgMetadata metadata) {
+                log.info("娑堟伅鎶曢€掓垚鍔�,璁惧閫氶亾缂栫爜:" + deviceEntity.getDeviceMac());
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                log.error("娑堟伅鎶曢€掓垚鍔�,璁惧閫氶亾缂栫爜:" + deviceEntity.getDeviceMac(), t);
+
+            }
+        };
+        messageService.send(info, header, message, callback);
+
         return resp;
     }
 
     @Override
-    public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message, Context context) {
+    public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, TopicPartitionInfo info, TbQueueMsgHeaders header, String message, Context context) {
         ApiResp<String> resp = new ApiResp<>();
-        resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
-        resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
-
-        for (Long deviceId : deviceIds) {
-            send(topic, message, context, resp, deviceId);
+        DeviceQuery deviceQuery = new DeviceQuery();
+        deviceQuery.setIdList(deviceIds);
+        List<DeviceEntity> deviceEntityList = this.find(deviceQuery);
+        for (DeviceEntity deviceEntity : deviceEntityList) {
+            resp = sendDeviceMessage(deviceEntity, info, header, message, context);
         }
         return resp;
     }
 
-    private void send(String topic, String message, Context context, ApiResp<String> resp, Long deviceId) {
-        DeviceEntity deviceEntity = this.get(deviceId, context);
-        if (!ObjectUtils.isEmpty(deviceEntity)) {
-            TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
-            TopicPartitionInfo info = TopicPartitionInfo.builder().topic(topic + deviceEntity.getDeviceMac()).build();
-            TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
-            header.put(MessageHeader.CLIENTID, deviceEntity.getDeviceCode());
-            header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
-            TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), message == null ? "".getBytes() : message.getBytes(), header);
-
-            producer.send(info, queueMsg, new TbQueueCallback() {
-                @Override
-                public void onSuccess(TbQueueMsgMetadata metadata) {
-                    log.info("娑堟伅鎶曢€掓垚鍔�,璁惧閫氶亾缂栫爜:" + deviceEntity.getDeviceMac());
-                }
-
-                @Override
-                public void onFailure(Throwable t) {
-                    log.error("娑堟伅鎶曢€掓垚鍔�,璁惧閫氶亾缂栫爜:" + deviceEntity.getDeviceMac(), t);
-
-                }
-            });
-        } else {
-            log.error(String.format("璁惧Id鏌ヨ涓嶅埌璁惧锛宒eviceId:%s", deviceId));
-            resp.setCode(ApiRespCodeEnum.FAILED.getValue());
-            resp.setMsg(String.format("璁惧Id鏌ヨ涓嶅埌璁惧锛宒eviceId:%s", deviceId));
-        }
-    }
-
 
 }
\ No newline at end of file
diff --git a/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java b/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java
index b69096b2..11a42eb3 100644
--- a/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java
+++ b/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java
@@ -1,5 +1,8 @@
 package com.mortals.xhx.module.device.web;
 
+import cn.hutool.crypto.SecureUtil;
+import cn.hutool.crypto.asymmetric.SignAlgorithm;
+import com.alibaba.fastjson.JSON;
 import com.mortals.framework.exception.AppException;
 import com.mortals.framework.model.OrderCol;
 import com.mortals.xhx.base.system.param.service.ParamService;
@@ -9,8 +12,12 @@ import com.mortals.xhx.common.code.ApiRespCodeEnum;
 import com.mortals.xhx.common.code.DeviceOnlineStatusEnum;
 import com.mortals.xhx.common.code.DeviceTypeEnum;
 import com.mortals.xhx.common.key.Constant;
+import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
+import com.mortals.xhx.common.model.MessageHeader;
 import com.mortals.xhx.module.firm.model.FirmQuery;
 import com.mortals.xhx.module.firm.service.FirmService;
+import com.mortals.xhx.queue.TbQueueMsgHeaders;
+import com.mortals.xhx.queue.TopicPartitionInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.ObjectUtils;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -93,14 +100,27 @@ public class DeviceController extends BaseCRUDJsonMappingController<DeviceServic
      * 涓嬪彂淇℃伅
      */
     @PostMapping(value = "downMsg")
-    public String downMsg(@RequestParam(value = "deviceId") Long deviceId,@RequestParam(value = "content") String content) {
+    public String downMsg(@RequestParam(value = "deviceId") Long deviceId, @RequestParam(value = "content") String content) {
         JSONObject jsonObject = new JSONObject();
         Map<String, Object> model = new HashMap<>();
-        String busiDesc = this.getModuleDesc()+"涓嬪彂璁惧娑堟伅" ;
+        String busiDesc = this.getModuleDesc() + "涓嬪彂璁惧娑堟伅";
         try {
-            ApiResp<String> apiResp = this.service.sendDeviceMessage(deviceId, Constant.DOWN_TOPIC, content, getContext());
-            if(ApiRespCodeEnum.SUCCESS.getValue()!=apiResp.getCode()){
-                throw new AppException("涓嬪彂娑堟伅澶辫触锛�");
+            //鏍规嵁璁惧缂栫爜鏌ヨ璁惧
+            DeviceEntity deviceEntity = this.service.get(deviceId, getContext());
+            if (!ObjectUtils.isEmpty(deviceEntity)) {
+                //灏嗕笂鎶ヤ俊鎭浆鍙戝埌mq涓�
+                TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.DOWN_TOPIC + deviceEntity.getDeviceMac()).build();
+                TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
+                header.put(MessageHeader.MESSAGETYPE, Constant.MESSAGETYPE_UPGREAD);
+                JSONObject obj = new JSONObject();
+                obj.put("content", content);
+                ApiResp<String> sendDeviceMessageResp = this.service.sendDeviceMessage(deviceEntity, info, header, JSON.toJSONString(obj), null);
+                log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
+                if (ApiRespCodeEnum.SUCCESS.getValue() != sendDeviceMessageResp.getCode()) {
+                    throw new AppException("涓嬪彂娑堟伅澶辫触锛�");
+                }
+            } else {
+                throw new AppException("璁惧涓嶅瓨鍦紒deviceId:" + deviceId);
             }
             this.init(request, response, null, model, getContext());
             recordSysLog(request, busiDesc + " 銆愭垚鍔熴€�");
diff --git a/device-manager/src/test/java/com/mortals/httpclient/UDPClientApp.java b/device-manager/src/test/java/com/mortals/httpclient/UDPClientApp.java
new file mode 100644
index 00000000..769c44ed
--- /dev/null
+++ b/device-manager/src/test/java/com/mortals/httpclient/UDPClientApp.java
@@ -0,0 +1,32 @@
+package com.mortals.httpclient;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+
+public class UDPClientApp {
+    public static void main(String[] args) {
+        Bootstrap bootstrap = new Bootstrap();
+        EventLoopGroup workGroup = new NioEventLoopGroup();
+        bootstrap.group(workGroup).channel(NioDatagramChannel.class)
+                .option(ChannelOption.SO_BROADCAST, true)
+                .handler(new ChannelInitializer<NioDatagramChannel>() {
+
+                    @Override
+                    protected void initChannel(NioDatagramChannel ch) throws Exception {
+                        // TODO Auto-generated method stub
+                        ch.pipeline().addLast(new UDPClientHandler());
+                    }
+                });
+        try {
+            Channel channel = bootstrap.bind(0).sync().channel();
+            channel.closeFuture().sync().await();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/device-manager/src/test/java/com/mortals/httpclient/UDPClientHandler.java b/device-manager/src/test/java/com/mortals/httpclient/UDPClientHandler.java
new file mode 100644
index 00000000..efbe20bc
--- /dev/null
+++ b/device-manager/src/test/java/com/mortals/httpclient/UDPClientHandler.java
@@ -0,0 +1,31 @@
+package com.mortals.httpclient;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.DatagramPacket;
+
+public class UDPClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)
+            throws Exception {
+        ByteBuf buf = msg.content();
+        int len = buf.readableBytes();
+        byte[] data = new byte[len];
+        buf.readBytes(data);
+        String receive = new String(data, "UTF-8");
+        System.out.println("client->" + receive);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("hello,server", Charset.forName("UTF-8")),
+                new InetSocketAddress("192.168.0.98", 54321)));
+    }
+
+}
-- 
2.24.3