Commit b430a682 authored by 赵啸非's avatar 赵啸非

添加开启与关闭消费队列

parent 38ab11ec
...@@ -32,6 +32,12 @@ public interface TbQueueConsumer<T extends TbQueueMsg> { ...@@ -32,6 +32,12 @@ public interface TbQueueConsumer<T extends TbQueueMsg> {
*/ */
void unsubscribe(); void unsubscribe();
/**
* 取消订阅消息
* @param partitions
*/
void unsubscribe(Set<TopicPartitionInfo> partitions);
/** /**
* 拉取消息间隔 * 拉取消息间隔
* @param durationInMillis * @param durationInMillis
......
package com.mortals.xhx.queue.rabbitmq; package com.mortals.xhx.queue.rabbitmq;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.util.JsonUtil;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders; import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader; import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.queue.*; import com.mortals.xhx.queue.*;
...@@ -122,17 +125,26 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -122,17 +125,26 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
} }
} }
@Override
public void unsubscribe(Set<TopicPartitionInfo> partitions) {
Set<String> collect = partitions.stream().map(item -> item.getFullTopicName()).collect(Collectors.toSet());
queues=queues.stream().filter(f->collect.contains(f)).collect(Collectors.toSet());
}
@Override @Override
public String getChannelNumber() { public String getChannelNumber() {
return channel.getChannelNumber() + ""; return channel.getChannelNumber() + "";
} }
public T decode(GetResponse message) { public T decode(GetResponse message) {
try { try {
DefaultTbQueueMsg msg = new DefaultTbQueueMsg(); DefaultTbQueueMsg msg = new DefaultTbQueueMsg();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class); String messageBody = new String(message.getBody());
if(JSONUtil.isJson(messageBody)){
Map<String, Object> map = JSON.parseObject(messageBody, HashMap.class);
msg.setKey((String) map.get("key")); msg.setKey((String) map.get("key"));
String payloadStr = (String) map.get("data"); String payloadStr = (String) map.get("data");
msg.setData(payloadStr); msg.setData(payloadStr);
...@@ -141,8 +153,11 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -141,8 +153,11 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
headers.setData(hashMap); headers.setData(hashMap);
msg.setHeaders(headers); msg.setHeaders(headers);
return decoder.decode(msg); return decoder.decode(msg);
}else{
throw new AppException("消息内容异常");
}
} catch (Exception e) { } catch (Exception e) {
log.error("反序列化异常!", e); log.error("message:"+new String(message.getBody()),"反序列化异常!", e);
return null; return null;
} }
......
...@@ -42,6 +42,9 @@ public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{ ...@@ -42,6 +42,9 @@ public interface DeviceService extends ICRUDCacheService<DeviceEntity,Long>{
* @param context * @param context
*/ */
void deviceEnabled(Long id,Integer status,Context context); void deviceEnabled(Long id,Integer status,Context context);
void stopOrStartComsumeQueue(Long id,Integer status,Context context);
void sendThirdParty(DeviceEntity entity, ProductEntity productEntity, PlatformEntity platformEntity, DeviceMethodEnum update); void sendThirdParty(DeviceEntity entity, ProductEntity productEntity, PlatformEntity platformEntity, DeviceMethodEnum update);
......
...@@ -207,6 +207,27 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -207,6 +207,27 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
} }
} }
@Override
public void stopOrStartComsumeQueue(Long id, Integer status, Context context) {
DeviceEntity deviceEntity = this.get(id, context);
if (ObjectUtils.isEmpty(deviceEntity)) throw new AppException("当前设备不存在!");
PlatformEntity platformEntity = platformService.get(deviceEntity.getPlatformId());
ProductEntity productEntity = productService.get(deviceEntity.getProductId());
String exchangeName = platformEntity.getPlatformSn() + Constant.EXCHANGE_SPLIT + productEntity.getProductCode();
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(Constant.UPLOAD_TOPIC + deviceEntity.getDeviceCode(), null, exchangeName);
Set<TopicPartitionInfo> set = new HashSet<>();
set.add(topicPartitionInfo);
if(status==YesNoEnum.YES.getValue()){
//开启
consumerService.getMainConsumer().subscribe(set);
}else{
//关闭
consumerService.getMainConsumer().unsubscribe(set);
}
}
@Override @Override
protected void saveAfter(DeviceEntity entity, Context context) throws AppException { protected void saveAfter(DeviceEntity entity, Context context) throws AppException {
......
...@@ -198,6 +198,29 @@ public class DeviceController extends BaseCRUDJsonBodyMappingController<DeviceSe ...@@ -198,6 +198,29 @@ public class DeviceController extends BaseCRUDJsonBodyMappingController<DeviceSe
} }
/**
* 停止或启动消息队列中的消息
*/
@PostMapping(value = "stopOrStartComsumeQueue")
public String stopOrStartComsumeQueue(@RequestBody DeviceEntity deviceEntity) {
JSONObject jsonObject = new JSONObject();
Map<String, Object> model = new HashMap<>();
String busiDesc = this.getModuleDesc() + "设备启用停用";
try {
this.service.deviceEnabled(deviceEntity.getId(), deviceEntity.getEnabled(), getContext());
recordSysLog(request, busiDesc + " 【成功】");
jsonObject.put(KEY_RESULT_DATA, model);
jsonObject.put(KEY_RESULT_CODE, VALUE_RESULT_SUCCESS);
} catch (Exception e) {
log.error("设备启用停用消息", e);
jsonObject.put(KEY_RESULT_CODE, VALUE_RESULT_FAILURE);
jsonObject.put(KEY_RESULT_MSG, super.convertException(e));
}
return jsonObject.toJSONString();
}
@Override @Override
protected int doListAfter(DeviceEntity query, Map<String, Object> model, Context context) throws AppException { protected int doListAfter(DeviceEntity query, Map<String, Object> model, Context context) throws AppException {
//统计当前站点设备情况 //统计当前站点设备情况
......
...@@ -12,11 +12,12 @@ Content-Type: application/json ...@@ -12,11 +12,12 @@ Content-Type: application/json
###设备更新与保存 ###设备更新与保存
POST {{baseUrl}}/device/save POST {{baseUrl}}/device/save
Authorization: {{authToken}}
Content-Type: application/json Content-Type: application/json
{ {
"deviceName": "gosxaj", "deviceName": "gosxaj11",
"deviceCode": "ksdmo7", "deviceCode": "ksdmo711",
"deviceType": 1, "deviceType": 1,
"deviceMac": "a3ku15", "deviceMac": "a3ku15",
"ip": "5ffz2e", "ip": "5ffz2e",
......
...@@ -10,5 +10,9 @@ ...@@ -10,5 +10,9 @@
"test": { "test": {
"baseUrl": "http://192.168.0.98:11091/m", "baseUrl": "http://192.168.0.98:11091/m",
"baseLogin": "http://192.168.0.98:11078/base" "baseLogin": "http://192.168.0.98:11078/base"
},
"portal": {
"baseUrl": "http://192.168.0.98:11072/zwfw",
"baseLogin": "http://192.168.0.98:11078/base"
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment