Commit f9e3fdbc authored by alex yao's avatar alex yao

feat:提交任务

parent 16ec52bd
package cn.com.poc.ai_data_audit.scheduler;
import cn.com.poc.ai_data_audit.constants.DataAuditConstants;
import cn.com.poc.ai_data_audit.entity.BizAiDataAuditTaskEntity;
import cn.com.poc.ai_data_audit.service.BizAiDataAuditTaskService;
import cn.com.poc.common.service.RedisService;
import cn.com.poc.message.entity.AirportTaskCreateMessage;
import cn.com.poc.message.service.AirportProducerService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* @author alex.yao
* @date 2025/10/30
*/
@Component
public class AiDataAuditScheduler {
@Resource
private RedisService redisService;
@Resource
private BizAiDataAuditTaskService bizAiDataAuditTaskService;
@Resource
private AirportProducerService airportProducerService;
@Scheduled(fixedDelay = 1000 * 30)
public void aiDataAudit() throws Exception {
BizAiDataAuditTaskEntity bizAiDataAuditTaskEntity = new BizAiDataAuditTaskEntity();
bizAiDataAuditTaskEntity.setStatus(DataAuditConstants.status.status_process);
bizAiDataAuditTaskEntity.setIsDeleted("N");
List<BizAiDataAuditTaskEntity> taskEntities = bizAiDataAuditTaskService.findByExample(bizAiDataAuditTaskEntity, null);
for (BizAiDataAuditTaskEntity taskEntity : taskEntities) {
Long taskId = taskEntity.getDataAuditTaskId();
if (redisService.hasKey("AI_DATA_AUDIT_TASK:" + taskId)) {
Object value = redisService.get("AI_DATA_AUDIT_TASK:" + taskId);
if ("Get".equals(value.toString())) {
AirportTaskCreateMessage airportTaskCreateMessage = new AirportTaskCreateMessage();
airportTaskCreateMessage.setTaskId(taskId);
airportProducerService.createTask(airportTaskCreateMessage);
}
}
}
}
}
......@@ -8,7 +8,10 @@ import cn.com.poc.ai_data_audit.entity.BizAiDataAuditFileEntity;
import cn.com.poc.ai_data_audit.service.BizAiDataAuditFileService;
import cn.com.poc.common.model.BizFileUploadRecordModel;
import cn.com.poc.common.service.BizFileUploadRecordService;
import cn.com.poc.common.service.RedisService;
import cn.com.poc.common.utils.BlContext;
import cn.com.poc.message.entity.AirportTaskCreateMessage;
import cn.com.poc.message.service.AirportProducerService;
import cn.com.yict.framemax.core.service.impl.BaseServiceImpl;
import cn.com.poc.ai_data_audit.service.BizAiDataAuditTaskService;
import cn.com.poc.ai_data_audit.model.BizAiDataAuditTaskModel;
......@@ -221,7 +224,7 @@ public class BizAiDataAuditTaskServiceImpl extends BaseServiceImpl
}
List<String> uploadFileUrlList = bizAiDataAuditFileEntityList.stream().map(item -> item.getUploadFileUrl()).collect(Collectors.toList());
if(CollectionUtils.isNotEmpty(uploadFileUrlList)){
if (CollectionUtils.isNotEmpty(uploadFileUrlList)) {
List<BizFileUploadRecordModel> byFileLink = bizFileUploadRecordService.findByFileLink(uploadFileUrlList);
Map<String, List<BizFileUploadRecordModel>> bizFileUploadRecordModelMap = byFileLink.stream().collect(Collectors.groupingBy(BizFileUploadRecordModel::getFileUrl));
......@@ -233,5 +236,22 @@ public class BizAiDataAuditTaskServiceImpl extends BaseServiceImpl
}
}
bizAiDataAuditFileService.batchSavedFile(bizAiDataAuditFileEntityList);
//任务推送
pushTask(savedModel.getDataAuditTaskId());
}
@Resource
private RedisService redisService;
@Resource
private AirportProducerService airportProducerService;
private void pushTask(Long taskId) {
redisService.set("AI_DATA_AUDIT_TASK:" + taskId, "Push");
AirportTaskCreateMessage airportTaskCreateMessage = new AirportTaskCreateMessage();
airportTaskCreateMessage.setTaskId(taskId);
airportProducerService.createTask(airportTaskCreateMessage);
}
}
\ No newline at end of file
package cn.com.poc.message.entity;
/**
* @author alex.yao
* @date 2025/10/30
*/
public class AirportTaskCreateMessage {
private Long taskId;
public Long getTaskId() {
return taskId;
}
public void setTaskId(Long taskId) {
this.taskId = taskId;
}
}
package cn.com.poc.message.service;
import cn.com.poc.message.entity.AirportTaskCreateMessage;
import cn.com.poc.message.entity.AirportPDFPaperMessage;
import cn.com.yict.framemax.core.service.BaseService;
......@@ -17,4 +18,11 @@ public interface AirportConsumerService extends BaseService {
*/
void pdfPaper(AirportPDFPaperMessage message) throws Exception;
/**
* 创建任务
*
* @param message
* @throws Exception
*/
void createTask(AirportTaskCreateMessage message) throws Exception;
}
package cn.com.poc.message.service;
import cn.com.poc.message.entity.AirportTaskCreateMessage;
import cn.com.poc.message.entity.AirportPDFPaperMessage;
import cn.com.yict.framemax.core.service.BaseService;
......@@ -17,4 +18,12 @@ public interface AirportProducerService extends BaseService {
*/
AirportPDFPaperMessage pdfPaper(AirportPDFPaperMessage message);
/**
* 创建任务
*
* @param message
* @return
*/
AirportTaskCreateMessage createTask(AirportTaskCreateMessage message);
}
......@@ -3,7 +3,6 @@ package cn.com.poc.message.service.impl;
import cn.com.gsst.dify_client.DifyChatClient;
import cn.com.gsst.dify_client.DifyClientFactory;
import cn.com.gsst.dify_client.enums.ResponseMode;
import cn.com.gsst.dify_client.exception.DifyApiException;
import cn.com.gsst.dify_client.model.chat.ChatMessage;
import cn.com.gsst.dify_client.model.chat.ChatMessageResponse;
import cn.com.poc.ai_data_audit.constants.DataAuditEnum;
......@@ -15,22 +14,28 @@ import cn.com.poc.ai_data_audit.query.AiDataAuditRuleQueryItem;
import cn.com.poc.ai_data_audit.service.BizAiDataAuditFileService;
import cn.com.poc.ai_data_audit.service.BizAiDataAuditRuleService;
import cn.com.poc.ai_data_audit.service.BizAiDataAuditTaskService;
import cn.com.poc.common.constant.CommonConstant;
import cn.com.poc.common.service.RedisService;
import cn.com.poc.common.utils.DocumentLoad;
import cn.com.poc.common.utils.StringUtils;
import cn.com.poc.message.entity.AirportTaskCreateMessage;
import cn.com.poc.message.entity.AirportPDFPaperMessage;
import cn.com.poc.message.entity.OCRMessage;
import cn.com.poc.message.service.AirportConsumerService;
import cn.com.poc.message.service.AirportProducerService;
import cn.com.poc.message.service.OCRProducerService;
import cn.com.poc.message.topic.AirportTopic;
import cn.com.poc.ocr.entity.BizFileOcrCacheEntity;
import cn.com.poc.ocr.service.BizFileOcrCacheService;
import cn.com.yict.framemax.core.exception.BusinessException;
import cn.com.yict.framemax.tumbleweed.client.annotation.Consumer;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
......@@ -51,9 +56,9 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 10L,
TimeUnit.SECONDS,
new java.util.concurrent.ArrayBlockingQueue<>(5),
new java.util.concurrent.ArrayBlockingQueue<>(20),
new ThreadFactoryBuilder().setNameFormat("pdf-paper-%d").build(),
new ThreadPoolExecutor.DiscardOldestPolicy());
new ThreadPoolExecutor.DiscardPolicy());
@Resource
private BizFileOcrCacheService bizFileOcrCacheService;
......@@ -67,6 +72,79 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
@Resource
private BizAiDataAuditTaskService bizAiDataAuditTaskService;
@Resource
private OCRProducerService ocrProducerService;
@Resource
private RedisService redisService;
@Resource
private AirportProducerService airportProducerService;
@Override
@Consumer(topic = AirportTopic.AIRPORT_CREATE_TOPIC)
public void createTask(AirportTaskCreateMessage message) throws Exception {
redisService.set("AI_DATA_AUDIT_TASK:" + message.getTaskId(), "Get");
BizAiDataAuditFileEntity bizAiDataAuditFileEntity = new BizAiDataAuditFileEntity();
bizAiDataAuditFileEntity.setTaskId(message.getTaskId());
bizAiDataAuditFileEntity.setIsDeleted(CommonConstant.IsDeleted.N);
List<BizAiDataAuditFileEntity> aiDataAuditFileEntities = bizAiDataAuditFileService.findByExample(bizAiDataAuditFileEntity, null);
boolean isAllComplete = true;
for (BizAiDataAuditFileEntity aiDataAuditFileEntity : aiDataAuditFileEntities) {
String md5 = aiDataAuditFileEntity.getMd5();
BizFileOcrCacheEntity bizFileOcrCacheEntity = new BizFileOcrCacheEntity();
bizFileOcrCacheEntity.setMd5(md5);
bizFileOcrCacheEntity.setIsDeleted(CommonConstant.IsDeleted.N);
List<BizFileOcrCacheEntity> bizFileOcrCacheEntities = bizFileOcrCacheService.findByExample(bizFileOcrCacheEntity, null);
if (CollectionUtils.isEmpty(bizFileOcrCacheEntities)) {
BizFileOcrCacheEntity saveFileOcrCacheEntity = new BizFileOcrCacheEntity();
saveFileOcrCacheEntity.setMd5(md5);
saveFileOcrCacheEntity.setParseStatus("create");
saveFileOcrCacheEntity.setFileUrl(aiDataAuditFileEntity.getUploadFileUrl());
BizFileOcrCacheEntity save = bizFileOcrCacheService.save(saveFileOcrCacheEntity);
//发送OCR 任务
OCRMessage ocrMessage = new OCRMessage();
ocrMessage.setId(save.getId());
ocrMessage.setMd5(md5);
ocrMessage.setFileURL(aiDataAuditFileEntity.getUploadFileUrl());
ocrProducerService.OCR(ocrMessage);
continue;
}
BizFileOcrCacheEntity fileOcrCacheEntity = bizFileOcrCacheEntities.get(0);
if (!fileOcrCacheEntity.getParseStatus().equals("complete")) {
isAllComplete = false;
}
if (fileOcrCacheEntity.getParseStatus().equals("fail")) {
//发送OCR 任务
OCRMessage ocrMessage = new OCRMessage();
ocrMessage.setId(fileOcrCacheEntity.getId());
ocrMessage.setMd5(md5);
ocrMessage.setFileURL(aiDataAuditFileEntity.getUploadFileUrl());
ocrProducerService.OCR(ocrMessage);
isAllComplete = false;
}
}
if (isAllComplete) { //OCR 已完成
redisService.set("AI_DATA_AUDIT_TASK:" + message.getTaskId(), "Finish");
//执行规则判断任务
AirportPDFPaperMessage pdfPaperMessage = new AirportPDFPaperMessage();
pdfPaperMessage.setTaskId(message.getTaskId());
airportProducerService.pdfPaper(pdfPaperMessage);
}
}
@Override
@Consumer(topic = AirportTopic.AIRPORT_PDF_PAPER_TOPIC)
public void pdfPaper(AirportPDFPaperMessage message) throws Exception {
......@@ -96,7 +174,6 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
//获取所有的规则
AiDataAuditRuleQueryCondition aiDataAuditRuleQueryCondition = new AiDataAuditRuleQueryCondition();
// aiDataAuditRuleQueryCondition.setIds(ruleIdList.toArray(new Long[ruleIdList.size()]));
List<AiDataAuditRuleQueryItem> aiDataAuditRuleQueryItems = bizAiDataAuditRuleService.aiDataAuditRuleQueryItemList(aiDataAuditRuleQueryCondition, null);
......@@ -115,7 +192,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
countDownLatch.countDown();
}
}
countDownLatch.await();
countDownLatch.await(10 * 60, TimeUnit.SECONDS);
//更新结果和任务状态
bizAiDataAuditTaskEntity.setAuditResult(results.toString());
......@@ -128,6 +205,58 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
// 字段对字段
private void c2c(List<BizAiDataAuditFileEntity> fileList, AiDataAuditRuleQueryItem aiDataAuditRuleQueryItem, List<AuditResultDto> results, CountDownLatch countDownLatch) {
class C2CResult {
private boolean need_judge;
private boolean comply;
private String risk_analysis;
private String risk_description;
private String modification_suggestions;
public boolean getNeed_judge() {
return need_judge;
}
public void setNeed_judge(boolean need_judge) {
this.need_judge = need_judge;
}
public boolean getComply() {
return comply;
}
public void setComply(boolean comply) {
this.comply = comply;
}
public String getRisk_analysis() {
return risk_analysis;
}
public void setRisk_analysis(String risk_analysis) {
this.risk_analysis = risk_analysis;
}
public String getRisk_description() {
return risk_description;
}
public void setRisk_description(String risk_description) {
this.risk_description = risk_description;
}
public String getModification_suggestions() {
return modification_suggestions;
}
public void setModification_suggestions(String modification_suggestions) {
this.modification_suggestions = modification_suggestions;
}
}
executor.submit(() -> {
try {
String ruleDesc = aiDataAuditRuleQueryItem.getRuleDesc();// 规则描述
......@@ -144,10 +273,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
// 找到文件 并下载对应的md 或 json 文件
BizAiDataAuditFileEntity firstAuditFileEntity = groupedByTemplateCode.get(firstFile).get(0);
BizFileOcrCacheEntity bizFileOcrCacheEntity = new BizFileOcrCacheEntity();
bizFileOcrCacheEntity.setMd5(firstAuditFileEntity.getMd5());
List<BizFileOcrCacheEntity> firstocrCacheEntities = bizFileOcrCacheService.findByExample(bizFileOcrCacheEntity, null);
BizFileOcrCacheEntity firstFileOcrCacheEntity = firstocrCacheEntities.get(0);
BizFileOcrCacheEntity firstFileOcrCacheEntity = bizFileOcrCacheService.findByMd5(firstAuditFileEntity.getMd5());
String firstMdFileUrl = firstFileOcrCacheEntity.getMdFileUrl();
String firstJsonFileUrl = firstFileOcrCacheEntity.getJsonFileUrl();
if (StringUtils.isNotBlank(firstJsonFileUrl)) {
......@@ -156,16 +282,15 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
}
BizAiDataAuditFileEntity secondAuditFileEntity = groupedByTemplateCode.get(secondFile).get(0);
bizFileOcrCacheEntity = new BizFileOcrCacheEntity();
bizFileOcrCacheEntity.setMd5(secondAuditFileEntity.getMd5());
List<BizFileOcrCacheEntity> secondOcrCacheEntities = bizFileOcrCacheService.findByExample(bizFileOcrCacheEntity, null);
BizFileOcrCacheEntity secondFileOcrCacheEntity = secondOcrCacheEntities.get(0);
String secondMdFileUrl = firstFileOcrCacheEntity.getMdFileUrl();
String secondJsonFileUrl = firstFileOcrCacheEntity.getJsonFileUrl();
BizFileOcrCacheEntity secondFileOcrCacheEntity = bizFileOcrCacheService.findByMd5(secondAuditFileEntity.getMd5());
;
String secondMdFileUrl = secondFileOcrCacheEntity.getMdFileUrl();
String secondJsonFileUrl = secondFileOcrCacheEntity.getJsonFileUrl();
if (StringUtils.isNotBlank(secondJsonFileUrl)) {
File file = DocumentLoad.downloadURLDocument(StringUtils.isBlank(secondJsonFileUrl) ? secondMdFileUrl : secondJsonFileUrl);
secondFileContent = DocumentLoad.documentToText(file);
}
// 执行工作流
Map<String, Object> inputs = new LinkedHashMap<>();
inputs.put("rule", ruleDesc);
......@@ -174,7 +299,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
inputs.put("filed_1", firstField);
inputs.put("filed_2", secondField);
// 创建聊天客户端
DifyChatClient chatClient = DifyClientFactory.createChatClient(DIFY_BASE_URL, "app-1MRun3ecfLOzFS96xcWYCOOX");
DifyChatClient chatClient = DifyClientFactory.createChatClient(DIFY_BASE_URL, "app-AVURkM0NaoE5gBt2wEo9cN8E");
// 创建聊天消息
ChatMessage message = ChatMessage.builder()
.query("run")
......@@ -187,19 +312,26 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
// 发送消息并获取响应
ChatMessageResponse response = chatClient.sendChatMessage(message);
String result = response.getAnswer();
C2CResult c2cResult = JSONObject.parseObject(result, C2CResult.class);
if (c2cResult == null) {
throw new BusinessException("c2c result is null");
}
if (!c2cResult.getNeed_judge() || c2cResult.getComply()) {
return;
}
AuditResultDto auditResultDto = new AuditResultDto();
auditResultDto.setRiskLevel(aiDataAuditRuleQueryItem.getRiskLevel());
auditResultDto.setRiskTitle(aiDataAuditRuleQueryItem.getTitle());
//TODO:
auditResultDto.setRiskAnalysis(result);
auditResultDto.setRiskDescription(result);
auditResultDto.setModificationSuggestions(result);
auditResultDto.setRiskAnalysis(c2cResult.getRisk_analysis());//分析
auditResultDto.setRiskDescription(c2cResult.getRisk_description());//风险描述
auditResultDto.setModificationSuggestions(c2cResult.getModification_suggestions());//建议
results.add(auditResultDto);
} catch (Exception e) {
throw new RuntimeException(e);
throw new BusinessException(e);
} finally {
countDownLatch.countDown();
}
......@@ -263,7 +395,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
// 判断文件文件1里面的字端是否等于某个值
private AuditResultDto doC2f(String firstFileContent, String firstField, String ruleDesc, AiDataAuditRuleQueryItem aiDataAuditRuleQueryItem) throws DifyApiException, IOException {
private AuditResultDto doC2f(String firstFileContent, String firstField, String ruleDesc, AiDataAuditRuleQueryItem aiDataAuditRuleQueryItem) throws Exception {
// 执行工作流
Map<String, Object> inputs = new LinkedHashMap<>();
inputs.put("rule", ruleDesc);
......
package cn.com.poc.message.service.impl;
import cn.com.poc.message.entity.AirportTaskCreateMessage;
import cn.com.poc.message.entity.AirportPDFPaperMessage;
import cn.com.poc.message.service.AirportProducerService;
import cn.com.poc.message.topic.AirportTopic;
......@@ -18,4 +19,10 @@ public class AirportProducerServiceImpl implements AirportProducerService {
public AirportPDFPaperMessage pdfPaper(AirportPDFPaperMessage message) {
return message;
}
@Override
@Producer(topic = AirportTopic.AIRPORT_CREATE_TOPIC)
public AirportTaskCreateMessage createTask(AirportTaskCreateMessage message) {
return message;
}
}
......@@ -67,15 +67,18 @@ public class OCRConsumerServiceImpl implements OCRConsumerService {
MinerUResponse minerUResponse = new MinerUResponse();
minerUResponse.setFiles(file);
try {
//todo
Object o = minerUAPI.parsePDF(minerUResponse);
//
String md = "";
String json = MD2Json.md2json(md);
BizFileOcrCacheEntity updateEntity = bizFileOcrCacheService.get(message.getId());
updateEntity.setParseStatus("complete");
updateEntity.setJsonFileUrl(createFileAndUploadBos(md, "txt"));
String fileName = bizFileUploadRecordService.getFileNameByFileUrl(message.getFileURL());
if (fileName.equals("Form.pdf")) {
String json = MD2Json.md2json(md);
updateEntity.setJsonFileUrl(createFileAndUploadBos(json, "txt"));
}
updateEntity.setMdFileUrl(createFileAndUploadBos(md, "md"));
bizFileOcrCacheService.update(updateEntity);
......
......@@ -6,6 +6,8 @@ package cn.com.poc.message.topic;
*/
public interface AirportTopic {
String AIRPORT_CREATE_TOPIC = "AIRPORT_CREATE_TOPIC";
String AIRPORT_PDF_PAPER_TOPIC = "AIRPORT_PDF_PAPER_TOPIC";
......
......@@ -3,6 +3,7 @@ package cn.com.poc.ocr.service;
import cn.com.yict.framemax.core.service.BaseService;
import cn.com.poc.ocr.entity.BizFileOcrCacheEntity;
import cn.com.yict.framemax.data.model.PagingInfo;
import java.util.Collection;
import java.util.List;
......@@ -10,12 +11,14 @@ public interface BizFileOcrCacheService extends BaseService {
BizFileOcrCacheEntity get(java.lang.Long id) throws Exception;
List<BizFileOcrCacheEntity> findByExample(BizFileOcrCacheEntity example,PagingInfo pagingInfo) throws Exception;
List<BizFileOcrCacheEntity> findByExample(BizFileOcrCacheEntity example, PagingInfo pagingInfo) throws Exception;
BizFileOcrCacheEntity save(BizFileOcrCacheEntity entity) throws Exception;
BizFileOcrCacheEntity update(BizFileOcrCacheEntity entity) throws Exception;
void deletedById(java.lang.Long id) throws Exception;
BizFileOcrCacheEntity findByMd5(String md5) ;
}
\ No newline at end of file
package cn.com.poc.ocr.service.impl;
import cn.com.poc.common.constant.CommonConstant;
import cn.com.yict.framemax.core.service.impl.BaseServiceImpl;
import cn.com.poc.ocr.service.BizFileOcrCacheService;
import cn.com.poc.ocr.model.BizFileOcrCacheModel;
......@@ -114,4 +115,12 @@ public class BizFileOcrCacheServiceImpl extends BaseServiceImpl
}
}
@Override
public BizFileOcrCacheEntity findByMd5(String md5) {
BizFileOcrCacheModel bizFileOcrCacheModel = new BizFileOcrCacheModel();
bizFileOcrCacheModel.setMd5(md5);
bizFileOcrCacheModel.setIsDeleted(CommonConstant.IsDeleted.N);
List<BizFileOcrCacheModel> models = this.repository.findByExample(bizFileOcrCacheModel, null);
return CollectionUtils.isNotEmpty(models) ? BizFileOcrCacheConvert.modelToEntity(models.get(0)) : null;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment