From 01ee4a0cda3d54e468561e69563e8659eac11bfd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B5=B5=E5=95=B8=E9=9D=9E?= <13281114856@qq.com>
Date: Wed, 13 Apr 2022 10:12:25 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B6=88=E6=81=AF=E7=BB=84?=
 =?UTF-8?q?=E4=BB=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../model/DefaultTbQueueMsgHeaders.java       |  16 ++-
 .../mortals/xhx/queue/DefaultTbQueueMsg.java  |  11 +-
 .../com/mortals/xhx/queue/TbQueueMsg.java     |   2 +-
 .../mortals/xhx/queue/TbQueueMsgHeaders.java  |   8 +-
 .../xhx/queue/kafka/KafkaTbQueueMsg.java      |   8 +-
 .../queue/kafka/TbKafkaProducerTemplate.java  |   2 +-
 .../provider/RabbitMqTbCoreQueueFactory.java  |   2 +-
 .../rabbitmq/TbRabbitMqConsumerTemplate.java  |  34 ++---
 .../admin/src/views/device/list.vue           | 123 +++++++++++++-----
 .../xhx/busiz/web/DeviceApiController.java    |   3 +-
 .../com/mortals/xhx/common/key/Constant.java  |   4 +-
 .../mortals/xhx/daemon/DemoApiController.java |   8 +-
 ...a => DeviceMsgComsumerStartedService.java} |   2 +-
 .../handler/NettyUDPServerHandler.java        |   5 +-
 .../module/device/service/DeviceService.java  |   4 +-
 .../service/impl/DeviceServiceImpl.java       |  20 +--
 .../module/device/web/DeviceController.java   |  31 ++++-
 17 files changed, 189 insertions(+), 94 deletions(-)
 rename device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/{DemoStartedService.java => DeviceMsgComsumerStartedService.java} (98%)

diff --git a/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java b/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java
index e6890479..bd2bbab5 100644
--- a/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java
+++ b/common-lib/src/main/java/com/mortals/xhx/common/model/DefaultTbQueueMsgHeaders.java
@@ -1,6 +1,7 @@
 package com.mortals.xhx.common.model;
 
 import com.mortals.xhx.queue.TbQueueMsgHeaders;
+import lombok.Setter;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -13,26 +14,27 @@ import java.util.Map;
  */
 public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
 
-    protected final Map<String, byte[]> data = new HashMap<>();
+    @Setter
+    protected  Map<String, String> data = new HashMap<>();
 
     public DefaultTbQueueMsgHeaders() {
-        data.put(MessageHeader.TOPIC, new byte[]{});
-        data.put(MessageHeader.QOS, new byte[]{(byte) 0});
+        data.put(MessageHeader.TOPIC, "");
+        data.put(MessageHeader.QOS, "0");
 
     }
 
     @Override
-    public byte[] put(String key, byte[] value) {
-        return data.put(key, value);
+    public void put(String key, String value) {
+        data.put(key, value);
     }
 
     @Override
-    public byte[] get(String key) {
+    public String get(String key) {
         return data.get(key);
     }
 
     @Override
-    public Map<String, byte[]> getData() {
+    public Map<String, String> getData() {
         return data;
     }
 }
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java b/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java
index 625c0672..38a88dff 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/DefaultTbQueueMsg.java
@@ -19,8 +19,17 @@ import java.util.UUID;
 @AllArgsConstructor
 @NoArgsConstructor
 public class DefaultTbQueueMsg implements TbQueueMsg {
-    private  UUID key;
+    /**
+     * key 鍞竴鏍囪瘑
+     */
+    private  String key;
+    /**
+     * 鏁版嵁杞戒綋
+     */
     private  byte[] data;
+    /**
+     * 娑堟伅澶翠俊鎭�
+     */
     private  TbQueueMsgHeaders headers;
 
     public DefaultTbQueueMsg(TbQueueMsg msg) {
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsg.java b/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsg.java
index af709bb4..a531cff8 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsg.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsg.java
@@ -10,7 +10,7 @@ import java.util.UUID;
  */
 public interface TbQueueMsg {
 
-    UUID getKey();
+     String getKey();
 
     TbQueueMsgHeaders getHeaders();
 
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsgHeaders.java b/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsgHeaders.java
index 9a56f84b..955a0b39 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsgHeaders.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/TbQueueMsgHeaders.java
@@ -10,9 +10,11 @@ import java.util.Map;
  */
 public interface TbQueueMsgHeaders {
 
-    byte[] put(String key, byte[] value);
+   void put(String key, String value);
 
-    byte[] get(String key);
+    String get(String key);
 
-    Map<String, byte[]> getData();
+    Map<String, String> getData();
+    
+    void  setData(Map<String, String> data);
 }
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/kafka/KafkaTbQueueMsg.java b/common-lib/src/main/java/com/mortals/xhx/queue/kafka/KafkaTbQueueMsg.java
index 2db1da75..0c1c75d6 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/kafka/KafkaTbQueueMsg.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/kafka/KafkaTbQueueMsg.java
@@ -8,22 +8,22 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import java.util.UUID;
 
 public class KafkaTbQueueMsg implements TbQueueMsg {
-    private final UUID key;
+    private final String key;
     private final TbQueueMsgHeaders headers;
     private final byte[] data;
 
     public KafkaTbQueueMsg(ConsumerRecord<String, byte[]> record) {
-        this.key = UUID.fromString(record.key());
+        this.key = record.key();
         TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
         record.headers().forEach(header -> {
-            headers.put(header.key(), header.value());
+            headers.put(header.key(), new String(header.value()));
         });
         this.headers = headers;
         this.data = record.value();
     }
 
     @Override
-    public UUID getKey() {
+    public String getKey() {
         return key;
     }
 
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/kafka/TbKafkaProducerTemplate.java b/common-lib/src/main/java/com/mortals/xhx/queue/kafka/TbKafkaProducerTemplate.java
index 2703c582..de3c5e53 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/kafka/TbKafkaProducerTemplate.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/kafka/TbKafkaProducerTemplate.java
@@ -64,7 +64,7 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
         if (tpi.getTopic() == null) {
             tpi.setTopic(this.defaultTopic);
         }
-        Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
+        Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue().getBytes())).collect(Collectors.toList());
         record = new ProducerRecord<>(tpi.getTopic(), null, key, data, headers);
         producer.send(record, (metadata, exception) -> {
             if (exception == null) {
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/provider/RabbitMqTbCoreQueueFactory.java b/common-lib/src/main/java/com/mortals/xhx/queue/provider/RabbitMqTbCoreQueueFactory.java
index 751ddf9d..170ec0e3 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/provider/RabbitMqTbCoreQueueFactory.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/provider/RabbitMqTbCoreQueueFactory.java
@@ -33,7 +33,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
     public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
         return new TbRabbitMqConsumerTemplate<>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
             @Override
-            public UUID getKey() {
+            public String getKey() {
                 return msg.getKey();
             }
 
diff --git a/common-lib/src/main/java/com/mortals/xhx/queue/rabbitmq/TbRabbitMqConsumerTemplate.java b/common-lib/src/main/java/com/mortals/xhx/queue/rabbitmq/TbRabbitMqConsumerTemplate.java
index ea38a889..f830d328 100644
--- a/common-lib/src/main/java/com/mortals/xhx/queue/rabbitmq/TbRabbitMqConsumerTemplate.java
+++ b/common-lib/src/main/java/com/mortals/xhx/queue/rabbitmq/TbRabbitMqConsumerTemplate.java
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.google.gson.Gson;
 import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
+import com.mortals.xhx.common.model.MessageHeader;
 import com.mortals.xhx.queue.*;
 import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate;
 import com.rabbitmq.client.Channel;
@@ -105,11 +106,8 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
             DefaultTbQueueMsg msg = new DefaultTbQueueMsg();
             // DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
             Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class);
-            msg.setKey(UUID.fromString((String)map.get("key")));
-
-
+            msg.setKey((String)map.get("key"));
             String payloadStr = (String)map.get("data");
-            byte[] payloadByte = payloadStr.getBytes();
             System.out.println("receivedPayLoadStr:" + payloadStr);
 
             byte[] payloadDecodeByte = Base64.getDecoder().decode(payloadStr);
@@ -117,18 +115,22 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
             msg.setData(payloadDecodeByte);
             TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
 
-            String clientIdStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("clientId");
-            String qosStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("qos");
-            String timestampStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("timestamp");
-            if(!ObjectUtils.isEmpty(clientIdStr)){
-                headers.put("clientId", Base64.getDecoder().decode(clientIdStr));
-            }
-            if(!ObjectUtils.isEmpty(qosStr)){
-                headers.put("qos", Base64.getDecoder().decode(qosStr));
-            }
-            if(!ObjectUtils.isEmpty(timestampStr)) {
-                headers.put("timestamp", Base64.getDecoder().decode(timestampStr));
-            }
+            String headerStr = ((JSONObject) map.get("headers")).getString("data");
+            HashMap<String, String> hashMap = JSON.parseObject(headerStr, HashMap.class);
+            headers.setData(hashMap);
+
+//            String clientIdStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("clientId");
+//            String qosStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("qos");
+//            String timestampStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("timestamp");
+//            if(!ObjectUtils.isEmpty(clientIdStr)){
+//                headers.put(MessageHeader.CLIENTID, clientIdStr);
+//            }
+//            if(!ObjectUtils.isEmpty(qosStr)){
+//                headers.put(MessageHeader.TOPIC,qosStr);
+//            }
+//            if(!ObjectUtils.isEmpty(timestampStr)) {
+//                headers.put(MessageHeader.TIMESTAMP, timestampStr);
+//            }
             msg.setHeaders(headers);
             // log.info("msg:" + msg.toString());
             return decoder.decode(msg);
diff --git a/device-manager-ui/admin/src/views/device/list.vue b/device-manager-ui/admin/src/views/device/list.vue
index 9129e554..282dbfe0 100644
--- a/device-manager-ui/admin/src/views/device/list.vue
+++ b/device-manager-ui/admin/src/views/device/list.vue
@@ -1,14 +1,23 @@
 <template>
   <div class="page">
     <LayoutTable :data="tableData" :config="tableConfig">
-        <el-tag slot="table-body-head" style="margin:5px" type="success">褰撳墠鍦ㄧ嚎璁惧鎬昏锛歿{tableData.onlineCount}}涓�</el-tag>
+      <el-tag slot="table-body-head" style="margin: 5px" type="success"
+        >褰撳墠鍦ㄧ嚎璁惧鎬昏锛歿{ tableData.onlineCount }}涓�</el-tag
+      >
 
-        <el-tag slot="table-body-head" style="margin:5px" type="danger">褰撳墠绂荤嚎璁惧鎬昏锛歿{tableData.offlineCount}}涓�</el-tag>
-        
-         <el-tag   v-for='($label, $value) in tableData.offlineDeviceType'
-            :key='$value'
-            :label="$value" slot="table-body-head" style="margin:5px" type="danger">{{$value}}绂荤嚎璁惧锛歿{$label}}涓�</el-tag>
-        
+      <el-tag slot="table-body-head" style="margin: 5px" type="danger"
+        >褰撳墠绂荤嚎璁惧鎬昏锛歿{ tableData.offlineCount }}涓�</el-tag
+      >
+
+      <el-tag
+        v-for="($label, $value) in tableData.offlineDeviceType"
+        :key="$value"
+        :label="$value"
+        slot="table-body-head"
+        style="margin: 5px"
+        type="danger"
+        >{{ $value }}绂荤嚎璁惧锛歿{ $label }}涓�</el-tag
+      >
     </LayoutTable>
 
     <dialog-show ref="dialogform" @ok="getData" />
@@ -23,9 +32,7 @@ export default {
   name: "Device",
   components: { dialogShow },
   mixins: [table],
-  created() {
-
-  },
+  created() {},
   methods: {
     /** 閲嶅啓鏂板鏂规硶 */
     toAdd(row) {
@@ -37,24 +44,51 @@ export default {
     },
     /** 閲嶅啓鏌ョ湅鏂规硶 */
     toView(row) {
-        this.$refs.dialogform.view(row);
+      this.$refs.dialogform.view(row);
     },
-    activeDevice(row) {
 
-     this.$post("/device/save", {
+    downMsg(row) {
+      this.$prompt("璇疯緭鍏ヤ笅鍙戞秷鎭唴瀹�", "鎻愮ず", {
+        confirmButtonText: "纭畾",
+        cancelButtonText: "鍙栨秷",
+      })
+        .then(({ value }) => {
+          this.$post("/device/downMsg", {
+            deviceId: row.id,
+            content: value,
+          })
+            .then((res) => {
+              if (res.code == 1) {
+                this.$message.success("涓嬪彂璁惧鎴愬姛锛�");
+                this.getData();
+              }
+            })
+            .catch((error) => {
+              this.$message.error(error.message);
+            });
+        })
+        .catch(() => {
+          this.$message({
+            type: "info",
+            message: "鍙栨秷杈撳叆",
+          });
+        });
+    },
+
+    activeDevice(row) {
+      this.$post("/device/save", {
         "entity.id": row.id,
         "entity.active": 1,
       })
         .then((res) => {
           if (res.code == 1) {
             this.$message.success("婵€娲昏澶囨垚鍔燂紒");
-            this.getData()
+            this.getData();
           }
         })
         .catch((error) => {
           this.$message.error(error.message);
         });
-
     },
   },
   data() {
@@ -62,27 +96,27 @@ export default {
       config: {
         getsocketData: null,
         search: [
-                 {
-            name: 'deviceNum',
-            type: 'text',
-            label: '璁惧缂栧彿',
+          {
+            name: "deviceNum",
+            type: "text",
+            label: "璁惧缂栧彿",
           },
           {
-            name: 'deviceOnlineStatus',
-            type: 'select',
-            label: '鍦ㄧ嚎鐘舵€�',
+            name: "deviceOnlineStatus",
+            type: "select",
+            label: "鍦ㄧ嚎鐘舵€�",
           },
 
-           {
-            name: 'deviceType',
-            type: 'select',
-            label: '璁惧绫诲瀷',
+          {
+            name: "deviceType",
+            type: "select",
+            label: "璁惧绫诲瀷",
           },
         ],
         columns: [
           { type: "selection", width: 60 },
 
-        //   { label: "璁惧鍚嶇О", prop: "deviceName" },
+          //   { label: "璁惧鍚嶇О", prop: "deviceName" },
 
           { label: "璁惧缂栫爜", prop: "deviceCode" },
 
@@ -100,20 +134,21 @@ export default {
             formatter: this.formatterDate,
           },
 
-             { label: "婵€娲荤姸鎬�", prop: "active", formatter: this.formatterYES },
+          { label: "婵€娲荤姸鎬�", prop: "active", formatter: this.formatterYES },
           {
             label: "鎿嶄綔",
-            width: 240,
+            width: 320,
             formatter: (row) => {
               return (
                 <div>
-                <table-buttons
-                  noAdd
-                  row={row}
-                  onEdit={this.toEdit}
-                  onView={this.toView}
-                  onDel={this.toDel}
-                />
+                  <table-buttons
+                    noAdd
+                    row={row}
+                    onEdit={this.toEdit}
+                    onView={this.toView}
+                    onDel={this.toDel}
+                  />
+                  <span> </span>
                   {row.active === 0 ? (
                     <el-button
                       size="mini"
@@ -128,6 +163,22 @@ export default {
                   ) : (
                     ""
                   )}
+                  <span> </span>
+
+                  {row.active === 1 ? (
+                    <el-button
+                      size="mini"
+                      type="text"
+                      icon="el-icon-open"
+                      onClick={() => {
+                        this.downMsg(row);
+                      }}
+                    >
+                      涓嬪彂淇℃伅
+                    </el-button>
+                  ) : (
+                    ""
+                  )}
                 </div>
               );
             },
diff --git a/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java b/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java
index 66ac8d86..ea610971 100644
--- a/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java
+++ b/device-manager/src/main/java/com/mortals/xhx/busiz/web/DeviceApiController.java
@@ -10,6 +10,7 @@ import com.mortals.xhx.busiz.rsp.ApiResp;
 import com.mortals.xhx.busiz.rsp.DeviceResp;
 import com.mortals.xhx.busiz.security.DeviceTokenService;
 import com.mortals.xhx.common.code.*;
+import com.mortals.xhx.common.key.Constant;
 import com.mortals.xhx.module.device.model.DeviceEntity;
 import com.mortals.xhx.module.device.service.DeviceLogService;
 import com.mortals.xhx.module.device.service.DeviceService;
@@ -117,7 +118,7 @@ public class DeviceApiController {
             DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum());
             if (!ObjectUtils.isEmpty(deviceEntity)) {
                 //灏嗕笂鎶ヤ俊鎭浆鍙戝埌mq涓�
-                ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity.getId(), JSON.toJSONString(req), null);
+                ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity.getId(), Constant.UPLOAD_TOPIC, JSON.toJSONString(req), null);
             log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
             }
         } catch (Exception e) {
diff --git a/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java b/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java
index e175f95a..ef9c4a53 100644
--- a/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java
+++ b/device-manager/src/main/java/com/mortals/xhx/common/key/Constant.java
@@ -20,12 +20,12 @@ public final class Constant {
 	/**
 	 * 涓婅娑堟伅topic
 	 */
-	public static final String UPLOAD_TOPIC = "upload:";
+	public static final String UPLOAD_TOPIC = "/upload/";
 
 	/**
 	 * 涓嬭娑堟伅topic
 	 */
-	public static final String DOWN_TOPIC = "down:";
+	public static final String DOWN_TOPIC = "/down/";
 
 	/**
 	 * 楠岃瘉鐮佹湁鏁堟湡锛堝垎閽燂級
diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java b/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java
index 52489d42..4cb30571 100644
--- a/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java
+++ b/device-manager/src/main/java/com/mortals/xhx/daemon/DemoApiController.java
@@ -87,17 +87,17 @@ public class DemoApiController {
 
             TopicPartitionInfo info = TopicPartitionInfo.builder().topic("demoTopic").build();
             TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
-            header.put(MessageHeader.CLIENTID, "aaa".getBytes());
+            header.put(MessageHeader.CLIENTID, "aaa");
 
 
             Map<String, Object> headers = new HashMap<>();
             headers.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime().getBytes());
-            String payLoad="12313";
+            String payLoad = "12313";
 
-            TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID(), payLoad.getBytes(), header);
+            TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID().toString(), payLoad.getBytes(), header);
 
 
-            for(int i=0;i<10;i++){
+            for (int i = 0; i < 10; i++) {
                 producer.send(info, queueMsg, new TbQueueCallback() {
                     @Override
                     public void onSuccess(TbQueueMsgMetadata metadata) {
diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DemoStartedService.java b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceMsgComsumerStartedService.java
similarity index 98%
rename from device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DemoStartedService.java
rename to device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceMsgComsumerStartedService.java
index 30077c03..46f96892 100644
--- a/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DemoStartedService.java
+++ b/device-manager/src/main/java/com/mortals/xhx/daemon/applicationservice/DeviceMsgComsumerStartedService.java
@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
 
 @Component
 @CommonsLog
-public class DemoStartedService implements IApplicationStartedService {
+public class DeviceMsgComsumerStartedService implements IApplicationStartedService {
 
     @Autowired
     private DefaultTbCoreConsumerService consumerService;
diff --git a/device-manager/src/main/java/com/mortals/xhx/daemon/netty/server/controlserver/handler/NettyUDPServerHandler.java b/device-manager/src/main/java/com/mortals/xhx/daemon/netty/server/controlserver/handler/NettyUDPServerHandler.java
index b74ca2d0..dd0d5f9c 100644
--- a/device-manager/src/main/java/com/mortals/xhx/daemon/netty/server/controlserver/handler/NettyUDPServerHandler.java
+++ b/device-manager/src/main/java/com/mortals/xhx/daemon/netty/server/controlserver/handler/NettyUDPServerHandler.java
@@ -31,9 +31,8 @@ public class NettyUDPServerHandler extends SimpleChannelInboundHandler<DatagramP
         String URL = GlobalSysInfo.getParamValue(PARAM_SERVER_HTTP_URL, "http://192.168.0.100:11021");
         String req = msg.content().toString(CharsetUtil.UTF_8);
         System.out.println(req);
-        if ("璋氳瀛楀吀鏌ヨ锛�".equals(req)) {
-            ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("鏈嶅姟鍣ㄥ湴鍧€锛�" + URL, CharsetUtil.UTF_8), msg.sender()));
-        }
+        ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("鏈嶅姟鍣ㄥ湴鍧€锛�" + URL, CharsetUtil.UTF_8), msg.sender()));
+
     }
 
     @Override
diff --git a/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java b/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java
index 6a17e894..430e3e87 100644
--- a/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java
+++ b/device-manager/src/main/java/com/mortals/xhx/module/device/service/DeviceService.java
@@ -17,9 +17,9 @@ import java.util.List;
 public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
 
 
-    ApiResp<String> sendDeviceMessage(Long deviceId, String message , Context context);
+    ApiResp<String> sendDeviceMessage(Long deviceId, String topic,String message , Context context);
 
-    ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String message , Context context);
+    ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message , Context context);
 
 
 
diff --git a/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java b/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java
index 06c006bf..de9a60ea 100644
--- a/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java
+++ b/device-manager/src/main/java/com/mortals/xhx/module/device/service/impl/DeviceServiceImpl.java
@@ -1,5 +1,6 @@
 package com.mortals.xhx.module.device.service.impl;
 
+import cn.hutool.core.util.IdUtil;
 import com.mortals.framework.model.Context;
 import com.mortals.framework.util.DateUtils;
 import com.mortals.xhx.busiz.rsp.ApiResp;
@@ -42,37 +43,36 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
 
 
     @Override
-    public ApiResp<String> sendDeviceMessage(Long deviceId, String message, Context context) {
+    public ApiResp<String> sendDeviceMessage(Long deviceId, String topic, String message, Context context) {
         ApiResp<String> resp = new ApiResp<>();
         resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
         resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
         //鍙戦€佹秷鎭�
-        send(message, context, resp, deviceId);
+        send(topic, message, context, resp, deviceId);
         return resp;
     }
 
     @Override
-    public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String message, Context context) {
+    public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message, Context context) {
         ApiResp<String> resp = new ApiResp<>();
         resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
         resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
 
         for (Long deviceId : deviceIds) {
-            send(message, context, resp, deviceId);
+            send(topic, message, context, resp, deviceId);
         }
         return resp;
     }
 
-    private void send(String message, Context context, ApiResp<String> resp, Long deviceId) {
+    private void send(String topic, String message, Context context, ApiResp<String> resp, Long deviceId) {
         DeviceEntity deviceEntity = this.get(deviceId, context);
         if (!ObjectUtils.isEmpty(deviceEntity)) {
             TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
-            TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
+            TopicPartitionInfo info = TopicPartitionInfo.builder().topic(topic + deviceEntity.getDeviceMac()).build();
             TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
-            header.put(MessageHeader.CLIENTID, "device".getBytes());
-            Map<String, Object> headers = new HashMap<>();
-            headers.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime().getBytes());
-            TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID(), message == null ? "".getBytes() : message.getBytes(), header);
+            header.put(MessageHeader.CLIENTID, deviceEntity.getDeviceCode());
+            header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
+            TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), message == null ? "".getBytes() : message.getBytes(), header);
 
             producer.send(info, queueMsg, new TbQueueCallback() {
                 @Override
diff --git a/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java b/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java
index c9212e38..b69096b2 100644
--- a/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java
+++ b/device-manager/src/main/java/com/mortals/xhx/module/device/web/DeviceController.java
@@ -3,13 +3,16 @@ package com.mortals.xhx.module.device.web;
 import com.mortals.framework.exception.AppException;
 import com.mortals.framework.model.OrderCol;
 import com.mortals.xhx.base.system.param.service.ParamService;
+import com.mortals.xhx.busiz.rsp.ApiResp;
 import com.mortals.xhx.common.code.ActiveEnum;
+import com.mortals.xhx.common.code.ApiRespCodeEnum;
 import com.mortals.xhx.common.code.DeviceOnlineStatusEnum;
 import com.mortals.xhx.common.code.DeviceTypeEnum;
+import com.mortals.xhx.common.key.Constant;
 import com.mortals.xhx.module.firm.model.FirmQuery;
 import com.mortals.xhx.module.firm.service.FirmService;
-import org.apache.commons.lang3.ObjectUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.ObjectUtils;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.bind.annotation.*;
@@ -85,4 +88,30 @@ public class DeviceController extends BaseCRUDJsonMappingController<DeviceServic
         super.init(request, response, form, model, context);
     }
 
+
+    /**
+     * 涓嬪彂淇℃伅
+     */
+    @PostMapping(value = "downMsg")
+    public String downMsg(@RequestParam(value = "deviceId") Long deviceId,@RequestParam(value = "content") String content) {
+        JSONObject jsonObject = new JSONObject();
+        Map<String, Object> model = new HashMap<>();
+        String busiDesc = this.getModuleDesc()+"涓嬪彂璁惧娑堟伅" ;
+        try {
+            ApiResp<String> apiResp = this.service.sendDeviceMessage(deviceId, Constant.DOWN_TOPIC, content, getContext());
+            if(ApiRespCodeEnum.SUCCESS.getValue()!=apiResp.getCode()){
+                throw new AppException("涓嬪彂娑堟伅澶辫触锛�");
+            }
+            this.init(request, response, null, model, getContext());
+            recordSysLog(request, busiDesc + " 銆愭垚鍔熴€�");
+            jsonObject.put(KEY_RESULT_DATA, model);
+            jsonObject.put(KEY_RESULT_CODE, VALUE_RESULT_SUCCESS);
+        } catch (Exception e) {
+            log.error("涓嬪彂璁惧娑堟伅", e);
+            jsonObject.put(KEY_RESULT_CODE, VALUE_RESULT_FAILURE);
+            jsonObject.put(KEY_RESULT_MSG, super.convertException(e));
+        }
+        return jsonObject.toJSONString();
+    }
+
 }
\ No newline at end of file
-- 
2.24.3