Commit 06c92b7f authored by 赵啸非's avatar 赵啸非

分组更新设备

parent b663fa56
...@@ -13,7 +13,10 @@ import org.springframework.stereotype.Service; ...@@ -13,7 +13,10 @@ import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/** /**
...@@ -31,14 +34,15 @@ public class DeviceUpdateComsumerThread extends AbstractThread { ...@@ -31,14 +34,15 @@ public class DeviceUpdateComsumerThread extends AbstractThread {
@Autowired @Autowired
private DeviceService deviceService; private DeviceService deviceService;
//
@Override @Override
protected int getSleepTime() { protected int getSleepTime() {
return 1000; return 3600;
} }
@Override @Override
protected void process() { protected void process() {
// log.info("DeviceUpdateComsumerThread process"); log.info("DeviceUpdateComsumerThread process");
List<DeviceEntity> waitUpdateDeviceList = new ArrayList<>(); List<DeviceEntity> waitUpdateDeviceList = new ArrayList<>();
while (true) { while (true) {
DeviceEntity deviceEntity = cacheService.lpop(RedisKey.KEY_DEVICE_UPDATE_QUEUE, DeviceEntity.class); DeviceEntity deviceEntity = cacheService.lpop(RedisKey.KEY_DEVICE_UPDATE_QUEUE, DeviceEntity.class);
...@@ -48,13 +52,36 @@ public class DeviceUpdateComsumerThread extends AbstractThread { ...@@ -48,13 +52,36 @@ public class DeviceUpdateComsumerThread extends AbstractThread {
} }
waitUpdateDeviceList.add(deviceEntity); waitUpdateDeviceList.add(deviceEntity);
} }
//分组更新
if (!ObjectUtils.isEmpty(waitUpdateDeviceList)) { if (!ObjectUtils.isEmpty(waitUpdateDeviceList)) {
log.info("updateDeviceList size:{}", waitUpdateDeviceList.size()); log.info("updateDeviceList size:{}", waitUpdateDeviceList.size());
Map<Long, List<DeviceEntity>> collect = waitUpdateDeviceList.parallelStream().collect(Collectors.groupingBy(x -> x.getId()));
collect.entrySet().stream().forEach(item->{
List<DeviceEntity> sortUpdateList = item.getValue().stream().sorted(new Comparator<DeviceEntity>() {
@Override
public int compare(DeviceEntity o1, DeviceEntity o2) {
if (o1.getUpdateTime() == null) return 0;
if (o2.getUpdateTime() == null) return 0;
long start = o1.getUpdateTime().getTime();
long end = o2.getUpdateTime().getTime();
if (end > start) {
return 1;
} else if (end < start) {
return -1;
} else {
return 0;
}
}
}).collect(Collectors.toList());
deviceService.update(sortUpdateList);
});
/*
for (DeviceEntity deviceEntity : waitUpdateDeviceList) { for (DeviceEntity deviceEntity : waitUpdateDeviceList) {
deviceService.update(deviceEntity); deviceService.update(deviceEntity);
} }
*/
} }
} }
......
...@@ -2,11 +2,13 @@ package com.mortals.xhx.thread; ...@@ -2,11 +2,13 @@ package com.mortals.xhx.thread;
import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.net.url.UrlBuilder;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo; import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.service.ICacheService; import com.mortals.framework.service.ICacheService;
import com.mortals.framework.util.AbstractThread; import com.mortals.framework.util.AbstractThread;
import com.mortals.xhx.base.system.message.MessageService; import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp; import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.DeviceMethodEnum;
import com.mortals.xhx.common.pdu.DeviceReq; import com.mortals.xhx.common.pdu.DeviceReq;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -47,9 +49,14 @@ public class SendThirdPartyThread extends AbstractThread { ...@@ -47,9 +49,14 @@ public class SendThirdPartyThread extends AbstractThread {
// log.info("SendThirdPartyThread process"); // log.info("SendThirdPartyThread process");
DeviceReq deviceReq = cacheService.blpop(KEY_DEVICE_THIRDPARTY_QUEUE, 10, DeviceReq.class); DeviceReq deviceReq = cacheService.blpop(KEY_DEVICE_THIRDPARTY_QUEUE, 10, DeviceReq.class);
if (!ObjectUtils.isEmpty(deviceReq)) { if (!ObjectUtils.isEmpty(deviceReq)) {
//设备上下线信息 不更新
if (DeviceMethodEnum.ONLINE.getValue() == deviceReq.getReceiveMethod() || DeviceMethodEnum.ONLINE.getValue() == deviceReq.getReceiveMethod()) {
return;
}
//deviceReq.getReceiveMethod()
String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://127.0.0.1:11078/zwfw_api"); String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://127.0.0.1:11078/zwfw_api");
ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq); ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq);
// log.info("sendThirty resp ==>{}", JSON.toJSONString(resp)); log.info("sendThirty req==>{} \n resp ==>{}", JSON.toJSONString(deviceReq), JSON.toJSONString(resp));
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment