Commit 4c53c2a3 authored by alex yao's avatar alex yao

feat: SSE客户端断开感知

parent 95bba9cd
...@@ -17,6 +17,7 @@ import cn.com.poc.common.constant.CommonConstant; ...@@ -17,6 +17,7 @@ import cn.com.poc.common.constant.CommonConstant;
import cn.com.poc.common.utils.Assert; import cn.com.poc.common.utils.Assert;
import cn.com.poc.common.utils.BlContext; import cn.com.poc.common.utils.BlContext;
import cn.com.poc.common.utils.JsonUtils; import cn.com.poc.common.utils.JsonUtils;
import cn.com.poc.common.utils.SSEUtil;
import cn.com.poc.knowledge.constant.KnowledgeConstant; import cn.com.poc.knowledge.constant.KnowledgeConstant;
import cn.com.poc.knowledge.entity.BizKnowledgeDocumentEntity; import cn.com.poc.knowledge.entity.BizKnowledgeDocumentEntity;
import cn.com.poc.knowledge.query.KnowledgeDocumentRelationQueryItem; import cn.com.poc.knowledge.query.KnowledgeDocumentRelationQueryItem;
...@@ -565,25 +566,28 @@ public class AgentApplicationInfoServiceImpl implements AgentApplicationInfoServ ...@@ -565,25 +566,28 @@ public class AgentApplicationInfoServiceImpl implements AgentApplicationInfoServ
*/ */
private AgentResultEntity llmExecutorAndOutput(Float topP, boolean stream, String model, Message[] messageArray, FunctionResult functionResult, List<KnowledgeContentResult> knowledgeResult, HttpServletResponse httpServletResponse) throws Exception { private AgentResultEntity llmExecutorAndOutput(Float topP, boolean stream, String model, Message[] messageArray, FunctionResult functionResult, List<KnowledgeContentResult> knowledgeResult, HttpServletResponse httpServletResponse) throws Exception {
if (stream) { if (stream) {
httpServletResponse.setContentType(TEXT_EVENT_STREAM_CHARSET_UTF_8); SSEUtil sseUtil = new SSEUtil(httpServletResponse);
PrintWriter writer = httpServletResponse.getWriter(); // httpServletResponse.setContentType(TEXT_EVENT_STREAM_CHARSET_UTF_8);
// PrintWriter writer = httpServletResponse.getWriter();
if (CollectionUtils.isNotEmpty(knowledgeResult)) { if (CollectionUtils.isNotEmpty(knowledgeResult)) {
LargeModelDemandResult result = new LargeModelDemandResult(); LargeModelDemandResult result = new LargeModelDemandResult();
result.setCode("0"); result.setCode("0");
result.setKnowledgeContentResult(knowledgeResult); result.setKnowledgeContentResult(knowledgeResult);
writer.write(EVENT_STREAM_PREFIX + JsonUtils.serialize(result) + "\n\n"); sseUtil.send(JsonUtils.serialize(result));
writer.flush(); // writer.write(EVENT_STREAM_PREFIX + JsonUtils.serialize(result) + "\n\n");
// writer.flush();
} }
if (ObjectUtil.isNotNull(functionResult) && StringUtils.isNotBlank(functionResult.getFunctionName())) { if (ObjectUtil.isNotNull(functionResult) && StringUtils.isNotBlank(functionResult.getFunctionName())) {
LargeModelDemandResult result = new LargeModelDemandResult(); LargeModelDemandResult result = new LargeModelDemandResult();
result.setCode("0"); result.setCode("0");
ToolFunction toolFunction = functionResultConvertToolFunction(functionResult); ToolFunction toolFunction = functionResultConvertToolFunction(functionResult);
result.setFunction(toolFunction); result.setFunction(toolFunction);
writer.write(EVENT_STREAM_PREFIX + JsonUtils.serialize(result) + "\n\n"); sseUtil.send(JsonUtils.serialize(result));
writer.flush(); // writer.write(EVENT_STREAM_PREFIX + JsonUtils.serialize(result) + "\n\n");
// writer.flush();
} }
BufferedReader bufferedReader = invokeLLMStream(model, messageArray, topP); BufferedReader bufferedReader = invokeLLMStream(model, messageArray, topP);
return textOutputStream(httpServletResponse, bufferedReader); return textOutputStream(sseUtil, bufferedReader);
} else { } else {
LargeModelDemandResult largeModelDemandResult = invokeLLM(model, messageArray, topP); LargeModelDemandResult largeModelDemandResult = invokeLLM(model, messageArray, topP);
if (ObjectUtil.isNotNull(functionResult) && StringUtils.isNotBlank(functionResult.getFunctionName())) { if (ObjectUtil.isNotNull(functionResult) && StringUtils.isNotBlank(functionResult.getFunctionName())) {
...@@ -793,6 +797,54 @@ public class AgentApplicationInfoServiceImpl implements AgentApplicationInfoServ ...@@ -793,6 +797,54 @@ public class AgentApplicationInfoServiceImpl implements AgentApplicationInfoServ
return agentResultEntity; return agentResultEntity;
} }
/**
* 文本输出结果
*
* @param sseUtil
* @param bufferedReader
* @throws IOException
*/
private AgentResultEntity textOutputStream(SSEUtil sseUtil, BufferedReader bufferedReader) {
String res = "";
StringBuilder output = new StringBuilder();
StringBuilder reasoningContent = new StringBuilder();
try {
while ((res = bufferedReader.readLine()) != null) {
if (StringUtils.isEmpty(res)) {
continue;
}
res = StringUtils.replace(res, EVENT_STREAM_PREFIX, StringUtils.EMPTY);
LargeModelDemandResult result = JsonUtils.deSerialize(res, LargeModelDemandResult.class);
if (!"0".equals(result.getCode())) {
logger.error("LLM Error,code:{}", result.getCode());
I18nMessageException ex = new I18nMessageException("exception/call.failure");
result.setMessage(ex.getMessage());
sseUtil.send(JsonUtils.serialize(result));
sseUtil.send("[DONE]");
sseUtil.complete();
throw ex;
}
if (StringUtils.isNotEmpty(result.getMessage())) {
output.append(result.getMessage());
}
if (StringUtils.isNotEmpty(result.getReasoningContent())) {
reasoningContent.append(result.getReasoningContent());
}
sseUtil.send(res);
}
bufferedReader.close();
// 关闭资源
sseUtil.send("[DONE]");
sseUtil.complete();
} catch (IOException e) {
logger.error("连接断开,code:{}", e.getMessage());
} finally {
AgentResultEntity agentResultEntity = new AgentResultEntity();
agentResultEntity.setMessage(output.toString());
agentResultEntity.setReasoningContent(reasoningContent.toString());
return agentResultEntity;
}
}
/** /**
* 文本输出结果 * 文本输出结果
......
package cn.com.poc.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* @author alex.yao
* @date 2025/3/4
*/
public class SSEUtil {
private final Logger logger = LoggerFactory.getLogger(SSEUtil.class);
private final ServletOutputStream outputStream;
public SSEUtil(HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream;charset=UTF-8");
outputStream = response.getOutputStream();
}
public void send(String data) throws IOException {
String message = "data: " + data + "\n\n";
outputStream.write(message.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
}
public boolean complete() {
try {
outputStream.close();
} catch (IOException e) {
logger.error("无法关闭sse连接:{}", e.getMessage());
return false;
}
return true;
}
public boolean completeByError(String errorMsg) {
try {
String mess = "data: " + errorMsg + "\n\n";
outputStream.write(mess.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
outputStream.close();
} catch (IOException e) {
logger.error("无法关闭sse连接:{}", e.getMessage());
return false;
}
return true;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment