Commit 01ee4a0c authored by 赵啸非's avatar 赵啸非

修改消息组件

parent 468ff768
package com.mortals.xhx.common.model;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import lombok.Setter;
import java.util.HashMap;
import java.util.Map;
......@@ -13,26 +14,27 @@ import java.util.Map;
*/
public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
protected final Map<String, byte[]> data = new HashMap<>();
@Setter
protected Map<String, String> data = new HashMap<>();
public DefaultTbQueueMsgHeaders() {
data.put(MessageHeader.TOPIC, new byte[]{});
data.put(MessageHeader.QOS, new byte[]{(byte) 0});
data.put(MessageHeader.TOPIC, "");
data.put(MessageHeader.QOS, "0");
}
@Override
public byte[] put(String key, byte[] value) {
return data.put(key, value);
public void put(String key, String value) {
data.put(key, value);
}
@Override
public byte[] get(String key) {
public String get(String key) {
return data.get(key);
}
@Override
public Map<String, byte[]> getData() {
public Map<String, String> getData() {
return data;
}
}
......@@ -19,8 +19,17 @@ import java.util.UUID;
@AllArgsConstructor
@NoArgsConstructor
public class DefaultTbQueueMsg implements TbQueueMsg {
private UUID key;
/**
* key 唯一标识
*/
private String key;
/**
* 数据载体
*/
private byte[] data;
/**
* 消息头信息
*/
private TbQueueMsgHeaders headers;
public DefaultTbQueueMsg(TbQueueMsg msg) {
......
......@@ -10,7 +10,7 @@ import java.util.UUID;
*/
public interface TbQueueMsg {
UUID getKey();
String getKey();
TbQueueMsgHeaders getHeaders();
......
......@@ -10,9 +10,11 @@ import java.util.Map;
*/
public interface TbQueueMsgHeaders {
byte[] put(String key, byte[] value);
void put(String key, String value);
byte[] get(String key);
String get(String key);
Map<String, byte[]> getData();
Map<String, String> getData();
void setData(Map<String, String> data);
}
......@@ -8,22 +8,22 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.UUID;
public class KafkaTbQueueMsg implements TbQueueMsg {
private final UUID key;
private final String key;
private final TbQueueMsgHeaders headers;
private final byte[] data;
public KafkaTbQueueMsg(ConsumerRecord<String, byte[]> record) {
this.key = UUID.fromString(record.key());
this.key = record.key();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
record.headers().forEach(header -> {
headers.put(header.key(), header.value());
headers.put(header.key(), new String(header.value()));
});
this.headers = headers;
this.data = record.value();
}
@Override
public UUID getKey() {
public String getKey() {
return key;
}
......
......@@ -64,7 +64,7 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
if (tpi.getTopic() == null) {
tpi.setTopic(this.defaultTopic);
}
Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue().getBytes())).collect(Collectors.toList());
record = new ProducerRecord<>(tpi.getTopic(), null, key, data, headers);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
......
......@@ -33,7 +33,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
@Override
public UUID getKey() {
public String getKey() {
return msg.getKey();
}
......
......@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate;
import com.rabbitmq.client.Channel;
......@@ -105,11 +106,8 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
DefaultTbQueueMsg msg = new DefaultTbQueueMsg();
// DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class);
msg.setKey(UUID.fromString((String)map.get("key")));
msg.setKey((String)map.get("key"));
String payloadStr = (String)map.get("data");
byte[] payloadByte = payloadStr.getBytes();
System.out.println("receivedPayLoadStr:" + payloadStr);
byte[] payloadDecodeByte = Base64.getDecoder().decode(payloadStr);
......@@ -117,18 +115,22 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
msg.setData(payloadDecodeByte);
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
String clientIdStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("clientId");
String qosStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("qos");
String timestampStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("timestamp");
if(!ObjectUtils.isEmpty(clientIdStr)){
headers.put("clientId", Base64.getDecoder().decode(clientIdStr));
}
if(!ObjectUtils.isEmpty(qosStr)){
headers.put("qos", Base64.getDecoder().decode(qosStr));
}
if(!ObjectUtils.isEmpty(timestampStr)) {
headers.put("timestamp", Base64.getDecoder().decode(timestampStr));
}
String headerStr = ((JSONObject) map.get("headers")).getString("data");
HashMap<String, String> hashMap = JSON.parseObject(headerStr, HashMap.class);
headers.setData(hashMap);
// String clientIdStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("clientId");
// String qosStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("qos");
// String timestampStr = ((JSONObject) ((JSONObject) map.get("headers")).get("data")).getString("timestamp");
// if(!ObjectUtils.isEmpty(clientIdStr)){
// headers.put(MessageHeader.CLIENTID, clientIdStr);
// }
// if(!ObjectUtils.isEmpty(qosStr)){
// headers.put(MessageHeader.TOPIC,qosStr);
// }
// if(!ObjectUtils.isEmpty(timestampStr)) {
// headers.put(MessageHeader.TIMESTAMP, timestampStr);
// }
msg.setHeaders(headers);
// log.info("msg:" + msg.toString());
return decoder.decode(msg);
......
<template>
<div class="page">
<LayoutTable :data="tableData" :config="tableConfig">
<el-tag slot="table-body-head" style="margin:5px" type="success">当前在线设备总计:{{tableData.onlineCount}}</el-tag>
<el-tag slot="table-body-head" style="margin: 5px" type="success"
>当前在线设备总计:{{ tableData.onlineCount }}</el-tag
>
<el-tag slot="table-body-head" style="margin:5px" type="danger">当前离线设备总计:{{tableData.offlineCount}}</el-tag>
<el-tag v-for='($label, $value) in tableData.offlineDeviceType'
:key='$value'
:label="$value" slot="table-body-head" style="margin:5px" type="danger">{{$value}}离线设备:{{$label}}</el-tag>
<el-tag slot="table-body-head" style="margin: 5px" type="danger"
>当前离线设备总计:{{ tableData.offlineCount }}</el-tag
>
<el-tag
v-for="($label, $value) in tableData.offlineDeviceType"
:key="$value"
:label="$value"
slot="table-body-head"
style="margin: 5px"
type="danger"
>{{ $value }}离线设备:{{ $label }}</el-tag
>
</LayoutTable>
<dialog-show ref="dialogform" @ok="getData" />
......@@ -23,9 +32,7 @@ export default {
name: "Device",
components: { dialogShow },
mixins: [table],
created() {
},
created() {},
methods: {
/** 重写新增方法 */
toAdd(row) {
......@@ -37,24 +44,51 @@ export default {
},
/** 重写查看方法 */
toView(row) {
this.$refs.dialogform.view(row);
this.$refs.dialogform.view(row);
},
activeDevice(row) {
this.$post("/device/save", {
downMsg(row) {
this.$prompt("请输入下发消息内容", "提示", {
confirmButtonText: "确定",
cancelButtonText: "取消",
})
.then(({ value }) => {
this.$post("/device/downMsg", {
deviceId: row.id,
content: value,
})
.then((res) => {
if (res.code == 1) {
this.$message.success("下发设备成功!");
this.getData();
}
})
.catch((error) => {
this.$message.error(error.message);
});
})
.catch(() => {
this.$message({
type: "info",
message: "取消输入",
});
});
},
activeDevice(row) {
this.$post("/device/save", {
"entity.id": row.id,
"entity.active": 1,
})
.then((res) => {
if (res.code == 1) {
this.$message.success("激活设备成功!");
this.getData()
this.getData();
}
})
.catch((error) => {
this.$message.error(error.message);
});
},
},
data() {
......@@ -62,27 +96,27 @@ export default {
config: {
getsocketData: null,
search: [
{
name: 'deviceNum',
type: 'text',
label: '设备编号',
{
name: "deviceNum",
type: "text",
label: "设备编号",
},
{
name: 'deviceOnlineStatus',
type: 'select',
label: '在线状态',
name: "deviceOnlineStatus",
type: "select",
label: "在线状态",
},
{
name: 'deviceType',
type: 'select',
label: '设备类型',
{
name: "deviceType",
type: "select",
label: "设备类型",
},
],
columns: [
{ type: "selection", width: 60 },
// { label: "设备名称", prop: "deviceName" },
// { label: "设备名称", prop: "deviceName" },
{ label: "设备编码", prop: "deviceCode" },
......@@ -100,20 +134,21 @@ export default {
formatter: this.formatterDate,
},
{ label: "激活状态", prop: "active", formatter: this.formatterYES },
{ label: "激活状态", prop: "active", formatter: this.formatterYES },
{
label: "操作",
width: 240,
width: 320,
formatter: (row) => {
return (
<div>
<table-buttons
noAdd
row={row}
onEdit={this.toEdit}
onView={this.toView}
onDel={this.toDel}
/>
<table-buttons
noAdd
row={row}
onEdit={this.toEdit}
onView={this.toView}
onDel={this.toDel}
/>
<span> </span>
{row.active === 0 ? (
<el-button
size="mini"
......@@ -128,6 +163,22 @@ export default {
) : (
""
)}
<span> </span>
{row.active === 1 ? (
<el-button
size="mini"
type="text"
icon="el-icon-open"
onClick={() => {
this.downMsg(row);
}}
>
下发信息
</el-button>
) : (
""
)}
</div>
);
},
......
......@@ -10,6 +10,7 @@ import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.busiz.rsp.DeviceResp;
import com.mortals.xhx.busiz.security.DeviceTokenService;
import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.module.device.model.DeviceEntity;
import com.mortals.xhx.module.device.service.DeviceLogService;
import com.mortals.xhx.module.device.service.DeviceService;
......@@ -117,7 +118,7 @@ public class DeviceApiController {
DeviceEntity deviceEntity = deviceService.getExtCache(req.getDeviceNum());
if (!ObjectUtils.isEmpty(deviceEntity)) {
//将上报信息转发到mq中
ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity.getId(), JSON.toJSONString(req), null);
ApiResp<String> sendDeviceMessageResp = deviceService.sendDeviceMessage(deviceEntity.getId(), Constant.UPLOAD_TOPIC, JSON.toJSONString(req), null);
log.info(String.format("sendMsgResp:%s", JSON.toJSONString(sendDeviceMessageResp)));
}
} catch (Exception e) {
......
......@@ -20,12 +20,12 @@ public final class Constant {
/**
* 上行消息topic
*/
public static final String UPLOAD_TOPIC = "upload:";
public static final String UPLOAD_TOPIC = "/upload/";
/**
* 下行消息topic
*/
public static final String DOWN_TOPIC = "down:";
public static final String DOWN_TOPIC = "/down/";
/**
* 验证码有效期(分钟)
......
......@@ -87,17 +87,17 @@ public class DemoApiController {
TopicPartitionInfo info = TopicPartitionInfo.builder().topic("demoTopic").build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.CLIENTID, "aaa".getBytes());
header.put(MessageHeader.CLIENTID, "aaa");
Map<String, Object> headers = new HashMap<>();
headers.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime().getBytes());
String payLoad="12313";
String payLoad = "12313";
TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID(), payLoad.getBytes(), header);
TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID().toString(), payLoad.getBytes(), header);
for(int i=0;i<10;i++){
for (int i = 0; i < 10; i++) {
producer.send(info, queueMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
......
......@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
@Component
@CommonsLog
public class DemoStartedService implements IApplicationStartedService {
public class DeviceMsgComsumerStartedService implements IApplicationStartedService {
@Autowired
private DefaultTbCoreConsumerService consumerService;
......
......@@ -31,9 +31,8 @@ public class NettyUDPServerHandler extends SimpleChannelInboundHandler<DatagramP
String URL = GlobalSysInfo.getParamValue(PARAM_SERVER_HTTP_URL, "http://192.168.0.100:11021");
String req = msg.content().toString(CharsetUtil.UTF_8);
System.out.println(req);
if ("谚语字典查询?".equals(req)) {
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("服务器地址:" + URL, CharsetUtil.UTF_8), msg.sender()));
}
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("服务器地址:" + URL, CharsetUtil.UTF_8), msg.sender()));
}
@Override
......
......@@ -17,9 +17,9 @@ import java.util.List;
public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
ApiResp<String> sendDeviceMessage(Long deviceId, String message , Context context);
ApiResp<String> sendDeviceMessage(Long deviceId, String topic,String message , Context context);
ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String message , Context context);
ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message , Context context);
......
package com.mortals.xhx.module.device.service.impl;
import cn.hutool.core.util.IdUtil;
import com.mortals.framework.model.Context;
import com.mortals.framework.util.DateUtils;
import com.mortals.xhx.busiz.rsp.ApiResp;
......@@ -42,37 +43,36 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
@Override
public ApiResp<String> sendDeviceMessage(Long deviceId, String message, Context context) {
public ApiResp<String> sendDeviceMessage(Long deviceId, String topic, String message, Context context) {
ApiResp<String> resp = new ApiResp<>();
resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
//发送消息
send(message, context, resp, deviceId);
send(topic, message, context, resp, deviceId);
return resp;
}
@Override
public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String message, Context context) {
public ApiResp<String> sendDeviceMessage(List<Long> deviceIds, String topic, String message, Context context) {
ApiResp<String> resp = new ApiResp<>();
resp.setCode(ApiRespCodeEnum.SUCCESS.getValue());
resp.setMsg(ApiRespCodeEnum.SUCCESS.getLabel());
for (Long deviceId : deviceIds) {
send(message, context, resp, deviceId);
send(topic, message, context, resp, deviceId);
}
return resp;
}
private void send(String message, Context context, ApiResp<String> resp, Long deviceId) {
private void send(String topic, String message, Context context, ApiResp<String> resp, Long deviceId) {
DeviceEntity deviceEntity = this.get(deviceId, context);
if (!ObjectUtils.isEmpty(deviceEntity)) {
TbQueueProducer<TbQueueMsg> producer = producerProvider.getTbCoreMsgProducer();
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceMac()).build();
TopicPartitionInfo info = TopicPartitionInfo.builder().topic(topic + deviceEntity.getDeviceMac()).build();
TbQueueMsgHeaders header = new DefaultTbQueueMsgHeaders();
header.put(MessageHeader.CLIENTID, "device".getBytes());
Map<String, Object> headers = new HashMap<>();
headers.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime().getBytes());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(UUID.randomUUID(), message == null ? "".getBytes() : message.getBytes(), header);
header.put(MessageHeader.CLIENTID, deviceEntity.getDeviceCode());
header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), message == null ? "".getBytes() : message.getBytes(), header);
producer.send(info, queueMsg, new TbQueueCallback() {
@Override
......
......@@ -3,13 +3,16 @@ package com.mortals.xhx.module.device.web;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.OrderCol;
import com.mortals.xhx.base.system.param.service.ParamService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.ActiveEnum;
import com.mortals.xhx.common.code.ApiRespCodeEnum;
import com.mortals.xhx.common.code.DeviceOnlineStatusEnum;
import com.mortals.xhx.common.code.DeviceTypeEnum;
import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.module.firm.model.FirmQuery;
import com.mortals.xhx.module.firm.service.FirmService;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
......@@ -85,4 +88,30 @@ public class DeviceController extends BaseCRUDJsonMappingController<DeviceServic
super.init(request, response, form, model, context);
}
/**
* 下发信息
*/
@PostMapping(value = "downMsg")
public String downMsg(@RequestParam(value = "deviceId") Long deviceId,@RequestParam(value = "content") String content) {
JSONObject jsonObject = new JSONObject();
Map<String, Object> model = new HashMap<>();
String busiDesc = this.getModuleDesc()+"下发设备消息" ;
try {
ApiResp<String> apiResp = this.service.sendDeviceMessage(deviceId, Constant.DOWN_TOPIC, content, getContext());
if(ApiRespCodeEnum.SUCCESS.getValue()!=apiResp.getCode()){
throw new AppException("下发消息失败!");
}
this.init(request, response, null, model, getContext());
recordSysLog(request, busiDesc + " 【成功】");
jsonObject.put(KEY_RESULT_DATA, model);
jsonObject.put(KEY_RESULT_CODE, VALUE_RESULT_SUCCESS);
} catch (Exception e) {
log.error("下发设备消息", e);
jsonObject.put(KEY_RESULT_CODE, VALUE_RESULT_FAILURE);
jsonObject.put(KEY_RESULT_MSG, super.convertException(e));
}
return jsonObject.toJSONString();
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment