Commit aa58bb63 authored by R10's avatar R10

Merge branch 'release_gdghg' of ssh://gitlab.gsstcloud.com:10022/poc/poc-api into release_gdghg

parents bbaf2a77 acc5f855
...@@ -248,6 +248,7 @@ public class BizAiDataAuditTaskServiceImpl extends BaseServiceImpl ...@@ -248,6 +248,7 @@ public class BizAiDataAuditTaskServiceImpl extends BaseServiceImpl
private AirportProducerService airportProducerService; private AirportProducerService airportProducerService;
private void pushTask(Long taskId) { private void pushTask(Long taskId) {
// 配置Redis AI_DATA_AUDIT_TASK , 表示消息队列已经获取到任务 [用于定时任务(AiDataAuditScheduler)判断该任务下的 OCR任务是否已经全部完成] Push推送至队列 Get消费者获取到任务 Finish已结束
redisService.set("AI_DATA_AUDIT_TASK:" + taskId, "Push"); redisService.set("AI_DATA_AUDIT_TASK:" + taskId, "Push");
AirportTaskCreateMessage airportTaskCreateMessage = new AirportTaskCreateMessage(); AirportTaskCreateMessage airportTaskCreateMessage = new AirportTaskCreateMessage();
......
...@@ -8,9 +8,9 @@ import cn.com.yict.framemax.core.service.BaseService; ...@@ -8,9 +8,9 @@ import cn.com.yict.framemax.core.service.BaseService;
* @author alex.yao * @author alex.yao
* @date 2025/10/30 * @date 2025/10/30
*/ */
public interface OCRConsumerService extends BaseService { public interface OcrConsumerService extends BaseService {
void OCR(OCRMessage message) throws Exception; void OCR(OCRMessage message) ;
void updateStatus(OCRStatusMessage message) throws Exception; void updateStatus(OCRStatusMessage message) throws Exception;
......
...@@ -8,7 +8,7 @@ import cn.com.yict.framemax.core.service.BaseService; ...@@ -8,7 +8,7 @@ import cn.com.yict.framemax.core.service.BaseService;
* @author alex.yao * @author alex.yao
* @date 2025/10/30 * @date 2025/10/30
*/ */
public interface OCRProducerService extends BaseService { public interface OcrProducerService extends BaseService {
OCRMessage OCR(OCRMessage message); OCRMessage OCR(OCRMessage message);
......
...@@ -25,7 +25,7 @@ import cn.com.poc.message.entity.AirportPDFPaperMessage; ...@@ -25,7 +25,7 @@ import cn.com.poc.message.entity.AirportPDFPaperMessage;
import cn.com.poc.message.entity.OCRMessage; import cn.com.poc.message.entity.OCRMessage;
import cn.com.poc.message.service.AirportConsumerService; import cn.com.poc.message.service.AirportConsumerService;
import cn.com.poc.message.service.AirportProducerService; import cn.com.poc.message.service.AirportProducerService;
import cn.com.poc.message.service.OCRProducerService; import cn.com.poc.message.service.OcrProducerService;
import cn.com.poc.message.topic.AirportTopic; import cn.com.poc.message.topic.AirportTopic;
import cn.com.poc.ocr.entity.BizFileOcrCacheEntity; import cn.com.poc.ocr.entity.BizFileOcrCacheEntity;
import cn.com.poc.ocr.service.BizFileOcrCacheService; import cn.com.poc.ocr.service.BizFileOcrCacheService;
...@@ -76,7 +76,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -76,7 +76,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
private BizAiDataAuditTaskService bizAiDataAuditTaskService; private BizAiDataAuditTaskService bizAiDataAuditTaskService;
@Resource @Resource
private OCRProducerService ocrProducerService; private OcrProducerService ocrProducerService;
@Resource @Resource
private RedisService redisService; private RedisService redisService;
...@@ -86,8 +86,8 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -86,8 +86,8 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
@Override @Override
@Consumer(topic = AirportTopic.AIRPORT_CREATE_TOPIC) @Consumer(topic = AirportTopic.AIRPORT_CREATE_TOPIC)
public void createTask(AirportTaskCreateMessage message) throws Exception { public void createTask(AirportTaskCreateMessage message) throws Exception {
// 配置Redis AI_DATA_AUDIT_TASK , 表示消息队列已经获取到任务 [用于定时任务(AiDataAuditScheduler)判断该任务下的 OCR任务是否已经全部完成]
redisService.set("AI_DATA_AUDIT_TASK:" + message.getTaskId(), "Get"); redisService.set("AI_DATA_AUDIT_TASK:" + message.getTaskId(), "Get");
BizAiDataAuditFileEntity bizAiDataAuditFileEntity = new BizAiDataAuditFileEntity(); BizAiDataAuditFileEntity bizAiDataAuditFileEntity = new BizAiDataAuditFileEntity();
...@@ -95,8 +95,12 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -95,8 +95,12 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
bizAiDataAuditFileEntity.setIsDeleted(CommonConstant.IsDeleted.N); bizAiDataAuditFileEntity.setIsDeleted(CommonConstant.IsDeleted.N);
List<BizAiDataAuditFileEntity> aiDataAuditFileEntities = bizAiDataAuditFileService.findByExample(bizAiDataAuditFileEntity, null); List<BizAiDataAuditFileEntity> aiDataAuditFileEntities = bizAiDataAuditFileService.findByExample(bizAiDataAuditFileEntity, null);
// 判断OCR是否完成
// 1. 判断OCR缓存表是否有数据
// 2. 若缓存表有数据,则判断是否完成
// 3. 若缓存表无数据,则发送OCR任务
// 4. 最后判断是否全部任务已完成
boolean isAllComplete = true; boolean isAllComplete = true;
for (BizAiDataAuditFileEntity aiDataAuditFileEntity : aiDataAuditFileEntities) { for (BizAiDataAuditFileEntity aiDataAuditFileEntity : aiDataAuditFileEntities) {
String md5 = aiDataAuditFileEntity.getMd5(); String md5 = aiDataAuditFileEntity.getMd5();
...@@ -127,7 +131,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -127,7 +131,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
} }
if (fileOcrCacheEntity.getParseStatus().equals("fail")) { if (fileOcrCacheEntity.getParseStatus().equals("fail")) {
//发送OCR 任务 //若是失败,重新发送OCR任务
OCRMessage ocrMessage = new OCRMessage(); OCRMessage ocrMessage = new OCRMessage();
ocrMessage.setId(fileOcrCacheEntity.getId()); ocrMessage.setId(fileOcrCacheEntity.getId());
ocrMessage.setMd5(md5); ocrMessage.setMd5(md5);
...@@ -262,37 +266,18 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -262,37 +266,18 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
executor.submit(() -> { executor.submit(() -> {
try { try {
Map<String, List<BizAiDataAuditFileEntity>> groupedByTemplateCode = fileList.stream()
.collect(Collectors.groupingBy(BizAiDataAuditFileEntity::getOrginalTemplateCode));
String ruleDesc = aiDataAuditRuleQueryItem.getRuleDesc();// 规则描述 String ruleDesc = aiDataAuditRuleQueryItem.getRuleDesc();// 规则描述
String firstFile = aiDataAuditRuleQueryItem.getFirstFile();// 文件1 String firstFileCode = aiDataAuditRuleQueryItem.getFirstFile();// 文件1
String firstField = aiDataAuditRuleQueryItem.getFirstField();// 字段1 String firstField = aiDataAuditRuleQueryItem.getFirstField();// 字段1
String secondFile = aiDataAuditRuleQueryItem.getSecondFile();// 文件2 String secondFileCode = aiDataAuditRuleQueryItem.getSecondFile();// 文件2
String secondField = aiDataAuditRuleQueryItem.getSecondField();// 字段2 String secondField = aiDataAuditRuleQueryItem.getSecondField();// 字段2
String firstFileContent = StringUtils.EMPTY;
String secondFileContent = StringUtils.EMPTY;
Map<String, List<BizAiDataAuditFileEntity>> groupedByTemplateCode = fileList.stream()
.collect(Collectors.groupingBy(BizAiDataAuditFileEntity::getOrginalTemplateCode));
// 找到文件 并下载对应的md 或 json 文件 // 找到文件 并下载对应的md 或 json 文件
BizAiDataAuditFileEntity firstAuditFileEntity = groupedByTemplateCode.get(firstFile).get(0); String firstFileContent = getFileContentByOCRCache(groupedByTemplateCode, firstFileCode);
BizFileOcrCacheEntity firstFileOcrCacheEntity = bizFileOcrCacheService.findByMd5(firstAuditFileEntity.getMd5()); String secondFileContent = getFileContentByOCRCache(groupedByTemplateCode, secondFileCode);
String firstMdFileUrl = firstFileOcrCacheEntity.getMdFileUrl();
String firstJsonFileUrl = firstFileOcrCacheEntity.getJsonFileUrl();
if (StringUtils.isNotBlank(firstJsonFileUrl)) {
File file = DocumentLoad.downloadURLDocument(StringUtils.isBlank(firstJsonFileUrl) ? firstMdFileUrl : firstJsonFileUrl);
firstFileContent = DocumentLoad.documentToText(file);
}
BizAiDataAuditFileEntity secondAuditFileEntity = groupedByTemplateCode.get(secondFile).get(0);
BizFileOcrCacheEntity secondFileOcrCacheEntity = bizFileOcrCacheService.findByMd5(secondAuditFileEntity.getMd5());
;
String secondMdFileUrl = secondFileOcrCacheEntity.getMdFileUrl();
String secondJsonFileUrl = secondFileOcrCacheEntity.getJsonFileUrl();
if (StringUtils.isNotBlank(secondJsonFileUrl)) {
File file = DocumentLoad.downloadURLDocument(StringUtils.isBlank(secondJsonFileUrl) ? secondMdFileUrl : secondJsonFileUrl);
secondFileContent = DocumentLoad.documentToText(file);
}
// 执行工作流 // 执行工作流
Map<String, Object> inputs = new LinkedHashMap<>(); Map<String, Object> inputs = new LinkedHashMap<>();
...@@ -341,6 +326,20 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -341,6 +326,20 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
}); });
} }
private String getFileContentByOCRCache(Map<String, List<BizAiDataAuditFileEntity>> groupedByTemplateCode
, String fileCode) {
String fileContent = StringUtils.EMPTY;
BizAiDataAuditFileEntity auditFileEntity = groupedByTemplateCode.get(fileCode).get(0);
BizFileOcrCacheEntity fileOcrCacheEntity = bizFileOcrCacheService.findByMd5(auditFileEntity.getMd5());
String mdFileURL = fileOcrCacheEntity.getMdFileUrl();
String jsonFileURL = fileOcrCacheEntity.getJsonFileUrl();
if (StringUtils.isNotBlank(jsonFileURL)) {
File file = DocumentLoad.downloadURLDocument(StringUtils.isBlank(jsonFileURL) ? mdFileURL : jsonFileURL);
fileContent = DocumentLoad.documentToText(file);
}
return fileContent;
}
// 字段对文件 // 字段对文件
private void c2f(List<BizAiDataAuditFileEntity> fileList, AiDataAuditRuleQueryItem aiDataAuditRuleQueryItem, List<AuditResultDto> results, CountDownLatch countDownLatch) throws Exception { private void c2f(List<BizAiDataAuditFileEntity> fileList, AiDataAuditRuleQueryItem aiDataAuditRuleQueryItem, List<AuditResultDto> results, CountDownLatch countDownLatch) throws Exception {
......
...@@ -5,16 +5,20 @@ import cn.com.poc.common.service.BizFileUploadRecordService; ...@@ -5,16 +5,20 @@ import cn.com.poc.common.service.BizFileUploadRecordService;
import cn.com.poc.common.service.BosConfigService; import cn.com.poc.common.service.BosConfigService;
import cn.com.poc.common.utils.DocumentLoad; import cn.com.poc.common.utils.DocumentLoad;
import cn.com.poc.common.utils.MD2Json; import cn.com.poc.common.utils.MD2Json;
import cn.com.poc.common.utils.StringUtils;
import cn.com.poc.common.utils.UUIDTool; import cn.com.poc.common.utils.UUIDTool;
import cn.com.poc.message.entity.OCRMessage; import cn.com.poc.message.entity.OCRMessage;
import cn.com.poc.message.entity.OCRStatusMessage; import cn.com.poc.message.entity.OCRStatusMessage;
import cn.com.poc.message.service.OCRConsumerService; import cn.com.poc.message.service.OcrConsumerService;
import cn.com.poc.message.service.OCRProducerService; import cn.com.poc.message.service.OcrProducerService;
import cn.com.poc.message.topic.OCRTopic; import cn.com.poc.message.topic.OCRTopic;
import cn.com.poc.ocr.entity.BizFileOcrCacheEntity; import cn.com.poc.ocr.entity.BizFileOcrCacheEntity;
import cn.com.poc.ocr.service.BizFileOcrCacheService; import cn.com.poc.ocr.service.BizFileOcrCacheService;
import cn.com.poc.thirdparty.resource.minerU.MdContent;
import cn.com.poc.thirdparty.resource.minerU.MinerUResponse; import cn.com.poc.thirdparty.resource.minerU.MinerUResponse;
import cn.com.poc.thirdparty.resource.minerU.MinerUResult;
import cn.com.poc.thirdparty.resource.minerU.api.MinerUAPI; import cn.com.poc.thirdparty.resource.minerU.api.MinerUAPI;
import cn.com.yict.framemax.core.exception.BusinessException;
import cn.com.yict.framemax.tumbleweed.client.annotation.Consumer; import cn.com.yict.framemax.tumbleweed.client.annotation.Consumer;
import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.FileUtil;
import cn.hutool.crypto.digest.MD5; import cn.hutool.crypto.digest.MD5;
...@@ -33,9 +37,9 @@ import java.io.IOException; ...@@ -33,9 +37,9 @@ import java.io.IOException;
* @date 2025/10/30 * @date 2025/10/30
*/ */
@Service @Service
public class OCRConsumerServiceImpl implements OCRConsumerService { public class OcrConsumerServiceImpl implements OcrConsumerService {
private Logger logger = LoggerFactory.getLogger(OCRConsumerService.class); private Logger logger = LoggerFactory.getLogger(OcrConsumerService.class);
@Resource @Resource
private MinerUAPI minerUAPI; private MinerUAPI minerUAPI;
...@@ -50,27 +54,30 @@ public class OCRConsumerServiceImpl implements OCRConsumerService { ...@@ -50,27 +54,30 @@ public class OCRConsumerServiceImpl implements OCRConsumerService {
private BizFileUploadRecordService bizFileUploadRecordService; private BizFileUploadRecordService bizFileUploadRecordService;
@Resource @Resource
private OCRProducerService ocrProducerService; private OcrProducerService ocrProducerService;
@Override @Override
@Consumer(topic = OCRTopic.PARSE_PDF) @Consumer(topic = OCRTopic.PARSE_PDF)
public void OCR(OCRMessage message) throws Exception { public void OCR(OCRMessage message) {
OCRStatusMessage ocrStatusMessage = new OCRStatusMessage(); OCRStatusMessage ocrStatusMessage = new OCRStatusMessage();
ocrStatusMessage.setId(message.getId()); ocrStatusMessage.setId(message.getId());
ocrStatusMessage.setStatus("parsing"); ocrStatusMessage.setStatus("parsing");
ocrProducerService.updateStatus(ocrStatusMessage); ocrProducerService.updateStatus(ocrStatusMessage);
File file = DocumentLoad.downloadURLDocument(message.getFileURL());
MinerUResponse minerUResponse = new MinerUResponse();
minerUResponse.setFiles(file);
try { try {
//todo File file = DocumentLoad.downloadURLDocument(message.getFileURL());
Object o = minerUAPI.parsePDF(minerUResponse); MinerUResponse minerUResponse = new MinerUResponse();
String md = ""; minerUResponse.setFiles(file);
MinerUResult minerUResult = minerUAPI.parsePDF(minerUResponse);
if (minerUResult == null) {
throw new BusinessException("解析失败 minerU 返回为空");
}
String md = StringUtils.EMPTY;
for (String key : minerUResult.getResults().keySet()) {
MdContent mdContent = minerUResult.getResults().get(key);
md = mdContent.getMd_content();
}
BizFileOcrCacheEntity updateEntity = bizFileOcrCacheService.get(message.getId()); BizFileOcrCacheEntity updateEntity = bizFileOcrCacheService.get(message.getId());
updateEntity.setParseStatus("complete"); updateEntity.setParseStatus("complete");
......
...@@ -2,9 +2,8 @@ package cn.com.poc.message.service.impl; ...@@ -2,9 +2,8 @@ package cn.com.poc.message.service.impl;
import cn.com.poc.message.entity.OCRMessage; import cn.com.poc.message.entity.OCRMessage;
import cn.com.poc.message.entity.OCRStatusMessage; import cn.com.poc.message.entity.OCRStatusMessage;
import cn.com.poc.message.service.OCRProducerService; import cn.com.poc.message.service.OcrProducerService;
import cn.com.poc.message.topic.OCRTopic; import cn.com.poc.message.topic.OCRTopic;
import cn.com.yict.framemax.core.service.BaseService;
import cn.com.yict.framemax.tumbleweed.client.annotation.Producer; import cn.com.yict.framemax.tumbleweed.client.annotation.Producer;
import cn.com.yict.framemax.tumbleweed.constant.PublishPoint; import cn.com.yict.framemax.tumbleweed.constant.PublishPoint;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -14,7 +13,7 @@ import org.springframework.stereotype.Service; ...@@ -14,7 +13,7 @@ import org.springframework.stereotype.Service;
* @date 2025/10/30 * @date 2025/10/30
*/ */
@Service @Service
public class OCRProducerServiceImpl implements OCRProducerService { public class OcrProducerServiceImpl implements OcrProducerService {
@Override @Override
@Producer(topic = OCRTopic.PARSE_PDF) @Producer(topic = OCRTopic.PARSE_PDF)
......
...@@ -3,7 +3,8 @@ package cn.com.poc.ocr.scheduler; ...@@ -3,7 +3,8 @@ package cn.com.poc.ocr.scheduler;
import cn.com.poc.common.constant.CommonConstant; import cn.com.poc.common.constant.CommonConstant;
import cn.com.poc.common.utils.StringUtils; import cn.com.poc.common.utils.StringUtils;
import cn.com.poc.message.entity.OCRMessage; import cn.com.poc.message.entity.OCRMessage;
import cn.com.poc.message.service.OCRProducerService; import cn.com.poc.message.entity.OCRStatusMessage;
import cn.com.poc.message.service.OcrProducerService;
import cn.com.poc.ocr.entity.BizFileOcrCacheEntity; import cn.com.poc.ocr.entity.BizFileOcrCacheEntity;
import cn.com.poc.ocr.service.BizFileOcrCacheService; import cn.com.poc.ocr.service.BizFileOcrCacheService;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
...@@ -26,7 +27,7 @@ public class OCRScheduler { ...@@ -26,7 +27,7 @@ public class OCRScheduler {
private BizFileOcrCacheService bizFileOcrCacheService; private BizFileOcrCacheService bizFileOcrCacheService;
@Resource @Resource
private OCRProducerService ocrProducerService; private OcrProducerService ocrProducerService;
@Scheduled(fixedDelay = 10000) @Scheduled(fixedDelay = 10000)
...@@ -41,8 +42,11 @@ public class OCRScheduler { ...@@ -41,8 +42,11 @@ public class OCRScheduler {
if (fileOcrCacheEntity.getParseStatus().equals("create")) { if (fileOcrCacheEntity.getParseStatus().equals("create")) {
if (StringUtils.isBlank(fileOcrCacheEntity.getMdFileUrl())) { if (StringUtils.isBlank(fileOcrCacheEntity.getMdFileUrl())) {
fileOcrCacheEntity.setParseStatus("queuing");
bizFileOcrCacheService.update(fileOcrCacheEntity); OCRStatusMessage ocrStatusMessage = new OCRStatusMessage();
ocrStatusMessage.setId(fileOcrCacheEntity.getId());
ocrStatusMessage.setStatus("queuing");
ocrProducerService.updateStatus(ocrStatusMessage);
OCRMessage message = new OCRMessage(); OCRMessage message = new OCRMessage();
message.setId(fileOcrCacheEntity.getId()); message.setId(fileOcrCacheEntity.getId());
...@@ -52,7 +56,6 @@ public class OCRScheduler { ...@@ -52,7 +56,6 @@ public class OCRScheduler {
} else { } else {
fileOcrCacheEntity.setParseStatus("complete"); fileOcrCacheEntity.setParseStatus("complete");
bizFileOcrCacheService.update(fileOcrCacheEntity); bizFileOcrCacheService.update(fileOcrCacheEntity);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment