Commit 2e726eae authored by 赵啸非's avatar 赵啸非

设备列表导出优化

parent 8a2da03f
...@@ -30,7 +30,7 @@ public class DefaultTbQueueMsg implements TbQueueMsg { ...@@ -30,7 +30,7 @@ public class DefaultTbQueueMsg implements TbQueueMsg {
/** /**
* 数据载体 * 数据载体
*/ */
private byte[] data; private String data;
/** /**
...@@ -56,7 +56,7 @@ public class DefaultTbQueueMsg implements TbQueueMsg { ...@@ -56,7 +56,7 @@ public class DefaultTbQueueMsg implements TbQueueMsg {
// header.put(MessageHeader.CLIENTID, "abcd1234"); // header.put(MessageHeader.CLIENTID, "abcd1234");
// header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime()); // header.put(MessageHeader.TIMESTAMP, DateUtils.getCurrStrDateTime());
//header.put(MessageHeader.MESSAGESIGN,"ssdafasdfasdfasfd"); //header.put(MessageHeader.MESSAGESIGN,"ssdafasdfasdfasfd");
TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), "abcd1234".getBytes() , header); TbQueueMsg queueMsg = new DefaultTbQueueMsg(IdUtil.fastUUID(), "eyJmbG93bnVtIjoiQzEwMTEifQ==" , header);
String ret = JSON.toJSONString(queueMsg); String ret = JSON.toJSONString(queueMsg);
System.out.println("pro:"+ret); System.out.println("pro:"+ret);
......
...@@ -14,5 +14,5 @@ public interface TbQueueMsg { ...@@ -14,5 +14,5 @@ public interface TbQueueMsg {
TbQueueMsgHeaders getHeaders(); TbQueueMsgHeaders getHeaders();
byte[] getData(); String getData();
} }
...@@ -79,8 +79,10 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -79,8 +79,10 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
subscribed = false; subscribed = false;
partitions = subscribeQueue.poll(); partitions = subscribeQueue.poll();
} }
//新增新的订阅信息
if (!subscribed) { if (!subscribed) {
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
//新加订阅项
doSubscribe(topicNames); doSubscribe(topicNames);
subscribed = true; subscribed = true;
} }
...@@ -97,17 +99,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -97,17 +99,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
} }
List<T> decodeRecords(List<R> records) { List<T> decodeRecords(List<R> records) {
List<T> result = new ArrayList<>(records.size()); List<T> result =records.stream().map(record->decode(record)).collect(Collectors.toList());
records.forEach(record -> {
try {
if (record != null) {
result.add(decode(record));
}
} catch (IOException e) {
log.error("Failed decode record: [{}]", record);
throw new RuntimeException("Failed to decode record: ", e);
}
});
return result; return result;
} }
...@@ -158,7 +150,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i ...@@ -158,7 +150,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
abstract protected List<R> doPoll(long durationInMillis); abstract protected List<R> doPoll(long durationInMillis);
abstract protected T decode(R record) throws IOException; abstract protected T decode(R record);
abstract protected void doSubscribe(List<String> topicNames); abstract protected void doSubscribe(List<String> topicNames);
......
...@@ -33,7 +33,7 @@ public class KafkaTbQueueMsg implements TbQueueMsg { ...@@ -33,7 +33,7 @@ public class KafkaTbQueueMsg implements TbQueueMsg {
} }
@Override @Override
public byte[] getData() { public String getData() {
return data; return data.toString();
} }
} }
...@@ -77,7 +77,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue ...@@ -77,7 +77,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
} }
@Override @Override
public T decode(ConsumerRecord<String, byte[]> record) throws IOException { public T decode(ConsumerRecord<String, byte[]> record) {
return decoder.decode(new KafkaTbQueueMsg(record)); return decoder.decode(new KafkaTbQueueMsg(record));
} }
......
...@@ -13,6 +13,6 @@ import java.io.IOException; ...@@ -13,6 +13,6 @@ import java.io.IOException;
*/ */
public interface TbKafkaDecoder<T extends TbQueueMsg> { public interface TbKafkaDecoder<T extends TbQueueMsg> {
T decode(TbQueueMsg msg) throws IOException; T decode(TbQueueMsg msg);
} }
...@@ -59,7 +59,7 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro ...@@ -59,7 +59,7 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
@Override @Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
String key = msg.getKey().toString(); String key = msg.getKey().toString();
byte[] data = msg.getData(); byte[] data = msg.getData().getBytes();
ProducerRecord<String, byte[]> record; ProducerRecord<String, byte[]> record;
if (tpi.getTopic() == null) { if (tpi.getTopic() == null) {
tpi.setTopic(this.defaultTopic); tpi.setTopic(this.defaultTopic);
......
...@@ -47,7 +47,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { ...@@ -47,7 +47,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
} }
@Override @Override
public byte[] getData() { public String getData() {
return msg.getData(); return msg.getData();
} }
}); });
...@@ -69,7 +69,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { ...@@ -69,7 +69,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
} }
@Override @Override
public byte[] getData() { public String getData() {
return msg.getData(); return msg.getData();
} }
}); });
......
...@@ -84,9 +84,15 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -84,9 +84,15 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
@Override @Override
protected void doSubscribe(List<String> topicNames) { protected void doSubscribe(List<String> topicNames) {
queues = partitions.stream()
.map(TopicPartitionInfo::getFullTopicName) //新增的topkcnames
.collect(Collectors.toSet()); topicNames.stream().forEach(topic->{
queues.add(topic);
});
// queues = partitions.stream()
// .map(TopicPartitionInfo::getFullTopicName)
// .collect(Collectors.toSet());
log.info("doSubscribe:{}", JSON.toJSONString(queues)); log.info("doSubscribe:{}", JSON.toJSONString(queues));
} }
...@@ -125,20 +131,12 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb ...@@ -125,20 +131,12 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTb
public T decode(GetResponse message) { public T decode(GetResponse message) {
try { try {
DefaultTbQueueMsg msg = new DefaultTbQueueMsg(); DefaultTbQueueMsg msg = new DefaultTbQueueMsg();
// DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class); TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
if (ObjectUtils.isEmpty(message.getBody())) {
log.info("message is empty");
return null;
}
Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class); Map<String, Object> map = JSON.parseObject(new String(message.getBody()), HashMap.class);
msg.setKey((String) map.get("key")); msg.setKey((String) map.get("key"));
String payloadStr = (String) map.get("data"); String payloadStr = (String) map.get("data");
//log.info("payloadStr:{}", payloadStr); msg.setData(payloadStr);
//byte[] payloadDecodeByte = Base64.getDecoder().decode(payloadStr);
msg.setData(payloadStr.getBytes());
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
String headerStr = ((JSONObject) map.get("headers")).getString("data"); String headerStr = ((JSONObject) map.get("headers")).getString("data");
HashMap<String, String> hashMap = JSON.parseObject(headerStr, HashMap.class); HashMap<String, String> hashMap = JSON.parseObject(headerStr, HashMap.class);
headers.setData(hashMap); headers.setData(hashMap);
......
...@@ -7,6 +7,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -7,6 +7,7 @@ import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.common.Rest; import com.mortals.framework.common.Rest;
import com.mortals.framework.exception.AppException; import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.Context; import com.mortals.framework.model.Context;
import com.mortals.framework.model.OrderCol;
import com.mortals.framework.util.FileUtil; import com.mortals.framework.util.FileUtil;
import com.mortals.framework.utils.ReflectUtils; import com.mortals.framework.utils.ReflectUtils;
import com.mortals.framework.utils.poi.ExcelUtil; import com.mortals.framework.utils.poi.ExcelUtil;
...@@ -40,10 +41,7 @@ import org.springframework.web.bind.annotation.*; ...@@ -40,10 +41,7 @@ import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
...@@ -91,6 +89,13 @@ public class DeviceController extends BaseCRUDJsonBodyMappingController<DeviceSe ...@@ -91,6 +89,13 @@ public class DeviceController extends BaseCRUDJsonBodyMappingController<DeviceSe
super.init(model, context); super.init(model, context);
} }
@Override
protected void doListBefore(DeviceEntity query, Map<String, Object> model, Context context) throws AppException {
List<OrderCol> orderColList = new ArrayList<>();
orderColList.add(new OrderCol("createTime",OrderCol.DESCENDING));
query.setOrderColList(orderColList);
super.doListBefore(query, model, context);
}
/** /**
* 下发信息 * 下发信息
......
...@@ -136,7 +136,7 @@ Content-Type: application/json ...@@ -136,7 +136,7 @@ Content-Type: application/json
Authorization: {{authToken}} Authorization: {{authToken}}
{ {
"deviceCode": "a102", "deviceCode": "B01",
"action": "upload" "action": "upload"
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment