Commit 97780abe authored by 赵啸非's avatar 赵啸非

修改pom文件

parent 479346c2
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</parent> </parent>
<properties>
<rabbitmq.version>4.8.0</rabbitmq.version>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.mortals.framework</groupId> <groupId>com.mortals.framework</groupId>
...@@ -33,6 +37,23 @@ ...@@ -33,6 +37,23 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId> <artifactId>commons-pool2</artifactId>
...@@ -62,6 +83,7 @@ ...@@ -62,6 +83,7 @@
<artifactId>javase</artifactId> <artifactId>javase</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
......
package com.mortals.xhx.common;
/**
* @author: zxfei
* @date: 2021/11/22 11:17
* @description:
**/
public class BrokerConfig {
}
package com.mortals.xhx.common.model;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import java.util.HashMap;
import java.util.Map;
/**
* 默认消息头
*
* @author: zxfei
* @date: 2021/11/22 11:14
*/
public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
protected final Map<String, byte[]> data = new HashMap<>();
public DefaultTbQueueMsgHeaders() {
data.put(MessageHeader.TOPIC, new byte[]{});
data.put(MessageHeader.QOS, new byte[]{(byte) 0});
}
@Override
public byte[] put(String key, byte[] value) {
return data.put(key, value);
}
@Override
public byte[] get(String key) {
return data.get(key);
}
@Override
public Map<String, byte[]> getData() {
return data;
}
}
package com.mortals.xhx.common.model;
/**
* 消息头
*
* @author: zxfei
* @date: 2021/11/22 11:15
*/
public class MessageHeader {
/**
* 客户id
*/
public static final String CLIENTID = "clientId";
public static final String MESSAGEPROTOCOL = "protocol";
public static final String TIMESTAMP = "timestamp";
public static final String MESSAGETYPE = "messageType";
/**
* topic
*/
public static final String TOPIC = "topic";
public static final String QOS = "qos";
public static final String RETAIN = "retain";
public static final String WILL = "will";
public static final String DUP = "dup";
public static final String productKey = "dup";
// public static final String productKey = "dup";
}
package com.mortals.xhx.queue;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.extern.apachecommons.CommonsLog;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 服务端消费消息服务
*
* @author: zxfei
* @date: 2021/11/22 11:31
*/
@CommonsLog
public class ConsumerService {
private long pollDuration;
protected volatile ExecutorService consumersExecutor;
@Getter
private TbQueueConsumer<TbQueueMsg> mainConsumer;
private String currentIp;
protected volatile boolean stopped = false;
public void init(TbQueueConsumer<TbQueueMsg> mainConsumer) {
this.consumersExecutor = Executors.newCachedThreadPool();
this.mainConsumer = mainConsumer;
launchMainConsumers();
this.mainConsumer.subscribe();
}
public ConsumerService(long pollDuration, String currentIp) {
this.pollDuration = pollDuration;
this.currentIp = currentIp;
}
/**
* 消费服务主线程
*/
protected void launchMainConsumers() {
consumersExecutor.submit(() -> {
while (!stopped) {
try {
//todo
List<TbQueueMsg> poll = mainConsumer.poll(pollDuration);
List<TbQueueMsg> msgs = poll;
if (msgs.isEmpty()) {
continue;
}
for (TbQueueMsg item : msgs) {
//todo
// Message innerMsg = new Message();
// TbQueueMsgHeaders headMap = item.getHeaders();
// int messageProtocl = headMap.get(MessageHeader.MESSAGEPROTOCOL)[0];
// Map<String, Object> headers = new HashMap<>();
// innerMsg.setPayload(item.getData());
// headers.put(MessageHeader.MESSAGEPROTOCOL, messageProtocl);
// innerMsg.setStoreTime(DateUtils.getCurrDate().getTime());
// if(messageProtocl== MsgTypeEnum.MSG_SYSTEM.getValue()){
// innerMsg.setHeaders(headers);
// sendSysMessage2Cluster(innerMsg);
// }else{
// String clientId = MixAll.convertByteS2Str(headMap.get(MessageHeader.CLIENTID));
// int messageType = headMap.get(MessageHeader.MESSAGETYPE)[0];
// String topic = MixAll.convertByteS2Str(headMap.get(MessageHeader.TOPIC));
// int qos = headMap.get(MessageHeader.QOS)[0];
//
// innerMsg.setClientId(clientId);
// innerMsg.setType(Message.Type.valueOf(messageType));
// headers.put(MessageHeader.TOPIC, topic);
// headers.put(MessageHeader.QOS, qos);
// headers.put(MessageHeader.RETAIN, false);
// headers.put(MessageHeader.DUP, false);
// innerMsg.setHeaders(headers);
// sendContrlMessage2Cluster(innerMsg);
// }
}
} catch (Exception e) {
log.error("Exception", e);
}
}
log.info("Queue Consumer stopped.");
});
}
public void destroy() {
if (!stopped) {
stopMainConsumers();
}
stopped = true;
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
}
protected void stopMainConsumers() {
if (mainConsumer != null) {
mainConsumer.unsubscribe();
}
}
}
package com.mortals.xhx.queue;
import com.mortals.xhx.queue.processing.AbstractConsumerService;
import com.mortals.xhx.queue.provider.TbCoreQueueFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
@Service
@Slf4j
public class DefaultTbCoreConsumerService extends AbstractConsumerService<TbQueueMsg> implements TbCoreConsumerService {
@Value("${queue.core.poll-interval}")
private long pollDuration;
@Value("${queue.core.pack-processing-timeout}")
private long packProcessingTimeout;
private TbQueueConsumer<TbQueueMsg> mainConsumer;
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory) {
this.mainConsumer = tbCoreQueueFactory.createMsgConsumer();
}
@PostConstruct
public void init() {
log.info("初始化消费服务线程");
super.init("core-consumer");
// super.init("tb-core-consumer", "tb-core-notifications-consumer");
}
@PreDestroy
public void destroy() {
super.destroy();
}
@EventListener(ApplicationReadyEvent.class)
@Order(value = 2)
public void onApplicationEvent(ApplicationReadyEvent event) {
super.onApplicationEvent(event);
}
@Override
protected void launchMainConsumers() {
log.info("启动消费线程!");
consumersExecutor.submit(() -> {
while (!stopped) {
try {
List<TbQueueMsg> msgs = mainConsumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
for (TbQueueMsg item:msgs){
log.info("msg:"+item.toString());
}
mainConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
log.info(" Core Consumer stopped.");
});
}
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
}
private static class PendingMsgHolder {
// @Getter
// @Setter
// private volatile ToCoreMsg toCoreMsg;
}
@Override
protected void stopMainConsumers() {
if (mainConsumer != null) {
mainConsumer.unsubscribe();
}
}
}
package com.mortals.xhx.queue;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.UUID;
/**
* 默认消息
*
* @author: zxfei
* @date: 2021/11/22 10:59
*/
@Data
@AllArgsConstructor
public class DefaultTbQueueMsg implements TbQueueMsg {
private final UUID key;
private final byte[] data;
private final TbQueueMsgHeaders headers;
public DefaultTbQueueMsg(TbQueueMsg msg) {
this.key = msg.getKey();
this.data = msg.getData();
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
msg.getHeaders().getData().forEach(headers::put);
this.headers = headers;
}
}
package com.mortals.xhx.queue;
import com.mortals.xhx.common.BrokerConfig;
/**
* 消息队列工厂类,初始化MQ类型(kafka,rabbitMQ,memory)
*
* @author: zxfei
* @date: 2021/11/22 10:57
*/
public interface MessageQueueFactory {
/**
* 创建消息生产者
* @return
*/
TbQueueProducer<TbQueueMsg> createMsgProducer(BrokerConfig brokerConfig);
/**
* 创建消息消费者
* @return
*/
TbQueueConsumer<TbQueueMsg> createMsgConsumer(BrokerConfig brokerConfig);
}
package com.mortals.xhx.queue;
import org.springframework.context.ApplicationListener;
public interface TbCoreConsumerService extends ApplicationListener {
}
package com.mortals.xhx.queue;
/**
* 队列回调消息
*
* @author: zxfei
* @date: 2021/11/22 10:57
*/
public interface TbQueueCallback {
void onSuccess(TbQueueMsgMetadata metadata);
void onFailure(Throwable t);
}
package com.mortals.xhx.queue;
import java.util.List;
import java.util.Set;
/**
* 队列消息消费者接口
*
* @author: zxfei
* @date: 2021/11/22 10:57
*/
public interface TbQueueConsumer<T extends TbQueueMsg> {
/**
* 获取当topic
* @return
*/
String getTopic();
/**
* 订阅
*/
void subscribe();
/**
* 订阅(分区)
* @param partitions
*/
void subscribe(Set<TopicPartitionInfo> partitions);
/**
* 取消订阅
*/
void unsubscribe();
/**
* 拉取消息间隔
* @param durationInMillis
* @return
*/
List<T> poll(long durationInMillis);
/**
* 提交
*/
void commit();
}
package com.mortals.xhx.queue;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Data
@Component
public class TbQueueCoreSettings {
@Value("${queue.core.topic}")
private String topic;
@Value("${queue.core.partitions}")
private int partitions;
}
package com.mortals.xhx.queue;
import java.util.UUID;
/**
* 队列消息体
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsg {
UUID getKey();
TbQueueMsgHeaders getHeaders();
byte[] getData();
}
package com.mortals.xhx.queue;
public interface TbQueueMsgDecoder<T extends TbQueueMsg> {
T decode(TbQueueMsg msg);
}
package com.mortals.xhx.queue;
import java.util.Map;
/**
* 消息头信息
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsgHeaders {
byte[] put(String key, byte[] value);
byte[] get(String key);
Map<String, byte[]> getData();
}
package com.mortals.xhx.queue;
/**
* 队列消息元数据
*
* @author: zxfei
* @date: 2021/11/22 10:56
*/
public interface TbQueueMsgMetadata {
}
package com.mortals.xhx.queue;
/**
* 队列消息生产者
*
* @author: zxfei
* @date: 2021/11/22 10:55
*/
public interface TbQueueProducer<T extends TbQueueMsg> {
void init();
String getDefaultTopic();
//发送消息
void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback);
void stop();
}
package com.mortals.xhx.queue;
import lombok.Builder;
import lombok.Data;
import java.util.Objects;
import java.util.Optional;
@Data
public class TopicPartitionInfo {
private String topic;
private Integer partition;
private String fullTopicName;
@Builder
public TopicPartitionInfo(String topic, Integer partition) {
this.topic = topic;
this.partition = partition;
String tmp = topic;
if (partition != null) {
tmp += "." + partition;
}
this.fullTopicName = tmp;
}
public TopicPartitionInfo newByTopic(String topic) {
return new TopicPartitionInfo(topic, this.partition);
}
public String getTopic() {
return topic;
}
public Optional<Integer> getPartition() {
return Optional.ofNullable(partition);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicPartitionInfo that = (TopicPartitionInfo) o;
return topic.equals(that.topic) &&
Objects.equals(partition, that.partition) &&
fullTopicName.equals(that.fullTopicName);
}
@Override
public int hashCode() {
return Objects.hash(fullTopicName);
}
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TopicPartitionInfo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
/**
* 抽象队列消费者模板
*
* @author: zxfei
* @date: 2021/11/22 11:12
*/
@Slf4j
public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
private volatile boolean subscribed;
protected volatile boolean stopped = false;
protected volatile Set<TopicPartitionInfo> partitions;
protected final ReentrantLock consumerLock = new ReentrantLock();
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
@Getter
private final String topic;
public AbstractTbQueueConsumerTemplate(String topic) {
this.topic = topic;
}
@Override
public void subscribe() {
log.info("enqueue topic subscribe {} ", topic);
if (stopped) {
log.error("trying subscribe, but consumer stopped for topic {}", topic);
return;
}
subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null)));
}
@Override
public void subscribe(Set<TopicPartitionInfo> partitions) {
log.info("enqueue topics subscribe {} ", partitions);
if (stopped) {
log.error("trying subscribe, but consumer stopped for topic {}", topic);
return;
}
subscribeQueue.add(partitions);
}
@Override
public List<T> poll(long durationInMillis) {
List<R> records;
long startNanos = System.nanoTime();
if (stopped) {
return errorAndReturnEmpty();
}
if (!subscribed && partitions == null && subscribeQueue.isEmpty()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
if (consumerLock.isLocked()) {
log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace"));
}
consumerLock.lock();
try {
//更新订阅的主题
while (!subscribeQueue.isEmpty()) {
subscribed = false;
partitions = subscribeQueue.poll();
}
if (!subscribed) {
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
doSubscribe(topicNames);
subscribed = true;
}
records = partitions.isEmpty() ? emptyList() : doPoll(durationInMillis);
} finally {
consumerLock.unlock();
}
if (records.isEmpty()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
return decodeRecords(records);
}
List<T> decodeRecords(List<R> records) {
List<T> result = new ArrayList<>(records.size());
records.forEach(record -> {
try {
if (record != null) {
result.add(decode(record));
}
} catch (IOException e) {
log.error("Failed decode record: [{}]", record);
throw new RuntimeException("Failed to decode record: ", e);
}
});
return result;
}
List<T> errorAndReturnEmpty() {
log.error("poll invoked but consumer stopped for topic:" + topic, new RuntimeException("stacktrace"));
return emptyList();
}
List<T> sleepAndReturnEmpty(final long startNanos, final long durationInMillis) {
long durationNanos = TimeUnit.MILLISECONDS.toNanos(durationInMillis);
long spentNanos = System.nanoTime() - startNanos;
if (spentNanos < durationNanos) {
try {
Thread.sleep(Math.max(TimeUnit.NANOSECONDS.toMillis(durationNanos - spentNanos), 1));
} catch (InterruptedException e) {
if (!stopped) {
log.error("Failed to wait", e);
}
}
}
return emptyList();
}
@Override
public void commit() {
if (consumerLock.isLocked()) {
log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace"));
}
consumerLock.lock();
try {
doCommit();
} finally {
consumerLock.unlock();
}
}
@Override
public void unsubscribe() {
log.info("unsubscribe topic and stop consumer {}", getTopic());
stopped = true;
consumerLock.lock();
try {
doUnsubscribe();
} finally {
consumerLock.unlock();
}
}
abstract protected List<R> doPoll(long durationInMillis);
abstract protected T decode(R record) throws IOException;
abstract protected void doSubscribe(List<String> topicNames);
abstract protected void doCommit();
abstract protected void doUnsubscribe();
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.common.model.DefaultTbQueueMsgHeaders;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgHeaders;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.UUID;
public class KafkaTbQueueMsg implements TbQueueMsg {
private final UUID key;
private final TbQueueMsgHeaders headers;
private final byte[] data;
public KafkaTbQueueMsg(ConsumerRecord<String, byte[]> record) {
this.key = UUID.fromString(record.key());
TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
record.headers().forEach(header -> {
headers.put(header.key(), header.value());
});
this.headers = headers;
this.data = record.value();
}
@Override
public UUID getKey() {
return key;
}
@Override
public TbQueueMsgHeaders getHeaders() {
return headers;
}
@Override
public byte[] getData() {
return data;
}
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueMsgMetadata;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 队列元数据
*
* @author: zxfei
* @date: 2021/11/22 14:40
*/
@Data
@AllArgsConstructor
public class KafkaTbQueueMsgMetadata implements TbQueueMsgMetadata {
private RecordMetadata metadata;
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueMsg;
import lombok.Builder;
import lombok.extern.apachecommons.CommonsLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
* kafka consumer 消费者模板
*
* @author: zxfei
* @date: 2021/11/22 11:21
*/
@Slf4j
public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
private final KafkaConsumer<String, byte[]> consumer;
private final String groupId;
private final TbKafkaDecoder<T> decoder;
@Builder
private TbKafkaConsumerTemplate(TbKafkaSettings settings, String clientId, TbKafkaDecoder<T> decoder, String groupId, String topic) {
//默认topic
super(topic);
Properties props = settings.toConsumerProps();
//多个输入源的时候 需要配置
//props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
if (groupId != null) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
this.groupId = groupId;
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
}
@Override
protected void doSubscribe(List<String> topicNames) {
if (!topicNames.isEmpty()) {
// topicNames.forEach(admin::createTopicIfNotExists);
log.info("subscribe topics {}", topicNames);
consumer.subscribe(topicNames);
} else {
log.info("unsubscribe due to empty topic list");
consumer.unsubscribe();
}
}
@Override
protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
if (records.isEmpty()) {
return Collections.emptyList();
} else {
List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
records.forEach(recordList::add);
return recordList;
}
}
@Override
public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
return decoder.decode(new KafkaTbQueueMsg(record));
}
@Override
protected void doCommit() {
//同步提交,线程会阻塞,直到当前批次offset提交成功
consumer.commitAsync();
}
@Override
protected void doUnsubscribe() {
log.info("unsubscribe topic and close consumer for topic {}", getTopic());
if (consumer != null) {
//consumer.unsubscribe();
consumer.close();
}
}
public static void main(String[] args) {
// TbKafkaConsumerTemplate.builder().
}
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueMsg;
import java.io.IOException;
/**
* 队列消息编码
*
* @author: zxfei
* @date: 2021/11/22 11:22
*/
public interface TbKafkaDecoder<T extends TbQueueMsg> {
T decode(TbQueueMsg msg) throws IOException;
}
package com.mortals.xhx.queue.kafka;
import com.mortals.xhx.queue.TbQueueCallback;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
import com.mortals.xhx.queue.TopicPartitionInfo;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* kafka 生产者模板
*
* @author: zxfei
* @date: 2021/11/22 11:23
*/
@Data
@Slf4j
public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
/**
* 生产者
*/
private KafkaProducer<String, byte[]> producer;
/**
* kafka 配置信息
*/
private TbKafkaSettings settings;
private String defaultTopic;
/**
* topic组
*/
private Set<TopicPartitionInfo> topics;
@Builder
private TbKafkaProducerTemplate(TbKafkaSettings settings,String defaultTopic) {
this.settings = settings;
//初始化生产者参数
this.producer = new KafkaProducer<>(settings.toProducerProps());
this.defaultTopic = defaultTopic;
topics = ConcurrentHashMap.newKeySet();
}
@Override
public void init() {
}
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
String key = msg.getKey().toString();
byte[] data = msg.getData();
ProducerRecord<String, byte[]> record;
if (tpi.getTopic() == null) {
tpi.setTopic(this.defaultTopic);
}
Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
record = new ProducerRecord<>(tpi.getTopic(), null, key, data, headers);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
if (callback != null) {
callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
}
} else {
if (callback != null) {
callback.onFailure(exception);
} else {
log.warn("Producer template failure: {}", exception.getMessage(), exception);
}
}
});
}
@Override
public void stop() {
if (producer != null) {
producer.close();
}
}
}
package com.mortals.xhx.queue.kafka;
import lombok.Data;
/**
* 其它配置类
*
* @author: zxfei
* @date: 2021/11/22 13:31
*/
@Data
public class TbKafkaProperty {
private String key;
private String value;
}
package com.mortals.xhx.queue.kafka;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Properties;
/**
* kafka 配置类
*
* @author: zxfei
* @date: 2021/11/22 13:30
*/
@Slf4j
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@ConfigurationProperties(prefix = "queue.kafka")
@Component
public class TbKafkaSettings {
@Value("${queue.kafka.bootstrap.servers}")
private String servers;
@Value("${queue.kafka.acks}")
private String acks;
@Value("${queue.kafka.retries}")
private int retries;
@Value("${queue.kafka.batch.size}")
private int batchSize;
@Value("${queue.kafka.linger.ms}")
private long lingerMs;
@Value("${queue.kafka.buffer.memory}")
private long bufferMemory;
@Value("${queue.kafka.replication_factor}")
@Getter
private short replicationFactor;
@Value("${queue.kafka.max_poll_records:8192}")
private int maxPollRecords;
@Value("${queue.kafka.max_poll_interval_ms:300000}")
private int maxPollIntervalMs;
@Value("${queue.kafka.max_partition_fetch_bytes:16777216}")
private int maxPartitionFetchBytes;
@Value("${queue.kafka.fetch_max_bytes:134217728}")
private int fetchMaxBytes;
@Setter
private List<TbKafkaProperty> other;
/**
* 管理端参数配置
* @return
*/
public Properties toAdminProps() {
Properties props = toProps();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(AdminClientConfig.RETRIES_CONFIG, retries);
return props;
}
/**
* 消费者参数
*
* @return
*/
public Properties toConsumerProps() {
Properties props = toProps();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return props;
}
/**
* 生产者参数
*
* @return
*/
public Properties toProducerProps() {
Properties props = toProps();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return props;
}
private Properties toProps() {
Properties props = new Properties();
//添加其它参数
if (other != null) {
other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
}
return props;
}
}
package com.mortals.xhx.queue.processing;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.utils.IotThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public abstract class AbstractConsumerService<N extends TbQueueMsg> {
protected volatile ExecutorService consumersExecutor;
protected volatile boolean stopped = false;
public void init(String mainConsumerThreadName) {
this.consumersExecutor = Executors.newCachedThreadPool(IotThreadFactory.forName(mainConsumerThreadName));
}
@EventListener(ApplicationReadyEvent.class)
@Order(value = 2)
public void onApplicationEvent(ApplicationReadyEvent event) {
launchMainConsumers();
}
/**
* 启动消费主线程服务
*/
protected abstract void launchMainConsumers();
/**
* 停止消费主线程服务
*/
protected abstract void stopMainConsumers();
@PreDestroy
public void destroy() {
stopped = true;
stopMainConsumers();
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
import com.mortals.xhx.queue.kafka.TbKafkaConsumerTemplate;
import com.mortals.xhx.queue.kafka.TbKafkaProducerTemplate;
import com.mortals.xhx.queue.kafka.TbKafkaSettings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
/**
* kafka 消息工厂类
*
* @author: zxfei
* @date: 2021/11/22 15:00
*/
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
@Autowired
private TbKafkaSettings kafkaSettings;
/**
* 初始化创建消息生产者
*
* @return
*/
@Override
public TbQueueProducer<TbQueueMsg> createMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbQueueMsg> builder = TbKafkaProducerTemplate.builder();
builder.settings(kafkaSettings);
return builder.build();
}
/**
* 初始化创建消息消费者
*
* @return
*/
@Override
public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbQueueMsg> comsumerBuilder = TbKafkaConsumerTemplate.builder();
comsumerBuilder.settings(kafkaSettings);
return comsumerBuilder.build();
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.*;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqConsumerTemplate;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqProducerTemplate;
import com.mortals.xhx.queue.rabbitmq.TbRabbitMqSettings;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueCoreSettings coreSettings;
public RabbitMqTbCoreQueueFactory(TbRabbitMqSettings rabbitMqSettings, TbQueueCoreSettings coreSettings) {
this.rabbitMqSettings = rabbitMqSettings;
this.coreSettings = coreSettings;
}
@Override
public TbQueueProducer<TbQueueMsg> createMsgProducer() {
return new TbRabbitMqProducerTemplate<>(rabbitMqSettings, coreSettings.getTopic());
}
@Override
public TbQueueConsumer<TbQueueMsg> createMsgConsumer() {
return new TbRabbitMqConsumerTemplate<>(rabbitMqSettings, coreSettings.getTopic(), msg -> new TbQueueMsg() {
@Override
public UUID getKey() {
return msg.getKey();
}
@Override
public TbQueueMsgHeaders getHeaders() {
return msg.getHeaders();
}
@Override
public byte[] getData() {
return msg.getData();
}
});
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueConsumer;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
public interface TbCoreQueueFactory {
/**
* 消息生产者
* @return
*/
TbQueueProducer<TbQueueMsg> createMsgProducer();
/**
* 消息消费服务
* @return
*/
TbQueueConsumer<TbQueueMsg> createMsgConsumer();
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* 初始化消息生产者服务
*/
@CommonsLog
@Service
public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
/**
* 消息队列提供
*/
@Autowired
private TbCoreQueueFactory tbQueueProvider;
/**
* 消息队列生产者
*/
private TbQueueProducer<TbQueueMsg> queueProducer;
// public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) {
// this.tbQueueProvider = tbQueueProvider;
// }
//
@PostConstruct
public void init() {
log.info("消息队列生产服务开始...");
this.queueProducer = tbQueueProvider.createMsgProducer();
}
@Override
public TbQueueProducer<TbQueueMsg> getTbCoreMsgProducer() {
return queueProducer;
}
}
package com.mortals.xhx.queue.provider;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueProducer;
/**
* 消息队列提供接口
*
* @author: zxfei
* @date: 2021/11/22 14:59
*/
public interface TbQueueProducerProvider {
/**
* 消息生产者
* @return
*/
TbQueueProducer<TbQueueMsg> getTbCoreMsgProducer();
}
package com.mortals.xhx.queue.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.mortals.xhx.queue.DefaultTbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsg;
import com.mortals.xhx.queue.TbQueueMsgDecoder;
import com.mortals.xhx.queue.TopicPartitionInfo;
import com.mortals.xhx.queue.kafka.AbstractTbQueueConsumerTemplate;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@Slf4j
public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
private final TbQueueMsgDecoder<T> decoder;
private Channel channel;
private Connection connection;
private volatile Set<String> queues;
public TbRabbitMqConsumerTemplate(TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
super(topic);
this.decoder = decoder;
try {
connection = rabbitMqSettings.getConnectionFactory().newConnection();
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
log.error("Failed to create connection." , e);
// throw new RuntimeException("Failed to create connection." , e);
}
stopped = false;
}
@Override
protected List<GetResponse> doPoll(long durationInMillis) {
List<GetResponse> result = queues.stream()
.map(queue -> {
try {
return channel.basicGet(queue, false);
} catch (IOException e) {
log.error("Failed to get messages from queue: [{}]" , queue);
return null;
// throw new RuntimeException("Failed to get messages from queue." , e);
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (result.size() > 0) {
return result;
} else {
return Collections.emptyList();
}
}
@Override
protected void doSubscribe(List<String> topicNames) {
queues = partitions.stream()
.map(TopicPartitionInfo::getFullTopicName)
.collect(Collectors.toSet());
//queues.forEach(admin::createTopicIfNotExists);
}
@Override
protected void doCommit() {
try {
channel.basicAck(0, true);
} catch (IOException e) {
log.error("Failed to ack messages." , e);
}
}
@Override
protected void doUnsubscribe() {
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
log.error("Failed to close the channel.");
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
log.error("Failed to close the connection.");
}
}
}
public T decode(GetResponse message) {
DefaultTbQueueMsg msg = JSON.parseObject(new String(message.getBody()), DefaultTbQueueMsg.class);
return decoder.decode(msg);
}
}
package com.mortals.xhx.queue.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.mortals.xhx.queue.*;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
@Slf4j
public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
private String defaultTopic;
private TbRabbitMqSettings rabbitMqSettings;
private ListeningExecutorService producerExecutor;
private Channel channel;
private Connection connection;
private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
public TbRabbitMqProducerTemplate(TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
this.defaultTopic = defaultTopic;
this.rabbitMqSettings = rabbitMqSettings;
producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
try {
connection = rabbitMqSettings.getConnectionFactory().newConnection();
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
log.error("Failed to create connection." , e);
// throw new RuntimeException("Failed to create connection." , e);
}
}
@Override
public void init() {
}
@Override
public String getDefaultTopic() {
return defaultTopic;
}
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
createTopicIfNotExist(tpi);
AMQP.BasicProperties properties = new AMQP.BasicProperties();
try {
channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, JSON.toJSONString(new DefaultTbQueueMsg(msg)).getBytes());
if (callback != null) {
callback.onSuccess(null);
}
} catch (IOException e) {
log.error("Failed publish message: [{}]." , msg, e);
if (callback != null) {
callback.onFailure(e);
}
}
}
@Override
public void stop() {
if (producerExecutor != null) {
producerExecutor.shutdownNow();
}
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
log.error("Failed to close the channel.");
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
log.error("Failed to close the connection.");
}
}
}
private void createTopicIfNotExist(TopicPartitionInfo tpi) {
if (topics.contains(tpi)) {
return;
}
// admin.createTopicIfNotExists(tpi.getFullTopicName());
topics.add(tpi);
}
}
package com.mortals.xhx.queue.rabbitmq;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
//@Component
//@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
public class TbRabbitMqQueueArguments {
@Value("${queue.rabbitmq.queue-properties.core}")
private String coreProperties;
@Value("${queue.rabbitmq.queue-properties.rule-engine}")
private String ruleEngineProperties;
@Value("${queue.rabbitmq.queue-properties.transport-api}")
private String transportApiProperties;
@Value("${queue.rabbitmq.queue-properties.notifications}")
private String notificationsProperties;
@Value("${queue.rabbitmq.queue-properties.js-executor}")
private String jsExecutorProperties;
@Getter
private Map<String, Object> coreArgs;
@Getter
private Map<String, Object> ruleEngineArgs;
@Getter
private Map<String, Object> transportApiArgs;
@Getter
private Map<String, Object> notificationsArgs;
@Getter
private Map<String, Object> jsExecutorArgs;
@PostConstruct
private void init() {
coreArgs = getArgs(coreProperties);
ruleEngineArgs = getArgs(ruleEngineProperties);
transportApiArgs = getArgs(transportApiProperties);
notificationsArgs = getArgs(notificationsProperties);
jsExecutorArgs = getArgs(jsExecutorProperties);
}
private Map<String, Object> getArgs(String properties) {
Map<String, Object> configs = new HashMap<>();
for (String property : properties.split(";")) {
int delimiterPosition = property.indexOf(":");
String key = property.substring(0, delimiterPosition);
String strValue = property.substring(delimiterPosition + 1);
configs.put(key, getObjectValue(strValue));
}
return configs;
}
private Object getObjectValue(String str) {
if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) {
return Boolean.valueOf(str);
} else if (isNumeric(str)) {
return getNumericValue(str);
}
return str;
}
private Object getNumericValue(String str) {
if (str.contains(".")) {
return Double.valueOf(str);
} else {
return Long.valueOf(str);
}
}
private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?");
public boolean isNumeric(String strNum) {
if (strNum == null) {
return false;
}
return PATTERN.matcher(strNum).matches();
}
}
package com.mortals.xhx.queue.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
@Component
@Data
public class TbRabbitMqSettings {
@Value("${queue.rabbitmq.exchange_name:}")
private String exchangeName;
@Value("${queue.rabbitmq.host:}")
private String host;
@Value("${queue.rabbitmq.port:}")
private int port;
@Value("${queue.rabbitmq.virtual_host:}")
private String virtualHost;
@Value("${queue.rabbitmq.username:}")
private String username;
@Value("${queue.rabbitmq.password:}")
private String password;
@Value("${queue.rabbitmq.automatic_recovery_enabled:}")
private boolean automaticRecoveryEnabled;
@Value("${queue.rabbitmq.connection_timeout:}")
private int connectionTimeout;
@Value("${queue.rabbitmq.handshake_timeout:}")
private int handshakeTimeout;
private ConnectionFactory connectionFactory;
@PostConstruct
private void init() {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setAutomaticRecoveryEnabled(automaticRecoveryEnabled);
connectionFactory.setConnectionTimeout(connectionTimeout);
connectionFactory.setHandshakeTimeout(handshakeTimeout);
}
}
package com.mortals.xhx.utils;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import java.util.HashSet;
import java.util.Set;
public class BeanUtil
{
/**
*
* @Title: getNullPropertyNames
* @Description: 获取一个对象中属性值为null的属性名字符串数组
* @param source
* @return
*/
public static String[] getNullPropertyNames (Object source) {
final BeanWrapper src = new BeanWrapperImpl(source);
java.beans.PropertyDescriptor[] pds = src.getPropertyDescriptors();
Set<String> emptyNames = new HashSet<String>();
for(java.beans.PropertyDescriptor pd : pds) {
Object srcValue = src.getPropertyValue(pd.getName());
if (srcValue == null) emptyNames.add(pd.getName());
}
String[] result = new String[emptyNames.size()];
return emptyNames.toArray(result);
}
}
package com.mortals.xhx.utils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class IotThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public static IotThreadFactory forName(String name) {
return new IotThreadFactory(name);
}
private IotThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = name + "-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
package com.mortals.xhx.utils;//package com.mortals.coops.utils;
//
//import javax.validation.Constraint;
//import javax.validation.ConstraintValidator;
//import javax.validation.ConstraintValidatorContext;
//import javax.validation.Payload;
//import java.lang.annotation.*;
//import java.time.LocalDate;
//
//@Target({ElementType.FIELD})
//@Retention(RetentionPolicy.RUNTIME)
//@Constraint(validatedBy = PastLocalDate.PastValidator.class)
//@Documented
//public @interface PastLocalDate {
// String message() default "{javax.validation.constraints.Past.message}";
//
// Class<?>[] groups() default {};
//
// Class<? extends Payload>[] payload() default {};
//
// class PastValidator implements ConstraintValidator<PastLocalDate,
// LocalDate> {
// public void initialize(PastLocalDate past) {
// }
//
// public boolean isValid(LocalDate localDate,
// ConstraintValidatorContext context) {
// return localDate == null || localDate.isBefore(LocalDate.now());
// }
// }
//}
package com.mortals.xhx.utils;
import com.mortals.framework.util.DateUtils;
import java.text.SimpleDateFormat;
import java.util.*;
public class TimeUtil {
public static List<String> findDaysStr(String begintTime, String endTime) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date dBegin = null;
Date dEnd = null;
try {
dBegin = sdf.parse(begintTime);
dEnd = sdf.parse(endTime);
} catch (Exception e) {
e.printStackTrace();
}
//存放每一天日期String对象的daysStrList
List<String> daysStrList = new ArrayList<String>();
//放入开始的那一天日期String
daysStrList.add(sdf.format(dBegin));
Calendar calBegin = Calendar.getInstance();
// 使用给定的 Date 设置此 Calendar 的时间
calBegin.setTime(dBegin);
Calendar calEnd = Calendar.getInstance();
// 使用给定的 Date 设置此 Calendar 的时间
calEnd.setTime(dEnd);
// 判断循环此日期是否在指定日期之后
while (dEnd.after(calBegin.getTime())) {
// 根据日历的规则,给定的日历字段增加或减去指定的时间量
calBegin.add(Calendar.DAY_OF_MONTH, 1);
String dayStr = sdf.format(calBegin.getTime());
daysStrList.add(dayStr);
}
return daysStrList;
}
public static List<String> findMonthsStr(String begintTime, String endTime) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM");
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date dBegin = null;
Date dEnd = null;
try {
dBegin = sdf.parse(begintTime);
dEnd = sdf.parse(endTime);
} catch (Exception e) {
e.printStackTrace();
}
//存放每一天日期String对象的daysStrList
List<String> monthsStrList = new ArrayList<String>();
//放入开始的那一天的月份String
monthsStrList.add(sdf1.format(dBegin));
Calendar calBegin = Calendar.getInstance();
// 使用给定的 Date 设置此 Calendar 的时间
calBegin.setTime(dBegin);
Calendar calEnd = Calendar.getInstance();
// 使用给定的 Date 设置此 Calendar 的时间
calEnd.setTime(dEnd);
// 判断循环此日期是否在指定日期之后
while (dEnd.after(calBegin.getTime())) {
// 根据日历的规则,给定的日历字段增加或减去指定的时间量
calBegin.add(Calendar.MONTH, 1);
String dayStr = sdf1.format(calBegin.getTime());
monthsStrList.add(dayStr);
}
return monthsStrList;
}
public static List<String> findYearsStr(String begintTime, String endTime) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy");
Date dBegin = null;
Date dEnd = null;
try {
dBegin = sdf.parse(begintTime);
dEnd = sdf.parse(endTime);
} catch (Exception e) {
e.printStackTrace();
}
List<String> yearsStrList = new ArrayList<String>();
yearsStrList.add(sdf.format(dBegin));
Calendar calBegin = Calendar.getInstance();
// 使用给定的 Date 设置此 Calendar 的时间
calBegin.setTime(dBegin);
Calendar calEnd = Calendar.getInstance();
// 使用给定的 Date 设置此 Calendar 的时间
calEnd.setTime(dEnd);
// 判断循环此日期是否在指定日期之后
while (dEnd.after(calBegin.getTime())) {
// 根据日历的规则,给定的日历字段增加或减去指定的时间量
calBegin.add(Calendar.YEAR, 1);
String dayStr = sdf.format(calBegin.getTime());
yearsStrList.add(dayStr);
}
return yearsStrList;
}
public static void main(String[] args) {
/* String begintTime = "2017";
String endTime = "2018";
// TimeUtil.findDaysStr(begintTime,endTime).stream().forEach(f->System.out.println(f));
//TimeUtil.findMonthsStr(begintTime,endTime).stream().forEach(f->System.out.println(f));
TimeUtil.findYearsStr(begintTime,endTime).stream().forEach(f->System.out.println(f));*/
/* String begintTime = "2017-03-01";
;
String yyyy = DateUtils.getDateTimeStr(DateUtils.StrToDateTime(begintTime, "yyyy"), DateUtils.P_yyyy_MM_dd);
System.out.println(yyyy);*/
String startTime="2019-03-18";
System.out.println(DateUtils.StrToDateTime(startTime,DateUtils.P_yyyy_MM_dd).getTime());
}
}
package com.mortals.xhx.utils.stream.messaging;//package com.mortals.xhx.utils.stream.messaging;
//
///**
// * @author karlhoo
// */
//public interface ProcessTaskProcessor extends ProcessTaskSink, ProcessTaskSource {
//}
package com.mortals.xhx.utils.stream.service.impl;//package com.mortals.xhx.utils.stream.service.impl;
//
//import com.mortals.xhx.utils.stream.service.IMessageService;
//import lombok.extern.apachecommons.CommonsLog;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.cloud.stream.annotation.EnableBinding;
//import org.springframework.kafka.support.KafkaHeaders;
//import org.springframework.messaging.Message;
//import org.springframework.messaging.MessageChannel;
//import org.springframework.messaging.support.MessageBuilder;
//import org.springframework.stereotype.Component;
//
///**
// * @author karlhoo
// */
//@CommonsLog
//@Component
//public class DefaultMessageServiceImpl implements IMessageService {
//
// @Override
// public boolean sendMessage(MessageChannel messageChannel, String message) {
// return sendMessage(messageChannel, message, null);
// }
//
// @Override
// public boolean sendMessage(MessageChannel messageChannel, String message, String messageKey) {
// return sendMessage(messageChannel, MessageBuilder.withPayload(message).setHeader(KafkaHeaders.MESSAGE_KEY, messageKey == null ? messageKey : messageKey.getBytes()).build());
// }
//
// private boolean sendMessage(MessageChannel messageChannel, Message message) {
// try {
// return messageChannel.send(message);
// } catch (Exception e) {
// log.error(String.format("提交消息出错 messageChannel: %s, message: %s", messageChannel.toString(), message.getPayload()), e);
// return false;
// }
// }
//
//}
...@@ -2,5 +2,8 @@ ...@@ -2,5 +2,8 @@
NODE_ENV = development NODE_ENV = development
# 地址 # 地址
VUE_APP_BASE_API =127.0.0.1:18211/m VUE_APP_BASE_API =http://plm.testnew.com:8082/m
# websocket地址
VUE_APP_WEBSOCKET_API =127.0.0.1:18211/m
...@@ -2,5 +2,8 @@ ...@@ -2,5 +2,8 @@
NODE_ENV = production NODE_ENV = production
# 地址 # 地址
VUE_APP_BASE_API = 192.168.0.26:18221 VUE_APP_BASE_API = http://192.168.0.100:11021/m
# websocket地址
VUE_APP_WEBSOCKET_API =192.168.0.100:18211/m
# 测试环境配置
NODE_ENV = test
# 地址
VUE_APP_BASE_API = 192.168.0.26:18221/m
...@@ -4,8 +4,9 @@ ...@@ -4,8 +4,9 @@
"private": true, "private": true,
"scripts": { "scripts": {
"dev": "vue-cli-service serve", "dev": "vue-cli-service serve",
"build": "vue-cli-service build --model test", "build": "vue-cli-service build",
"build:prod": "vue-cli-service build --model production" "stage": "vue-cli-service build --mode stage",
"build:prod": "vue-cli-service build --model prod"
}, },
"dependencies": { "dependencies": {
"@chenfengyuan/vue-qrcode": "^1.0.2", "@chenfengyuan/vue-qrcode": "^1.0.2",
......
...@@ -7,7 +7,8 @@ let reUrl='' ...@@ -7,7 +7,8 @@ let reUrl=''
* @param {string} url ws地址 * @param {string} url ws地址
*/ */
export const createSocket = url => { export const createSocket = url => {
Socket && Socket.close() // Socket && Socket.close()
Socket=null
if (!Socket) { if (!Socket) {
console.log('建立websocket连接:'+url) console.log('建立websocket连接:'+url)
reUrl=url reUrl=url
......
...@@ -60,7 +60,9 @@ ...@@ -60,7 +60,9 @@
</template> </template>
<script> <script>
import { createSocket } from "@/assets/utils/websocket";
export default { export default {
name: "Header",
methods: { methods: {
handleCommand(key) { handleCommand(key) {
if(key === 'update'){ if(key === 'update'){
...@@ -79,6 +81,46 @@ export default { ...@@ -79,6 +81,46 @@ export default {
}) })
} }
}, },
beforeDestroy() {
console.log("beforeDestroy");
window.removeEventListener("message", this.getsocketData, false);
},
mounted() {
console.log("mounted");
this.$nextTick(function () {
console.log("login websocket:"+"ws://"+process.env.VUE_APP_WEBSOCKET_API +"/ws?accessToken="+ this.$store.state.userData.id)
createSocket(
"ws://" +
process.env.VUE_APP_WEBSOCKET_API +
"/ws?accessToken=" +
this.$store.state.userData.id
);
});
let _this = this;
const getsocketData = (e) => {
// 创建接收消息函数
const data = e && e.detail.data;
let obj = JSON.parse(data);
if (obj.type == "SEND_TO_ALL_REQUEST") {
vm.refreshData();
let content = JSON.parse(obj.body.content);
_this.$notify({
title: "警告",
message: content,
type: "warning",
duration: 8000,
});
}
};
this.getsocketData = getsocketData;
// 注册监听事件
window.addEventListener("onmessageWS", getsocketData,false);
},
computed: { computed: {
group() { group() {
const relativeGroup = this.$store.state.group; const relativeGroup = this.$store.state.group;
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
<el-dialog :title="title" :visible.sync="open" width="80%" append-to-body> <el-dialog :title="title" :visible.sync="open" width="80%" append-to-body>
<el-form ref="form" :model="form" :rules="rules" label-width="120px"> <el-form ref="form" :model="form" :rules="rules" label-width="120px">
<el-row> <el-row>
<Field :span="20" label="设备名称" prop="deviceName" v-model="form.deviceName" placeholder="请输入设备名称"/> <!-- <Field :span="20" label="设备名称" prop="deviceName" v-model="form.deviceName" placeholder="请输入设备名称"/> -->
<Field :span="20" label="设备编码" prop="deviceCode" v-model="form.deviceCode" type="textarea" placeholder="请输入设备编码"/> <Field :span="20" label="设备编码" prop="deviceCode" v-model="form.deviceCode" type="textarea" placeholder="请输入设备编码"/>
<Field :span="20" label="设备类型" prop="deviceType" v-model="form.deviceType" type="select" :enumData="dict.deviceType" placeholder="请选择设备类型"/> <Field :span="20" label="设备类型" prop="deviceType" v-model="form.deviceType" type="select" :enumData="dict.deviceType" placeholder="请选择设备类型"/>
<Field :span="20" label="设备的MAC地址" prop="deviceMac" v-model="form.deviceMac" placeholder="请输入设备的MAC地址"/> <Field :span="20" label="设备的MAC地址" prop="deviceMac" v-model="form.deviceMac" placeholder="请输入设备的MAC地址"/>
......
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data="tableData" :config="tableConfig"> <LayoutTable :data="tableData" :config="tableConfig">
<el-tag slot="table-body-head" style="margin:5px" type="success">当前在线设备:{{tableData.onlineCount}}</el-tag> <el-tag slot="table-body-head" style="margin:5px" type="success">当前在线设备总计{{tableData.onlineCount}}</el-tag>
<el-tag slot="table-body-head" style="margin:5px" type="danger">当前离线设备:{{tableData.offlineCount}}</el-tag> <el-tag slot="table-body-head" style="margin:5px" type="danger">当前离线设备总计{{tableData.offlineCount}}</el-tag>
<el-tag v-for='($label, $value) in tableData.offlineDeviceType'
:key='$value'
:label="$value" slot="table-body-head" style="margin:5px" type="danger">{{$value}}离线设备:{{$label}}</el-tag>
</LayoutTable> </LayoutTable>
...@@ -21,38 +26,38 @@ export default { ...@@ -21,38 +26,38 @@ export default {
components: { dialogShow }, components: { dialogShow },
mixins: [table], mixins: [table],
created() { created() {
let _this = this; // let _this = this;
const getsocketData = (e) => { // const getsocketData = (e) => {
// 创建接收消息函数 // // 创建接收消息函数
const data = e && e.detail.data; // const data = e && e.detail.data;
let obj = JSON.parse(data); // let obj = JSON.parse(data);
if (obj.type == "SEND_TO_ALL_REQUEST") { // if (obj.type == "SEND_TO_ALL_REQUEST") {
let msg = ""; // let msg = "";
let content = JSON.parse(obj.body.content); // let content = JSON.parse(obj.body.content);
if (content.deviceOnlineStatus == 1) { // if (content.deviceOnlineStatus == 1) {
console.log(_this.tableData.dict) // console.log(_this.tableData.dict)
msg = _this.tableData.dict.deviceType[content.deviceType]+ "设备:" + content.deviceCode + " 上线!"; // msg = _this.tableData.dict.deviceType[content.deviceType]+ "设备:" + content.deviceCode + " 上线!";
} else { // } else {
msg = _this.tableData.dict.deviceType[content.deviceType]+"设备:" + content.deviceCode + " 离线!"; // msg = _this.tableData.dict.deviceType[content.deviceType]+"设备:" + content.deviceCode + " 离线!";
} // }
_this.$notify({ // _this.$notify({
title: "警告", // title: "警告",
message: msg, // message: msg,
type: "warning", // type: "warning",
duration: 8000, // duration: 8000,
}); // });
_this.getData(); // _this.getData();
} // }
console.log(data);
};
this.getsocketData = getsocketData; // console.log(data);
// 注册监听事件 // };
window.addEventListener("onmessageWS", getsocketData,false);
// this.getsocketData = getsocketData;
// // 注册监听事件
// window.addEventListener("onmessageWS", getsocketData,false);
}, },
methods: { methods: {
/** 重写新增方法 */ /** 重写新增方法 */
...@@ -64,9 +69,9 @@ export default { ...@@ -64,9 +69,9 @@ export default {
this.$refs.dialogform.edit(row); this.$refs.dialogform.edit(row);
}, },
/** 重写查看方法 */ /** 重写查看方法 */
// toView(row) { toView(row) {
// this.$refs.dialogform.view(row); this.$refs.dialogform.view(row);
// }, },
}, },
data() { data() {
return { return {
...@@ -83,6 +88,12 @@ export default { ...@@ -83,6 +88,12 @@ export default {
type: 'select', type: 'select',
label: '在线状态', label: '在线状态',
}, },
{
name: 'deviceType',
type: 'select',
label: '设备类型',
},
], ],
columns: [ columns: [
{ type: "selection", width: 60 }, { type: "selection", width: 60 },
...@@ -96,7 +107,7 @@ export default { ...@@ -96,7 +107,7 @@ export default {
{ {
label: "在线状态 ", label: "在线状态 ",
prop: "deviceOnlineStatus", prop: "deviceOnlineStatus",
formatter: this.formatter, formatter: this.formatterYES,
}, },
{ {
......
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data='tableData' :config='tableConfig' notPagination notDel> <LayoutTable :data='tableData' :config='tableConfig' notPagination notDel >
<div> <div>
<el-tree <el-tree
:data="tree" :data="tree"
......
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data='tableData' :config='tableConfig' /> <LayoutTable :data="tableData" notAdd notDel :config="tableConfig" />
</div> </div>
</template> </template>
<script> <script>
import table from '@/assets/mixins/table'; import table from "@/assets/mixins/table";
export default { export default {
mixins: [table], mixins: [table],
data() { data() {
return { return {
config: { config: {
search: [ search: [
{ {
name: 'loginName', name: "loginName",
type: 'text', type: "text",
label: '登录名', label: "登录名",
}, },
{ {
name: 'requestUrl', name: "requestUrl",
type: 'text', type: "text",
label: '请求地址', label: "请求地址",
}, },
], ],
columns: [ columns: [
{
type: 'selection', {
width: 60, prop: "id",
}, label: "序号",
align: "center",
{ },
prop: 'id',
label: '序号', {
}, prop: "userName",
label: "用户名称",
align: "center",
{ },
prop: 'userName',
label: '用户名称', {
}, prop: "loginName",
label: "用户登录名",
align: "center",
{ },
prop: 'loginName',
label: '用户登录名', {
}, prop: "requestUrl",
label: "请求地址",
align: "center",
{ },
prop: 'requestUrl',
label: '请求地址', {
}, prop: "content",
label: "操作内容",
align: "center",
{ },
prop: 'content',
label: '操作内容', {
}, prop: "ip",
label: "操作IP地址",
align: "center",
{ },
prop: 'ip',
label: '操作IP地址', {
}, prop: "logDate",
label: "操作时间",
align: "center",
{ formatter: this.formatterDate,
prop: 'logDate', },
label: '操作时间', ],
width: 140, },
formatter: this.formatterDate };
}, },
};
{
label: '操作',
width: 100,
formatter: (row)=> {
return (
<table-buttons row={row} onDel={this.toDel} noEdit/>
)
},
},
],
},
}
}
}
</script> </script>
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data='tableData' :config='tableConfig' notPagination /> <LayoutTable :data='tableData' :config='tableConfig' notPagination />
<dialog-show ref="dialogform" @ok="getData" />
</div> </div>
</template> </template>
<script> <script>
import table from '@/assets/mixins/table'; import table from '@/assets/mixins/table';
import dialogShow from "./dialogshow";
export default { export default {
mixins: [table], mixins: [table],
components: { dialogShow },
methods: { methods: {
beforeRender(data) { beforeRender(data) {
this.allMenu = this.sortByGroup(this.util_copy(data.result)); this.allMenu = this.sortByGroup(this.util_copy(data.result));
...@@ -67,6 +70,49 @@ export default { ...@@ -67,6 +70,49 @@ export default {
statusChange() { statusChange() {
this.$store.dispatch('login'); this.$store.dispatch('login');
}, },
handleUp(data) {
let type = 0;
let url = "/menu/upOrDown";
this.switchSort(url, data.id, type);
},
handleDown(data) {
let type = 1;
let url = "/menu/upOrDown";
this.switchSort(url, data.id, type);
},
switchSort(url, id, type) {
this.loading = true;
this.$post(url, {
id: id,
type: type,
})
.then((res) => {
if (res && res.code && res.code == 1) {
this.getData()
this.loading = false;
this.$message.success("更新排序成功!");
}
})
.catch((error) => {
this.loading = false;
this.$message.error(error.message);
});
},
/** 重写新增方法 */
toAdd(row) {
this.$refs.dialogform.add(row);
},
/** 重写编辑方法 */
toEdit(row) {
this.$refs.dialogform.edit(row);
},
/** 重写查看方法 */
toView(row) {
this.$refs.dialogform.view(row);
},
}, },
data() { data() {
return { return {
...@@ -114,8 +160,8 @@ export default { ...@@ -114,8 +160,8 @@ export default {
{ {
prop: 'imgPath', prop: 'imgPath',
label: '图标', label: '图标',
width: 50, width: 120,
formatter: this.showIcon, // formatter: this.showIcon,
}, },
{ {
prop: 'authType', prop: 'authType',
...@@ -140,7 +186,28 @@ export default { ...@@ -140,7 +186,28 @@ export default {
label: '操作', label: '操作',
formatter: (row)=> { formatter: (row)=> {
return ( return (
<table-buttons row={row} onEdit={this.toEdit} onDel={this.toDel} />
<div>
<el-link
style="margin-right:5px;margin-left:5px"
icon="el-icon-top"
onClick={() => {
this.handleUp(row);
}}
></el-link>
<el-link
style="margin-right:5px;margin-left:5px"
icon="el-icon-bottom"
onClick={() => {
this.handleDown(row);
}}
></el-link>
<table-buttons noView row={row} onEdit={this.toEdit} onDel={this.toDel} />
</div>
) )
}, },
}, },
......
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data='tableData' :config='tableConfig' /> <LayoutTable :data='tableData' :config='tableConfig' />
<dialog-show ref="dialogform" @ok="getData" />
</div> </div>
</template> </template>
<script> <script>
import table from '@/assets/mixins/table'; import table from '@/assets/mixins/table';
import dialogShow from "./dialogshow";
export default { export default {
mixins: [table], mixins: [table],
components: {dialogShow },
methods: {
// 新增
toAdd(row) {
this.$refs.dialogform.add(row);
},
// 编辑
toEdit(row) {
this.$refs.dialogform.edit(row);
},
// 查看
toView(row,) {
this.$refs.dialogform.view(row);
},
},
data() { data() {
return { return {
config: { config: {
...@@ -69,10 +86,10 @@ export default { ...@@ -69,10 +86,10 @@ export default {
}, },
{ {
label: '操作', label: '操作',
width: 180, width: 260,
formatter: (row)=> { formatter: (row)=> {
return ( return (
<table-buttons row={row} onEdit={this.toEdit} onView={this.toView} onDel={this.toDel} /> <table-buttons noAdd row={row} onEdit={this.toEdit} onView={this.toView} onDel={this.toDel} />
) )
}, },
}, },
......
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data='tableData' :config='tableConfig' /> <LayoutTable :data='tableData' :config='tableConfig' />
<dialog-show ref="dialogform" @ok="getData" />
</div> </div>
</template> </template>
<script> <script>
import table from '@/assets/mixins/table'; import table from '@/assets/mixins/table';
import dialogShow from "./dialogshow";
export default { export default {
mixins: [table], mixins: [table],
components: {
dialogShow,
},
methods: {
/** 重写新增方法 */
toAdd(row) {
this.$refs.dialogform.add(row);
},
/** 重写编辑方法 */
toEdit(row) {
console.log(22222222)
this.$refs.dialogform.edit(row);
},
/** 重写查看方法 */
toView(row) {
this.$refs.dialogform.view(row);
},
},
data() { data() {
return { return {
config: { config: {
...@@ -29,11 +48,10 @@ export default { ...@@ -29,11 +48,10 @@ export default {
type: 'selection', type: 'selection',
width: 60, width: 60,
}, },
{ // {
prop: 'id', // prop: 'id',
label: 'ID', // label: 'ID',
width: 60, // },
},
{ {
prop: 'name', prop: 'name',
label: '名称', label: '名称',
...@@ -45,7 +63,7 @@ export default { ...@@ -45,7 +63,7 @@ export default {
{ {
prop: 'authType', prop: 'authType',
label: '认证类型', label: '认证类型',
width: 140,
formatter: this.formatter, formatter: this.formatter,
}, },
// { // {
...@@ -59,7 +77,7 @@ export default { ...@@ -59,7 +77,7 @@ export default {
witdh: 120, witdh: 120,
formatter: (row)=> { formatter: (row)=> {
return ( return (
<table-buttons row={row} onEdit={this.toEdit} onDel={this.toDel} /> <table-buttons noView row={row} onEdit={this.toEdit} onDel={this.toDel} />
) )
}, },
}, },
......
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data='tableData' :config='tableConfig' /> <LayoutTable :data="tableData" :config="tableConfig" />
<dialog-show ref="dialogform" @ok="getData" />
</div> </div>
</template> </template>
<script> <script>
import table from '@/assets/mixins/table'; import table from "@/assets/mixins/table";
import dialogShow from "./dialogshow";
export default { export default {
mixins: [table], mixins: [table],
components: { dialogShow },
methods: {
/** 重写新增方法 */
toAdd(row) {
this.$refs.dialogform.add(row);
},
/** 重写编辑方法 */
toEdit(row) {
this.$refs.dialogform.edit(row);
},
/** 重写查看方法 */
toView(row) {
this.$refs.dialogform.view(row);
},
},
data() { data() {
return { return {
config: { config: {
search: [ search: [
{ {
name: 'name', name: "name",
type: 'text', type: "text",
label: '任务名称', label: "任务名称",
}, },
{ {
name: 'status', name: "status",
type: 'select', type: "select",
label: '任务状态', label: "任务状态",
}, },
], ],
columns: [ columns: [
{ {
type: 'selection', type: "selection",
width: 60, width: 60,
align: "center",
}, },
{ {
prop: 'name', prop: "name",
label: '任务名称', label: "任务名称",
align: "center",
}, },
{ {
prop: 'excuteHost', prop: "excuteHost",
label: '执行主机', label: "执行主机",
align: "center",
}, },
{ {
prop: 'excuteContainer', prop: "excuteContainer",
label: '执行容器', label: "执行容器",
align: "center",
}, },
{ {
prop: 'excuteStrategy', prop: "excuteStrategy",
label: '执行策略', label: "执行策略",
formatter: this.formatter align: "center",
formatter: this.formatter,
}, },
{ {
prop: 'lastExcuteHost', prop: "lastExcuteHost",
label: '最后执行主机', label: "最后执行主机",
align: "center",
}, },
{ {
prop: 'lastExcuteTime', prop: "lastExcuteTime",
label: '最后执行时间', label: "最后执行时间",
formatter: this.formatterDate align: "center",
formatter: this.formatterDate,
}, },
{ {
prop: 'status', prop: "status",
label: '执行状态', align: "center",
formatter: this.formatter label: "执行状态",
formatter: this.formatter,
}, },
{ {
label: '操作', label: "操作",
align: "center",
width: 180, width: 180,
formatter: (row)=> { formatter: (row) => {
return ( return (
<table-buttons row={row} onEdit={this.toEdit} onDel={this.toDel} /> <table-buttons
) noView
row={row}
onEdit={this.toEdit}
onDel={this.toDel}
/>
);
}, },
}, },
], ],
}, },
} };
} },
} };
</script> </script>
...@@ -8,15 +8,8 @@ ...@@ -8,15 +8,8 @@
label-width='120px' label-width='120px'
ref="form" ref="form"
> >
<el-row> <el-row>
<Field label="任务名称" prop="name" v-model="form.name"/> <Field label="任务名称" prop="name" v-model="form.name"/>
<Field label="关键字" prop="taskKey" v-model="form.taskKey"/> <Field label="关键字" prop="taskKey" v-model="form.taskKey"/>
<Field label="执行服务" prop="excuteService" v-model="form.excuteService" :enumData='dict.excuteService' type='select' /> <Field label="执行服务" prop="excuteService" v-model="form.excuteService" :enumData='dict.excuteService' type='select' />
......
...@@ -2,94 +2,146 @@ ...@@ -2,94 +2,146 @@
<template> <template>
<layout-form> <layout-form>
<el-form <el-form
:model="form" :model="form"
:loading="loading" :loading="loading"
:rules="rules" :rules="rules"
size='small' size="small"
label-width='100px' label-width="100px"
ref="form" ref="form"
> >
<el-row> <el-row>
<Field label="登录名称" prop="loginName" v-model="form.loginName" /> <Field label="登录名称" prop="loginName" v-model="form.loginName" />
<Field label="登录密码" prop="loginPwd" v-model="form.loginPwd" v-if='pageInfo.type === "add"' /> <Field
label="登录密码"
prop="loginPwd"
v-model="form.loginPwd"
v-if="pageInfo.type === 'add'"
/>
<Field label="用户昵称" prop="realName" v-model="form.realName" /> <Field label="用户昵称" prop="realName" v-model="form.realName" />
<Field label="手机号码" prop="mobile" v-model="form.mobile" /> <Field label="手机号码" prop="mobile" v-model="form.mobile" />
<Field label="用户类型" prop="userType" v-model="form.userType" :enumData='dict.userType' type='select' /> <Field
<Field label="用户状态" prop="status" v-model="form.status" :enumData='dict.status' type='select' /> label="用户类型"
<el-col :span="12"> prop="userType"
<el-form-item label="归属站点" prop="siteId"> v-model="form.userType"
<treeselect v-model="form.siteId" :options="siteOptions" :show-count="true" placeholder="请选择归属站点" /> :enumData="dict.userType"
</el-form-item> type="select"
</el-col> />
<Field
label="用户状态"
prop="status"
v-model="form.status"
:enumData="dict.status"
type="select"
/>
</el-row> </el-row>
<form-buttons @submit='submitForm'/> <form-buttons @submit="submitForm" />
</el-form> </el-form>
</layout-form> </layout-form>
</template> </template>
<script> <script>
import form from '@/assets/mixins/form'; import form from "@/assets/mixins/form";
import Treeselect from "@riophae/vue-treeselect"; import Treeselect from "@riophae/vue-treeselect";
import "@riophae/vue-treeselect/dist/vue-treeselect.css"; import "@riophae/vue-treeselect/dist/vue-treeselect.css";
export default { export default {
mixins: [form], mixins: [form],
name: "User", name: "User",
components: { Treeselect }, components: { Treeselect },
created() { created() {},
//this.getList();
this.getTreeselect();
},
methods: { methods: {
/** 查询部门下拉树结构 */ /** 编辑 */
getTreeselect() { edit(row) {
this.reset();
this.loading = true; this.query = { id: row.id };
this.$post("/site/treeselect", {}) this.urls.currUrl = this.pageInfo.editUrl;
.then((res) => { this.getData();
if (res && res.code && res.code == 1) { this.pageInfo.type = "edit";
this.siteOptions = res.data.result; this.open = true;
this.loading = false; this.title = "修改设备";
} },
}) /** 新增 */
.catch((error) => { add(row) {
this.$message.error(error.message); this.reset();
}); this.query = { id: row.id };
this.urls.currUrl = this.pageInfo.addUrl;
this.getData();
this.pageInfo.type = "add";
this.open = true;
this.title = "新增设备";
},
/** 查看*/
view(row) {
this.reset();
this.query = { id: row.id };
this.urls.currUrl = this.pageInfo.viewUrl;
this.getData();
this.pageInfo.type = "view";
this.open = true;
this.title = "设备详细";
},
/**取消按钮 */
cancel() {
this.open = false;
},
afterSubmit(data) {
this.open = false;
this.$emit("ok");
},
// 表单重置
reset() {
this.resetForm("form");
}, },
}, },
data() { data() {
return { return {
toString: ['status', 'userType'], toString: ["status", "userType"],
siteOptions:[], siteOptions: [],
rules: { rules: {
loginName: [ deviceName: [
{ required: true, message: '请输入登录名称', trigger: 'blur' }, {
required: true,
message: "请输入设备名称",
trigger: "blur",
},
{ max: 20, message: "最多只能录入20个字符", trigger: "blur" },
],
deviceType: [
{ required: true, message: "请选择设备类型", trigger: "change" },
], ],
loginPwd: [ deviceMac: [
{ required: true, message: '请输入登录密码', trigger: 'blur' }, { required: true, message: "请输入Mac地址", trigger: "blur" },
], ],
realName: [ deviceFirmId: [
{ required: true, message: '请输入用户昵称', trigger: 'blur' }, { required: true, message: "请选择设备生产商", trigger: "change" },
], ],
mobile: [
{ required: true, message: '请输入手机号码', trigger: 'blur' }, deviceFirmname: [
{ required: true, validator: (rule, val, cb)=>{ {
if(!/^1[0-9]{10}$/.test(val)){ required: true,
return cb(new Error('手机号码格式不正确')) message: "请输入设备生产厂商名称关联mortals_xhx_stp_firm",
} trigger: "blur",
cb(); },
}, trigger: 'blur' }, { max: 20, message: "最多只能录入20个字符", trigger: "blur" },
],
deviceToRoomName: [
{
required: true,
message: "请输入设备所属房间名称",
trigger: "blur",
},
{ max: 128, message: "最多只能录入128个字符", trigger: "blur" },
], ],
userType: [ deviceOnlineStatus: [
{ required: true, message: '请选择用户类型', trigger: 'blur' }, { required: true, message: "请选择在线状态 ", trigger: "change" },
], ],
status: [ deviceStatus: [
{ required: true, message: '请选择用户状态', trigger: 'blur' }, { required: true, message: "请选择启用状态 ", trigger: "change" },
], ],
createTime: [{ required: true, message: "请选择创建时间" }],
}, },
} };
} },
} };
</script> </script>
......
<template> <template>
<div class="page"> <div class="page">
<LayoutTable :data='tableData' :config='tableConfig' notAdd notDel/> <LayoutTable :data='tableData' :config='tableConfig' notAdd notDel />
</div> </div>
</template> </template>
......
...@@ -16,7 +16,6 @@ import org.springframework.context.annotation.ImportResource; ...@@ -16,7 +16,6 @@ import org.springframework.context.annotation.ImportResource;
@ImportResource(locations = {"classpath:config/spring-config.xml"}) @ImportResource(locations = {"classpath:config/spring-config.xml"})
public class ManagerApplication extends BaseWebApplication { public class ManagerApplication extends BaseWebApplication {
@Bean @Bean
public ICacheService cacheService() { public ICacheService cacheService() {
return new LocalCacheServiceImpl(); return new LocalCacheServiceImpl();
......
...@@ -527,18 +527,18 @@ public class UserEntity extends UserEntityExt implements IUser { ...@@ -527,18 +527,18 @@ public class UserEntity extends UserEntityExt implements IUser {
} }
public void initAttrValue(){ public void initAttrValue(){
this.loginName = null; this.loginName = "";
this.loginPwd = null; this.loginPwd = "";
this.loginPwd1 = null; this.loginPwd1 = null;
this.loginPwd2 = null; this.loginPwd2 = null;
this.loginPwd3 = null; this.loginPwd3 = null;
this.loginLimitAddress = null; this.loginLimitAddress = null;
this.realName = null; this.realName = "";
this.mobile = null; this.mobile = "";
this.phone = null; this.phone = null;
this.email = null; this.email = null;
this.qq = null; this.qq = null;
this.userType = null; this.userType = 1;
this.siteId = null; this.siteId = null;
this.status = 1; this.status = 1;
this.customerId = null; this.customerId = null;
......
...@@ -4,11 +4,15 @@ import com.mortals.framework.model.BaseEntityLong; ...@@ -4,11 +4,15 @@ import com.mortals.framework.model.BaseEntityLong;
import lombok.Data; import lombok.Data;
/** /**
* * Description:User
* Description:User * date: 2021-9-26 16:11:48
* date: 2021-9-26 16:11:48 */
*/
@Data @Data
public class UserEntityExt extends BaseEntityLong { public class UserEntityExt extends BaseEntityLong {
private String siteName; private String siteName;
private String roleIds;
private String roleNames;
} }
\ No newline at end of file
...@@ -20,14 +20,6 @@ public final class Constant { ...@@ -20,14 +20,6 @@ public final class Constant {
/** 基础代码版本 Z-BASE.MANAGER-S1.0.0 */ /** 基础代码版本 Z-BASE.MANAGER-S1.0.0 */
public final static String BASEMANAGER_VERSION = "Z-BASE.MANAGER-S1.0.0"; public final static String BASEMANAGER_VERSION = "Z-BASE.MANAGER-S1.0.0";
public final static String Param_materialProperty = "materialProperty"; public final static String PARAM_SERVER_HTTP_URL = "server_http_url";
public final static String Param_materialType = "materialType";
public final static String Param_isMust = "isMust";
public final static String Param_electronicgs = "electronicgs";
public final static String Param_materialSource = "materialSource";
public final static String Param_paperGg = "paperGg";
public final static String Param_jianmMs = "jianmMs";
public final static String Param_sealWay = "sealWay";
public final static String Param_typeOptions = "typeOptions";
} }
...@@ -6,19 +6,6 @@ import org.springframework.stereotype.Component; ...@@ -6,19 +6,6 @@ import org.springframework.stereotype.Component;
import com.mortals.framework.springcloud.service.IApplicationService; import com.mortals.framework.springcloud.service.IApplicationService;
/**
* 应用级服务,在应用启动、停止过程中调用
*
* 缺陷:类加载完成后就调用,会由于某些组件还未初始化而导致服务异常,
* 比如Kafka的连接以及订阅初始化比较靠后,在服务启动过程中就调用操作kafka相关API,将导致失败
* 比如开启Socket监听端口,可能端口都接收到连接请求了,但数据库连接还未初始化完成,导致请求处理失败
* 比如定时任务,任务执行时,相关缓存还未初始化,导致处理失败
*
* 应用场景:
* 1、无依赖其它模块或框架的数据初始化等操作
* @author GM
* @date 2020年7月15日
*/
@Component @Component
public class DemoStartService implements IApplicationService { public class DemoStartService implements IApplicationService {
......
package com.mortals.xhx.daemon.applicationservice; package com.mortals.xhx.daemon.applicationservice;
import com.mortals.framework.springcloud.config.web.BaseWebMvcConfigurer; import com.mortals.framework.springcloud.service.IApplicationStartedService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.mortals.framework.springcloud.service.IApplicationStartedService;
/**
* 应用级服务,在应用启动后、停止过程中调用
* 应用已经完成启动完成,才调用该服务
* 应用场景:
* 1、应用任务,应用启动后定时或间隔执行的任务
* 2、Socket服务端
* @author GM
* @date 2020年7月15日
*/
@Component @Component
//@ConditionalOnProperty(name="com.mortal",prefix = "",havingValue = "xxx") //@ConditionalOnProperty(name="com.mortal",prefix = "",havingValue = "xxx")
public class DemoStartedService implements IApplicationStartedService { public class DemoStartedService implements IApplicationStartedService {
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment