Commit 6306cc8b authored by 赵啸非's avatar 赵啸非

修正redis 最大连接数

parent 3d944af7
...@@ -20,6 +20,11 @@ public class RedisKey { ...@@ -20,6 +20,11 @@ public class RedisKey {
*/ */
public static final String KEY_DEVICE_ONLINE_QUEUE = "device:queue"; public static final String KEY_DEVICE_ONLINE_QUEUE = "device:queue";
/**
* 设备通知外部更新队列
*/
public static final String KEY_DEVICE_THIRDPARTY_QUEUE = "device:thirdparty:queue";
public static final String KEY_SITE_CACHE = "siteDict"; public static final String KEY_SITE_CACHE = "siteDict";
public static final String KEY_PLATFORM_CACHE = "platformDict"; public static final String KEY_PLATFORM_CACHE = "platformDict";
......
package com.mortals.xhx.daemon.applicationservice;
import cn.hutool.core.net.url.UrlBuilder;
import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.service.ICacheService;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.pdu.DeviceReq;
import com.mortals.xhx.common.utils.SendTaskThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL;
import static com.mortals.xhx.common.key.RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE;
@Component
@Slf4j
public class DeviceSendThirdPartyService implements IApplicationStartedService {
@Autowired
private SendTaskThreadPool sendTaskThreadPool;
protected Boolean stopped = false;
@Autowired
private ICacheService cacheService;
@Autowired
private MessageService messageService;
@Value("${thirdPartyPath:/inter/device/deviceIn}")
public String thirdPartyPath;
@Override
public void start() {
log.info("初始化发送线程数量");
//启动短信发送响应更新线程
sendTaskThreadPool.execute(() -> {
int waitTime = 1000;
while (!stopped) {
try {
DeviceReq deviceReq = cacheService.lpop(KEY_DEVICE_THIRDPARTY_QUEUE, DeviceReq.class);
if (!ObjectUtils.isEmpty(deviceReq)) {
String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://172.15.28.116:8090");
ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq);
log.info("sendThirty resp ==>{}", JSON.toJSONString(resp));
}
} catch (Exception e) {
if (!stopped) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e2) {
}
}
}
}
});
}
@Override
public void stop() {
log.info("停止服务..");
this.stopped = true;
}
@Override
public int getOrder() {
return 50;
}
}
...@@ -44,7 +44,6 @@ public class DeviceStartService implements IApplicationStartedService { ...@@ -44,7 +44,6 @@ public class DeviceStartService implements IApplicationStartedService {
@Override @Override
public void start() { public void start() {
log.info("初始化发送线程数量"); log.info("初始化发送线程数量");
sendTaskThreadPool.init(20);
//启动短信发送响应更新线程 //启动短信发送响应更新线程
sendTaskThreadPool.execute(() -> { sendTaskThreadPool.execute(() -> {
int waitTime = 1000; int waitTime = 1000;
......
...@@ -10,17 +10,13 @@ import com.mortals.xhx.common.utils.SyncTreeSiteThread; ...@@ -10,17 +10,13 @@ import com.mortals.xhx.common.utils.SyncTreeSiteThread;
import com.mortals.xhx.module.site.service.SiteService; import com.mortals.xhx.module.site.service.SiteService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/**
* 应用级服务,在应用启动后、停止过程中调用
* 应用已经完成启动完成,才调用该服务
* 应用场景:
* 1、应用任务,应用启动后定时或间隔执行的任务
* 2、Socket服务端
*/
@Component @Component
@Slf4j @Slf4j
@ConditionalOnExpression("'${platform.type:standalone}'=='standalone'")
public class SiteStartedService implements IApplicationStartedService { public class SiteStartedService implements IApplicationStartedService {
@Autowired @Autowired
...@@ -32,7 +28,6 @@ public class SiteStartedService implements IApplicationStartedService { ...@@ -32,7 +28,6 @@ public class SiteStartedService implements IApplicationStartedService {
@Override @Override
public void start() { public void start() {
log.info("开始服务..[初始化用户站点树]"); log.info("开始服务..[初始化用户站点树]");
ThreadPool.getInstance().init(20);
UserEntity userEntity = new UserEntity(); UserEntity userEntity = new UserEntity();
userEntity.initAttrValue(); userEntity.initAttrValue();
userEntity.setId(0L); userEntity.setId(0L);
......
package com.mortals.xhx.module.device.service.impl; package com.mortals.xhx.module.device.service.impl;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.net.url.UrlPath; import cn.hutool.core.net.url.UrlPath;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.mortals.framework.ap.GlobalSysInfo;
import com.mortals.framework.common.Rest; import com.mortals.framework.common.Rest;
import com.mortals.framework.exception.AppException; import com.mortals.framework.exception.AppException;
import com.mortals.framework.model.Context; import com.mortals.framework.model.Context;
...@@ -17,6 +15,7 @@ import com.mortals.xhx.busiz.rsp.ApiResp; ...@@ -17,6 +15,7 @@ import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.common.code.*; import com.mortals.xhx.common.code.*;
import com.mortals.xhx.common.key.Constant; import com.mortals.xhx.common.key.Constant;
import com.mortals.xhx.common.key.QueueKey; import com.mortals.xhx.common.key.QueueKey;
import com.mortals.xhx.common.key.RedisKey;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders; import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.common.model.MessageHeader; import com.mortals.xhx.common.model.MessageHeader;
import com.mortals.xhx.common.pdu.DeviceReq; import com.mortals.xhx.common.pdu.DeviceReq;
...@@ -55,7 +54,6 @@ import java.util.*; ...@@ -55,7 +54,6 @@ import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.mortals.xhx.common.key.Constant.MESSAGETYPE_NOTIFY_RESTART_APP; import static com.mortals.xhx.common.key.Constant.MESSAGETYPE_NOTIFY_RESTART_APP;
import static com.mortals.xhx.common.key.Constant.PARAM_SERVER_PHP_IN_HTTP_URL;
import static java.math.BigDecimal.ROUND_HALF_DOWN; import static java.math.BigDecimal.ROUND_HALF_DOWN;
/** /**
...@@ -348,12 +346,17 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -348,12 +346,17 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
@Override @Override
public Rest<String> sendThirdParty(DeviceEntity entity, ProductEntity productEntity, PlatformEntity platformEntity, DeviceMethodEnum update) { public Rest<String> sendThirdParty(DeviceEntity entity, ProductEntity productEntity, PlatformEntity platformEntity, DeviceMethodEnum update) {
//todo 修改为异步发送消息,当前消息存放到redis的队列中
DeviceReq deviceReq = new DeviceReq(); DeviceReq deviceReq = new DeviceReq();
BeanUtils.copyProperties(entity, deviceReq, BeanUtil.getNullPropertyNames(entity)); BeanUtils.copyProperties(entity, deviceReq, BeanUtil.getNullPropertyNames(entity));
deviceReq.setDeviceStatus(update.getValue()); deviceReq.setDeviceStatus(update.getValue());
deviceReq.setProductCode(productEntity.getProductCode()); deviceReq.setProductCode(productEntity.getProductCode());
deviceReq.setDeviceInBuilding(entity.getDeviceInBuilding() == null ? 0 : entity.getDeviceInBuilding()); deviceReq.setDeviceInBuilding(entity.getDeviceInBuilding() == null ? 0 : entity.getDeviceInBuilding());
deviceReq.setDeviceInFloor(entity.getDeviceInFloor() == null ? 0 : entity.getDeviceInFloor()); deviceReq.setDeviceInFloor(entity.getDeviceInFloor() == null ? 0 : entity.getDeviceInFloor());
//http://192.168.0.98:8090/inter/device/deviceIn //http://192.168.0.98:8090/inter/device/deviceIn
//判断是否是php,如果不是 则是java 则内部调用 //判断是否是php,如果不是 则是java 则内部调用
if ("smartOffice".equals(platformEntity.getPlatformSn())) { if ("smartOffice".equals(platformEntity.getPlatformSn())) {
...@@ -362,15 +365,17 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D ...@@ -362,15 +365,17 @@ public class DeviceServiceImpl extends AbstractCRUDCacheServiceImpl<DeviceDao, D
Rest<String> officeRest = officeDeviceFeign.deviceCall(deviceReq); Rest<String> officeRest = officeDeviceFeign.deviceCall(deviceReq);
log.info("office resp ==>{}", JSON.toJSONString(officeRest)); log.info("office resp ==>{}", JSON.toJSONString(officeRest));
} else { } else {
String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://172.15.28.116:8090"); Long lpush = cacheService.lpush(RedisKey.KEY_DEVICE_THIRDPARTY_QUEUE, deviceReq);
ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq); log.info("lpush:{}",lpush);
log.info("sendThirty resp ==>{}", JSON.toJSONString(resp)); // String phpInUrl = GlobalSysInfo.getParamValue(PARAM_SERVER_PHP_IN_HTTP_URL, "http://172.15.28.116:8090");
// ApiResp<String> resp = messageService.sendThirdParty(UrlBuilder.of(phpInUrl).addPath(thirdPartyPath).build(), deviceReq);
if (!ObjectUtils.isEmpty(resp) && resp.getCode() == YesNoEnum.YES.getValue()) { // log.info("sendThirty resp ==>{}", JSON.toJSONString(resp));
return Rest.ok("成功!"); //
} else { // if (!ObjectUtils.isEmpty(resp) && resp.getCode() == YesNoEnum.YES.getValue()) {
return Rest.fail("发送失败"); // return Rest.ok("成功!");
} // } else {
// return Rest.fail("发送失败");
// }
} }
return Rest.ok(); return Rest.ok();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment