Commit 6b8fd9bc authored by 赵啸非's avatar 赵啸非

修改配置文件

parent d77ca5b1
......@@ -193,13 +193,9 @@
<artifactId>poi-tl</artifactId>
<version>1.12.0</version>
</dependency>
<!--
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>5.3.18</version>
<scope>provided</scope>
</dependency>-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
......@@ -217,7 +213,11 @@
<version>2.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>
......
package com.mortals.xhx.base.system.message.impl;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.mortals.framework.service.ICacheService;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.busiz.rsp.ApiResp;
import com.mortals.xhx.webflux.req.RobotTransReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.Delta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
......
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
/**
* @author karlhoo
*/
@Data
public class ApiResp<T> {
/**
* 结果编码
*/
private int code;
/**
* 结果描述
*/
private String msg;
/**
* 响应数据
*/
private T data;
}
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class AudioItem{
private String src;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class ButtonItem{
private String link;
private String text;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class CustomItem{
private String type;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import java.util.List;
import lombok.Data;
@Data
public class Feed{
private List<ButtonItem> button;
private List<ImageItem> image;
private List<RelateItem> relate;
private String confidence;
private List<CustomItem> custom;
private String source;
private List<VideoItem> video;
private List<AudioItem> audio;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class ImageItem{
private String src;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class NlpItem{
private Feed feed;
private Slots slots;
private String agent;
private String domain;
private String action;
private String source;
private String intent;
private String englishDomain;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class RelateItem{
private String score;
private String src;
private String text;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import java.util.List;
import lombok.Data;
@Data
public class RobotTransResp{
private List<NlpItem> nlp;
private String query;
private String sn;
private int status;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class Slots{
private String answer;
}
\ No newline at end of file
package com.mortals.xhx.busiz.rsp;
import lombok.Data;
@Data
public class VideoItem{
private String src;
}
\ No newline at end of file
package com.mortals.xhx.busiz.web;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mortals.framework.annotation.UnAuth;
import com.mortals.framework.exception.AppException;
import com.mortals.framework.service.ILogService;
import com.mortals.framework.service.impl.FileLogServiceImpl;
import com.mortals.xhx.base.system.message.MessageService;
import com.mortals.xhx.base.system.upload.service.UploadService;
import com.mortals.xhx.common.utils.IatModelMulMain;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okio.BufferedSource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@Slf4j
@RequestMapping("api")
@RequestMapping("audio")
public class ApiSendMsgController {
@Autowired
protected ILogService logService = FileLogServiceImpl.getInstance();
@Autowired
private MessageService messageService;
private UploadService uploadService;
private String hostUrl = "https://iat.cn-huabei-1.xf-yun.com/v1"; // 注意多语种识别,也支持中文音频
private String appid = "3cc52607"; //在控制台-我的应用获取
private String apiSecret = "ZTdmMjFjMGYxYmJhN2VmYjFlMTg3N2Rk"; // 在控制台-我的应用获取
private String apiKey = "d0f73d44e996c2da9924c4476c578a30"; // 在控制台-我的应用获取
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamEvents() {
......@@ -40,156 +47,100 @@ public class ApiSendMsgController {
.map(i -> "Event " + i);
}
@GetMapping("/emitter")
@UnAuth
public SseEmitter test() throws IOException, InterruptedException {
// filters.forEach(filter -> System.out.println("Loaded filter: " + filter.getClass().getName()));
//每3s向前端推送消息
SseEmitter emitter = new SseEmitter(30000L);
for (int i = 0; i < 50; i++) {
log.info("num:{}", i);
emitter.send("后端推送信息" + i);
@GetMapping(value = "/events/create", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamEventsCreate() {
/* return Flux.push(sink -> {
new Thread(() -> {
try {
for (int i = 1; i <= 1; i++) {
sink.next("Message " + i);
Thread.sleep(1000);
}
emitter.complete();
return emitter;
}
/* *//**
* 接收机器人请求数据
*
* @param req
* @return
*//*
@PostMapping(value = "trans", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@UnAuth
public SseEmitter trans(@RequestBody RobotTransReq req) {
log.info("【接收机器人请求数据】【请求体】--> " + JSONObject.toJSONString(req));
//todo 业务处理 转发请求
RobotTransResp rsp = new RobotTransResp();
// 超时时间设置为30s,
SseEmitter sseEmitter = new SseEmitter(30000L);
messageService.sendMessageToFastApi(sseEmitter, req);
return sseEmitter;
// return messageService.sendMessageToFastApi(sseEmitter, req);
// 设置前端的重试时间为1s
*//* sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.onTimeout(() -> {
System.out.println(id + "超时");
sseCache.remove(id);
});
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));*//*
// log.info("响应【接收机器人请求数据】【响应体】--> " + JSONObject.toJSONString(rsp));
// return JSON.toJSONString(rsp);
sink.complete();
} catch (InterruptedException e) {
sink.error(e);
}
*/
}).start();
});*/
public static void main(String[] args) {
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build();
/* return Flux.create(sink -> {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> {
sink.next("Message " + System.currentTimeMillis());
}, 0, 1, TimeUnit.SECONDS);
});*/
// 封装请求头
Headers headers = new Headers.Builder()
.set("Content-Type", "application/json")
.set("Accept", "text/event-stream")
.build();
Request request = new Request.Builder()
.url("http://localhost:18006/api/emitter")
.headers(headers)
.get()
.build();
return Flux.create(sink -> {
for (int i = 1; i <= 10; i++) {
new Thread(() -> { // 需要异步执行,否则会阻塞
sink.next("Message ");
// 每秒发送一条数据
Call call = client.newCall(request);
}).start();
// 4. 监听回调
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
log.error("进入 onFailure方法 {}", e.getMessage(), e);
// sseEmitter.completeWithError(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if (response.isSuccessful()) {
String chunkMessage = "";
BufferedSource source = response.body().source();
while (!source.exhausted()) {
chunkMessage = source.readUtf8Line();
if (StringUtils.isBlank(chunkMessage)) {
continue;
}
/* JSONObject jsonObject = JSONObject.parseObject(chunkMessage);
if (null != jsonObject && null != jsonObject.getJSONObject("data")) {
String answer = jsonObject.getJSONObject("data").getString("answer");
// sseEmitter.send(answer);
}*/
sink.complete(); // 结束流
});
log.info("chunkMessage:{}", chunkMessage);
//return null;
}
} else {
// log.error("onResponse 方法请求失败 {}", response.message());
// TODO 重新发起请求
@GetMapping("/emitter")
@UnAuth
public SseEmitter test() throws IOException, InterruptedException {
// filters.forEach(filter -> System.out.println("Loaded filter: " + filter.getClass().getName()));
//每3s向前端推送消息
SseEmitter emitter = new SseEmitter(30000L);
for (int i = 0; i < 50; i++) {
log.info("num:{}", i);
emitter.send("后端推送信息" + i);
Thread.sleep(1000);
}
emitter.complete();
return emitter;
}
});
@RequestMapping(value = "upload")
public String doFileUpload(MultipartFile file, @RequestParam(value = "prePath", defaultValue = "") String prePath) {
Map<String, Object> model = new HashMap<>();
String jsonStr = "";
try {
log.info("22222222222222222");
String filePath = uploadService.saveFileUpload(file, prePath, null);
//将音频文件传给ai大模型
SseEmitter sseEmitter = new SseEmitter(30000L);
// IatModelMulMain.getAuthUrl()
WebClient webClient = WebClient.builder().baseUrl("http://localhost:18006").build();
// model.put("url", filePath);
model.put("fileName", file.getOriginalFilename());
// model.put(KEY_RESULT_CODE, VALUE_RESULT_SUCCESS);
jsonStr = JSONObject.toJSONString(model);
} catch (AppException e) {
/* log.debug(e);
model.put(KEY_RESULT_CODE, VALUE_RESULT_FAILURE);
model.put(KEY_RESULT_MSG, e.getMessage());*/
jsonStr = JSONObject.toJSONString(model);
} catch (Exception e) {
/* // 封装请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
//headers.set(authHead, authValue);
// headers.setBasicAuth("Authorization","fastgpt-jZqZqjWQOic82RbTFdzAu94ddX0YL0ZWfBUtYAcVtewIkXe0yUfH");
// accept 一定要设置为 TEXT_EVENT_STREAM
headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM));
Flux<String> eventStream = webClient
.get()
.uri("/api/emitter")
.accept(MediaType.valueOf("text/event-stream;charset=UTF-8")) // 一定要设置
.retrieve()
.bodyToFlux(String.class);
// eventStream.flatMap(data -> {}).collect()
eventStream.subscribe(
data -> { // data 有什么key,value。具体看你对接agent的文档
log.info("data:{}", data);
},
error -> {
sseEmitter.completeWithError(error);
log.error("报错了:{}", error.getMessage(), error);
},
sseEmitter::complete
);*/
/* model.put(KEY_RESULT_CODE, VALUE_RESULT_FAILURE);
model.put(KEY_RESULT_MSG, "文件上传失败");*/
jsonStr = JSONObject.toJSONString(model);
}
return jsonStr;
}
}
package com.mortals.xhx.common.utils;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* 大模型语音识别---方言识别
*/
@Slf4j
public class IatModelMulMain extends WebSocketListener {
private static final String hostUrl = "https://iat.cn-huabei-1.xf-yun.com/v1"; // 注意多语种识别,也支持中文音频
private static final String appid = "3cc52607"; //在控制台-我的应用获取
private static final String apiSecret = "ZTdmMjFjMGYxYmJhN2VmYjFlMTg3N2Rk"; // 在控制台-我的应用获取
private static final String apiKey = "d0f73d44e996c2da9924c4476c578a30"; // 在控制台-我的应用获取
private static final String file = "E://test1.mp3"; // 识别音频位置
public static final int StatusFirstFrame = 0;
public static final int StatusContinueFrame = 1;
public static final int StatusLastFrame = 2;
public static final Gson gson = new Gson();
// 开始时间
private static Date dateBegin = new Date();
// 结束时间
private static Date dateEnd = new Date();
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyy-MM-dd HH:mm:ss.SSS");
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
new Thread(() -> {
//连接成功,开始发送数据
int frameSize = 1280; //每一帧音频的大小,建议每 40ms 发送 122B
int intervel = 40;
int status = 0; // 音频的状态
int seq = 0; //数据序号
try (FileInputStream fs = new FileInputStream(file)) {
byte[] buffer = new byte[frameSize];
// 发送音频
end:
while (true) {
seq++; // 每次循环更新下seq
int len = fs.read(buffer);
if (len == -1) {
status = StatusLastFrame; //文件读完,改变status 为 2
}
switch (status) {
case StatusFirstFrame: // 第一帧音频status = 0
String json = "{\n" +
" \"header\": {\n" +
" \"app_id\": \"" + appid + "\",\n" +
" \"status\": " + StatusFirstFrame + "\n" +
" },\n" +
" \"parameter\": {\n" +
" \"iat\": {\n" +
" \"domain\": \"slm\",\n" +
" \"language\": \"zh_cn\",\n" +
" \"accent\": \"mulacc\",\n" +
" \"eos\": 6000,\n" +
" \"vinfo\": 1,\n" +
" \"result\": {\n" +
" \"encoding\": \"utf8\",\n" +
" \"compress\": \"raw\",\n" +
" \"format\": \"json\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"payload\": {\n" +
" \"audio\": {\n" +
" \"encoding\": \"raw\",\n" +
" \"sample_rate\": 16000,\n" +
" \"channels\": 1,\n" +
" \"bit_depth\": 16,\n" +
" \"seq\": " + seq + ",\n" +
" \"status\": 0,\n" +
" \"audio\": \"" + Base64.getEncoder().encodeToString(Arrays.copyOf(buffer, len)) + "\"\n" +
" }\n" +
" }\n" +
"}";
webSocket.send(json);
System.err.println(json);
System.out.println("第一帧音频发送完毕...");
System.out.println("中间音频将持续发送...");
status = StatusContinueFrame; // 发送完第一帧改变status 为 1
break;
case StatusContinueFrame: //中间帧status = 1
json = "{\n" +
" \"header\": {\n" +
" \"app_id\": \"" + appid + "\",\n" +
" \"status\": 1\n" +
" },\n" +
" \"payload\": {\n" +
" \"audio\": {\n" +
" \"encoding\": \"raw\",\n" +
" \"sample_rate\": 16000,\n" +
" \"channels\": 1,\n" +
" \"bit_depth\": 16,\n" +
" \"seq\": " + seq + ",\n" +
" \"status\": 1,\n" +
" \"audio\": \"" + Base64.getEncoder().encodeToString(Arrays.copyOf(buffer, len)) + "\"\n" +
" }\n" +
" }\n" +
"}";
webSocket.send(json);
// System.err.println(json);
System.out.println("中间帧音频发送中..."+seq);
break;
case StatusLastFrame: // 最后一帧音频status = 2 ,标志音频发送结束
json = "{\n" +
" \"header\": {\n" +
" \"app_id\": \"" + appid + "\",\n" +
" \"status\": 2\n" +
" },\n" +
" \"payload\": {\n" +
" \"audio\": {\n" +
" \"encoding\": \"raw\",\n" +
" \"sample_rate\": 16000,\n" +
" \"channels\": 1,\n" +
" \"bit_depth\": 16,\n" +
" \"seq\": " + seq + ",\n" +
" \"status\": 2,\n" +
" \"audio\": \"\"\n" +
" }\n" +
" }\n" +
"}";
webSocket.send(json);
// System.err.println(json);
System.out.println("最后一帧音频发送完毕...");
break end;
}
Thread.sleep(intervel); //模拟音频采样延时
}
System.out.println("所有音频发送完毕...");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
System.out.println(text);
JsonParse jsonParse = gson.fromJson(text, JsonParse.class);
if (jsonParse != null) {
if (jsonParse.header.code != 0) {
System.out.println("code=>" + jsonParse.header.code + " error=>" + jsonParse.header.message + " sid=" + jsonParse.header.sid);
System.out.println("错误码查询链接:https://www.xfyun.cn/document/error-code");
return;
}
if (jsonParse.payload != null) {
if (jsonParse.payload.result.text != null) { // 中间结果
byte[] decodedBytes = Base64.getDecoder().decode(jsonParse.payload.result.text);
String decodeRes = new String(decodedBytes, StandardCharsets.UTF_8);
// System.out.println("中间识别结果 ==》" + decodeRes);
JsonParseText jsonParseText = gson.fromJson(decodeRes, JsonParseText.class);
List<Ws> wsList = jsonParseText.ws;
System.out.print("中间识别结果==》");
for (Ws ws : wsList) {
List<Cw> cwList = ws.cw;
for (Cw cw : cwList) {
System.out.print(cw.w);
}
}
System.out.println();
}
if (jsonParse.payload.result.status == 2) { // 最终结果 说明数据全部返回完毕,可以关闭连接,释放资源
System.out.println("session end ");
dateEnd = new Date();
System.out.println(sdf.format(dateBegin) + "开始");
System.out.println(sdf.format(dateEnd) + "结束");
System.out.println("耗时:" + (dateEnd.getTime() - dateBegin.getTime()) + "ms");
// System.out.println("最终识别结果 ==》" + decodeRes); // 按照规则替换与追加出最终识别结果
System.out.println();
System.out.println("本次识别sid ==》" + jsonParse.header.sid);
webSocket.close(1000, "");
}
}
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
super.onFailure(webSocket, t, response);
try {
if (null != response) {
int code = response.code();
System.out.println("onFailure code:" + code);
System.out.println("onFailure body:" + response.body().string());
if (101 != code) {
System.out.println("connection failed");
System.exit(0);
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
// 构建鉴权url
String authUrl = getAuthUrl(hostUrl, apiKey, apiSecret);
log.info("authUrl==>" + authUrl);
OkHttpClient client = new OkHttpClient.Builder().build();
//将url中的 schema http://和https://分别替换为ws:// 和 wss://
String url = authUrl.toString().replace("http://", "ws://").replace("https://", "wss://");
//System.out.println(url);
Request request = new Request.Builder().url(url).build();
// System.out.println(client.newCall(request).execute());
//System.out.println("url===>" + url);
WebSocket webSocket = client.newWebSocket(request, new IatModelMulMain());
}
public static String getAuthUrl(String hostUrl, String apiKey, String apiSecret) throws Exception {
URL url = new URL(hostUrl);
SimpleDateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);
format.setTimeZone(TimeZone.getTimeZone("GMT"));
String date = format.format(new Date());
StringBuilder builder = new StringBuilder("host: ").append(url.getHost()).append("\n").//
append("date: ").append(date).append("\n").//
append("GET ").append(url.getPath()).append(" HTTP/1.1");
// System.out.println(builder);
Charset charset = Charset.forName("UTF-8");
Mac mac = Mac.getInstance("hmacsha256");
SecretKeySpec spec = new SecretKeySpec(apiSecret.getBytes(charset), "hmacsha256");
mac.init(spec);
byte[] hexDigits = mac.doFinal(builder.toString().getBytes(charset));
String sha = Base64.getEncoder().encodeToString(hexDigits);
//System.out.println(sha);
String authorization = String.format("api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"", apiKey, "hmac-sha256", "host date request-line", sha);
//System.out.println(authorization);
HttpUrl httpUrl = HttpUrl.parse("https://" + url.getHost() + url.getPath()).newBuilder().//
addQueryParameter("authorization", Base64.getEncoder().encodeToString(authorization.getBytes(charset))).//
addQueryParameter("date", date).//
addQueryParameter("host", url.getHost()).//
build();
return httpUrl.toString();
}
// 返回结果拆分与展示,仅供参考
// 返回结果拆分与展示,仅供参考
class JsonParse {
Header header;
Payload payload;
}
class Header {
int code;
String message;
String sid;
int status;
}
class Payload {
Result result;
}
class Result {
String text;
int status;
}
class JsonParseText {
List<Ws> ws;
String pgs;
List<Integer> rg;
}
class Ws {
List<Cw> cw;
}
class Cw {
String w;
}
}
package com.mortals.xhx.thread;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mortals.xhx.webflux.factory.FlowChatContext;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
/**
* 机器人响应消息
*
* @author: zxfei
* @date: 2025/3/19 20:36
*/
@AllArgsConstructor
@Slf4j
public class ChatMsgRspThread implements Runnable {
private FlowChatContext context;
/**
* 最大重试次数
*/
public final Integer MAX_RETRY_COUNT = 10;
@Override
public void run() {
log.info("快速响应线程开始,msgId:{}", context.getMsgId());
StringBuilder answer = context.getAnswer();
int reCount = 0;
while (reCount < MAX_RETRY_COUNT) {
if (!answer.toString().isEmpty()) {
log.info("快速响应线程结束 answer:{}", answer);
break;
}
//创建一个初始化的answer
JSONObject rspJson = new JSONObject();
rspJson.put("id", context.getMsgId());
rspJson.put("created", new Date().getTime());
rspJson.put("model", "");
JSONArray jsonArray = new JSONArray();
JSONObject obj = new JSONObject();
JSONObject delta = new JSONObject();
delta.put("role", "");
if(reCount == 0){
delta.put("role", "assistant");
}
delta.put("content", "");
obj.put("delta", delta);
obj.put("index", 0);
obj.put("finish_reason", "null");
jsonArray.add(obj);
rspJson.put("choices", jsonArray);
rspJson.put("object", "");
context.getEmitter().next(rspJson.toJSONString());
reCount++;
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
}
}
}
}
......@@ -56,12 +56,40 @@ public class FlowChatSubscriber implements Subscriber<String>, Disposable {
@Override
public void onNext(String data) {
String rsp = "";
try {
rsp = callBack.onNext(data, context);
if (ObjectUtils.isEmpty(rsp)) {
this.stopFlag = true;
}
// log.info(" onNext rsp:{}", rsp);
} catch (Exception e) {
log.error("流式问答异常:{}", e.getMessage());
} finally {
// todo 临时打印日志
// log.info("=============== data: {}", data);
if (stopFlag) {
log.info("stopFlag is true");
subscription.cancel();
onComplete();
} else {
// emitter.(msg);
// 将数据发送给前端 如果是起始包 需要要处理一下
emitter.next(rsp);
// 继续请求接收下一个数据项
subscription.request(1);
}
}
/* new Thread(() -> { // 需要异步执行,否则会阻塞
String rsp = "";
try {
data = callBack.onNext(data, context);
if(ObjectUtils.isEmpty(data)){
this.stopFlag=true;
rsp = callBack.onNext(data, context);
if (ObjectUtils.isEmpty(rsp)) {
this.stopFlag = true;
}
log.info(" onNext data:{}", data);
log.info(" onNext rsp:{}", rsp);
} catch (Exception e) {
log.error("流式问答异常:{}", e.getMessage());
} finally {
......@@ -72,12 +100,17 @@ public class FlowChatSubscriber implements Subscriber<String>, Disposable {
subscription.cancel();
onComplete();
} else {
// emitter.(msg);
// 将数据发送给前端 如果是起始包 需要要处理一下
emitter.next(data);
emitter.next(rsp);
// 继续请求接收下一个数据项
subscription.request(1);
}
}
}).start();*/
}
@Override
......
......@@ -18,6 +18,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.Date;
import java.util.HashMap;
......@@ -54,15 +55,14 @@ public class BaseChatService extends AbstractFlowChatTemplate {
if (StrUtil.equals("[DONE]", data)) {//[DONE]是消息结束标识
return null;
}
// 解析响应数据项
JSONObject bodyJson = JSONObject.parseObject(data);
if (bodyJson == null) {
return data;
}
// String id = bodyJson.getString("id");
if (num == 0) {
/*if(ObjectUtils.isEmpty(context.getAnswer())){
//创建一个初始化的answer
JSONObject rspJson = new JSONObject();
rspJson.put("id", context.getMsgId());
rspJson.put("created", new Date().getTime());
......@@ -83,17 +83,22 @@ public class BaseChatService extends AbstractFlowChatTemplate {
rspJson.put("choices", jsonArray);
rspJson.put("object", "");
num++;
context.getAnswer().append(".");
context.getEmitter().next(rspJson.toJSONString());
}
/* String status = bodyJson.getString("status");
if ("running".equals(status)) {
//计数器重置
//this.num=0;
//构建第一个数据响应包
*/
// context.getAnswer().append(answer);
// String id = bodyJson.getString("id");
/* if (num == 0) {
JSONObject rspJson = new JSONObject();
rspJson.put("id", context.getMsgId());
rspJson.put("created", new Date().getTime());
rspJson.put("model", "");
JSONArray jsonArray = new JSONArray();
JSONObject obj = new JSONObject();
......@@ -102,9 +107,15 @@ public class BaseChatService extends AbstractFlowChatTemplate {
delta.put("role", "assistant");
delta.put("content", "");
obj.put("delta", delta);
obj.put("index", 0);
obj.put("finish_reason", "null");
jsonArray.add(obj);
rspJson.put("delta", jsonArray);
return rspJson.toJSONString();
rspJson.put("choices", jsonArray);
rspJson.put("object", "");
num++;
context.getEmitter().next(rspJson.toJSONString());
}*/
JSONArray choices = bodyJson.getJSONArray("choices");
......
package com.mortals.xhx.webflux.template;
import com.mortals.framework.util.ThreadPool;
import com.mortals.xhx.thread.ChatMsgRspThread;
import com.mortals.xhx.webflux.common.CommonError;
import com.mortals.xhx.webflux.common.JsonUtils;
import com.mortals.xhx.webflux.factory.*;
......@@ -57,6 +59,11 @@ public abstract class AbstractFlowChatTemplate implements IFlowChat, FlowChatCal
log.info("subscriberMap in AbstractChatService after put: " + JsonUtils.toJson(subscriberMap));
response.subscribe(subscriber);
emitter.onDispose(subscriber);
ChatMsgRspThread chatMsgRspThread = new ChatMsgRspThread(context);
ThreadPool.getInstance().execute(chatMsgRspThread);
});
}
......@@ -104,7 +111,7 @@ public abstract class AbstractFlowChatTemplate implements IFlowChat, FlowChatCal
*/
private Flux<String> doRequest(FlowChatContext context, FLowChatRequest request, HttpHeaders headers) {
log.info("请求大模型开始,URL:{}, 参数:{}", request.getUrl(), request.getJsonBody());
return webClient.post()
Flux<String> flux = webClient.post()
.uri(request.getUrl())
.accept(MediaType.TEXT_EVENT_STREAM)
.headers(httpHeaders -> httpHeaders.addAll(headers))
......@@ -121,6 +128,11 @@ public abstract class AbstractFlowChatTemplate implements IFlowChat, FlowChatCal
log.error("系统异常", ex);
return Flux.just(JsonUtils.toJson(buildAnswer(CommonError.GLOBAL_ERROR.getMsg())));
});
//如果一定时间未收到响应,则模拟响应数据回写
return flux;
}
/**
......
......@@ -32,7 +32,7 @@ public class TestRot {
" \"messages\": [\n" +
" {\n" +
" \"role\": \"user\",\n" +
" \"content\": \"今天天气怎么样?\"\n" +
" \"content\": \"你有什么能力?\"\n" +
" }\n" +
" ],\n" +
" \"max_tokens\": 2048,\n" +
......@@ -51,7 +51,7 @@ public class TestRot {
Request request = new Request.Builder()
.url("http://localhost:18006/chat/base")
// .url("http://robot.scsmile.cn/chat/base")
//.url("http://robot.scsmile.cn/chat/base")
.headers(headers)
.post(requestBody)
.build();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment