Commit 4e441fc3 authored by alex yao

feat:提交任务

parent f9e3fdbc
...@@ -248,6 +248,7 @@ public class BizAiDataAuditTaskServiceImpl extends BaseServiceImpl ...@@ -248,6 +248,7 @@ public class BizAiDataAuditTaskServiceImpl extends BaseServiceImpl
private AirportProducerService airportProducerService; private AirportProducerService airportProducerService;
private void pushTask(Long taskId) { private void pushTask(Long taskId) {
// 配置Redis AI_DATA_AUDIT_TASK , 表示消息队列已经获取到任务 [用于定时任务(AiDataAuditScheduler)判断该任务下的 OCR任务是否已经全部完成] Push推送至队列 Get消费者获取到任务 Finish已结束
redisService.set("AI_DATA_AUDIT_TASK:" + taskId, "Push"); redisService.set("AI_DATA_AUDIT_TASK:" + taskId, "Push");
AirportTaskCreateMessage airportTaskCreateMessage = new AirportTaskCreateMessage(); AirportTaskCreateMessage airportTaskCreateMessage = new AirportTaskCreateMessage();
......
...@@ -10,7 +10,7 @@ import cn.com.yict.framemax.core.service.BaseService; ...@@ -10,7 +10,7 @@ import cn.com.yict.framemax.core.service.BaseService;
*/ */
public interface OCRConsumerService extends BaseService { public interface OCRConsumerService extends BaseService {
void OCR(OCRMessage message) throws Exception; void OCR(OCRMessage message) ;
void updateStatus(OCRStatusMessage message) throws Exception; void updateStatus(OCRStatusMessage message) throws Exception;
......
...@@ -84,7 +84,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -84,7 +84,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
@Override @Override
@Consumer(topic = AirportTopic.AIRPORT_CREATE_TOPIC) @Consumer(topic = AirportTopic.AIRPORT_CREATE_TOPIC)
public void createTask(AirportTaskCreateMessage message) throws Exception { public void createTask(AirportTaskCreateMessage message) throws Exception {
// 配置Redis AI_DATA_AUDIT_TASK , 表示消息队列已经获取到任务 [用于定时任务(AiDataAuditScheduler)判断该任务下的 OCR任务是否已经全部完成]
redisService.set("AI_DATA_AUDIT_TASK:" + message.getTaskId(), "Get"); redisService.set("AI_DATA_AUDIT_TASK:" + message.getTaskId(), "Get");
BizAiDataAuditFileEntity bizAiDataAuditFileEntity = new BizAiDataAuditFileEntity(); BizAiDataAuditFileEntity bizAiDataAuditFileEntity = new BizAiDataAuditFileEntity();
...@@ -92,8 +92,12 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -92,8 +92,12 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
bizAiDataAuditFileEntity.setIsDeleted(CommonConstant.IsDeleted.N); bizAiDataAuditFileEntity.setIsDeleted(CommonConstant.IsDeleted.N);
List<BizAiDataAuditFileEntity> aiDataAuditFileEntities = bizAiDataAuditFileService.findByExample(bizAiDataAuditFileEntity, null); List<BizAiDataAuditFileEntity> aiDataAuditFileEntities = bizAiDataAuditFileService.findByExample(bizAiDataAuditFileEntity, null);
// 判断OCR是否完成
// 1. 判断OCR缓存表是否有数据
// 2. 若缓存表有数据,则判断是否完成
// 3. 若缓存表无数据,则发送OCR任务
// 4. 最后判断是否全部任务已完成
boolean isAllComplete = true; boolean isAllComplete = true;
for (BizAiDataAuditFileEntity aiDataAuditFileEntity : aiDataAuditFileEntities) { for (BizAiDataAuditFileEntity aiDataAuditFileEntity : aiDataAuditFileEntities) {
String md5 = aiDataAuditFileEntity.getMd5(); String md5 = aiDataAuditFileEntity.getMd5();
...@@ -124,7 +128,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -124,7 +128,7 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
} }
if (fileOcrCacheEntity.getParseStatus().equals("fail")) { if (fileOcrCacheEntity.getParseStatus().equals("fail")) {
//发送OCR 任务 //若是失败,重新发送OCR任务
OCRMessage ocrMessage = new OCRMessage(); OCRMessage ocrMessage = new OCRMessage();
ocrMessage.setId(fileOcrCacheEntity.getId()); ocrMessage.setId(fileOcrCacheEntity.getId());
ocrMessage.setMd5(md5); ocrMessage.setMd5(md5);
...@@ -259,37 +263,18 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -259,37 +263,18 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
executor.submit(() -> { executor.submit(() -> {
try { try {
Map<String, List<BizAiDataAuditFileEntity>> groupedByTemplateCode = fileList.stream()
.collect(Collectors.groupingBy(BizAiDataAuditFileEntity::getOrginalTemplateCode));
String ruleDesc = aiDataAuditRuleQueryItem.getRuleDesc();// 规则描述 String ruleDesc = aiDataAuditRuleQueryItem.getRuleDesc();// 规则描述
String firstFile = aiDataAuditRuleQueryItem.getFirstFile();// 文件1 String firstFileCode = aiDataAuditRuleQueryItem.getFirstFile();// 文件1
String firstField = aiDataAuditRuleQueryItem.getFirstField();// 字段1 String firstField = aiDataAuditRuleQueryItem.getFirstField();// 字段1
String secondFile = aiDataAuditRuleQueryItem.getSecondFile();// 文件2 String secondFileCode = aiDataAuditRuleQueryItem.getSecondFile();// 文件2
String secondField = aiDataAuditRuleQueryItem.getSecondField();// 字段2 String secondField = aiDataAuditRuleQueryItem.getSecondField();// 字段2
String firstFileContent = StringUtils.EMPTY;
String secondFileContent = StringUtils.EMPTY;
Map<String, List<BizAiDataAuditFileEntity>> groupedByTemplateCode = fileList.stream()
.collect(Collectors.groupingBy(BizAiDataAuditFileEntity::getOrginalTemplateCode));
// 找到文件 并下载对应的md 或 json 文件 // 找到文件 并下载对应的md 或 json 文件
BizAiDataAuditFileEntity firstAuditFileEntity = groupedByTemplateCode.get(firstFile).get(0); String firstFileContent = getFileContentByOCRCache(groupedByTemplateCode, firstFileCode);
BizFileOcrCacheEntity firstFileOcrCacheEntity = bizFileOcrCacheService.findByMd5(firstAuditFileEntity.getMd5()); String secondFileContent = getFileContentByOCRCache(groupedByTemplateCode, secondFileCode);
String firstMdFileUrl = firstFileOcrCacheEntity.getMdFileUrl();
String firstJsonFileUrl = firstFileOcrCacheEntity.getJsonFileUrl();
if (StringUtils.isNotBlank(firstJsonFileUrl)) {
File file = DocumentLoad.downloadURLDocument(StringUtils.isBlank(firstJsonFileUrl) ? firstMdFileUrl : firstJsonFileUrl);
firstFileContent = DocumentLoad.documentToText(file);
}
BizAiDataAuditFileEntity secondAuditFileEntity = groupedByTemplateCode.get(secondFile).get(0);
BizFileOcrCacheEntity secondFileOcrCacheEntity = bizFileOcrCacheService.findByMd5(secondAuditFileEntity.getMd5());
;
String secondMdFileUrl = secondFileOcrCacheEntity.getMdFileUrl();
String secondJsonFileUrl = secondFileOcrCacheEntity.getJsonFileUrl();
if (StringUtils.isNotBlank(secondJsonFileUrl)) {
File file = DocumentLoad.downloadURLDocument(StringUtils.isBlank(secondJsonFileUrl) ? secondMdFileUrl : secondJsonFileUrl);
secondFileContent = DocumentLoad.documentToText(file);
}
// 执行工作流 // 执行工作流
Map<String, Object> inputs = new LinkedHashMap<>(); Map<String, Object> inputs = new LinkedHashMap<>();
...@@ -338,6 +323,20 @@ public class AirportConsumerServiceImpl implements AirportConsumerService { ...@@ -338,6 +323,20 @@ public class AirportConsumerServiceImpl implements AirportConsumerService {
}); });
} }
/**
 * Loads the OCR output text for the first audit file registered under a template code.
 *
 * <p>The OCR cache row is looked up by the file's MD5. The JSON result is preferred;
 * when no JSON URL is recorded the Markdown result is used instead. Previously the
 * Markdown fallback was unreachable because it was evaluated only inside the
 * "JSON URL is not blank" branch.
 *
 * @param groupedByTemplateCode audit files grouped by their original template code
 * @param fileCode              template code of the file whose OCR content is wanted
 * @return the text of the downloaded OCR document, or an empty string when no file
 *         exists for {@code fileCode}, no OCR cache row exists for its MD5, or the
 *         cache row has neither a JSON nor a Markdown URL
 */
private String getFileContentByOCRCache(Map<String, List<BizAiDataAuditFileEntity>> groupedByTemplateCode
        , String fileCode) {
    // A rule may reference a template code for which nothing was uploaded.
    List<BizAiDataAuditFileEntity> candidates = groupedByTemplateCode.get(fileCode);
    if (candidates == null || candidates.isEmpty()) {
        return StringUtils.EMPTY;
    }

    BizAiDataAuditFileEntity auditFileEntity = candidates.get(0);
    BizFileOcrCacheEntity fileOcrCacheEntity = bizFileOcrCacheService.findByMd5(auditFileEntity.getMd5());
    if (fileOcrCacheEntity == null) {
        return StringUtils.EMPTY;
    }

    // Prefer the JSON result; fall back to Markdown when JSON is absent.
    String jsonFileURL = fileOcrCacheEntity.getJsonFileUrl();
    String sourceURL = StringUtils.isNotBlank(jsonFileURL) ? jsonFileURL : fileOcrCacheEntity.getMdFileUrl();
    if (StringUtils.isBlank(sourceURL)) {
        return StringUtils.EMPTY;
    }

    File file = DocumentLoad.downloadURLDocument(sourceURL);
    return DocumentLoad.documentToText(file);
}
// 字段对文件 // 字段对文件
private void c2f(List<BizAiDataAuditFileEntity> fileList, AiDataAuditRuleQueryItem aiDataAuditRuleQueryItem, List<AuditResultDto> results, CountDownLatch countDownLatch) throws Exception { private void c2f(List<BizAiDataAuditFileEntity> fileList, AiDataAuditRuleQueryItem aiDataAuditRuleQueryItem, List<AuditResultDto> results, CountDownLatch countDownLatch) throws Exception {
executor.submit(() -> { executor.submit(() -> {
......
...@@ -5,6 +5,7 @@ import cn.com.poc.common.service.BizFileUploadRecordService; ...@@ -5,6 +5,7 @@ import cn.com.poc.common.service.BizFileUploadRecordService;
import cn.com.poc.common.service.BosConfigService; import cn.com.poc.common.service.BosConfigService;
import cn.com.poc.common.utils.DocumentLoad; import cn.com.poc.common.utils.DocumentLoad;
import cn.com.poc.common.utils.MD2Json; import cn.com.poc.common.utils.MD2Json;
import cn.com.poc.common.utils.StringUtils;
import cn.com.poc.common.utils.UUIDTool; import cn.com.poc.common.utils.UUIDTool;
import cn.com.poc.message.entity.OCRMessage; import cn.com.poc.message.entity.OCRMessage;
import cn.com.poc.message.entity.OCRStatusMessage; import cn.com.poc.message.entity.OCRStatusMessage;
...@@ -13,11 +14,15 @@ import cn.com.poc.message.service.OCRProducerService; ...@@ -13,11 +14,15 @@ import cn.com.poc.message.service.OCRProducerService;
import cn.com.poc.message.topic.OCRTopic; import cn.com.poc.message.topic.OCRTopic;
import cn.com.poc.ocr.entity.BizFileOcrCacheEntity; import cn.com.poc.ocr.entity.BizFileOcrCacheEntity;
import cn.com.poc.ocr.service.BizFileOcrCacheService; import cn.com.poc.ocr.service.BizFileOcrCacheService;
import cn.com.poc.thirdparty.resource.minerU.MdContent;
import cn.com.poc.thirdparty.resource.minerU.MinerUResponse; import cn.com.poc.thirdparty.resource.minerU.MinerUResponse;
import cn.com.poc.thirdparty.resource.minerU.MinerUResult;
import cn.com.poc.thirdparty.resource.minerU.api.MinerUAPI; import cn.com.poc.thirdparty.resource.minerU.api.MinerUAPI;
import cn.com.yict.framemax.core.exception.BusinessException;
import cn.com.yict.framemax.tumbleweed.client.annotation.Consumer; import cn.com.yict.framemax.tumbleweed.client.annotation.Consumer;
import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.FileUtil;
import cn.hutool.crypto.digest.MD5; import cn.hutool.crypto.digest.MD5;
import org.beetl.ext.fn.StringUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -55,22 +60,25 @@ public class OCRConsumerServiceImpl implements OCRConsumerService { ...@@ -55,22 +60,25 @@ public class OCRConsumerServiceImpl implements OCRConsumerService {
@Override @Override
@Consumer(topic = OCRTopic.PARSE_PDF) @Consumer(topic = OCRTopic.PARSE_PDF)
public void OCR(OCRMessage message) throws Exception { public void OCR(OCRMessage message) {
OCRStatusMessage ocrStatusMessage = new OCRStatusMessage(); OCRStatusMessage ocrStatusMessage = new OCRStatusMessage();
ocrStatusMessage.setId(message.getId()); ocrStatusMessage.setId(message.getId());
ocrStatusMessage.setStatus("parsing"); ocrStatusMessage.setStatus("parsing");
ocrProducerService.updateStatus(ocrStatusMessage); ocrProducerService.updateStatus(ocrStatusMessage);
File file = DocumentLoad.downloadURLDocument(message.getFileURL());
MinerUResponse minerUResponse = new MinerUResponse();
minerUResponse.setFiles(file);
try { try {
//todo File file = DocumentLoad.downloadURLDocument(message.getFileURL());
Object o = minerUAPI.parsePDF(minerUResponse); MinerUResponse minerUResponse = new MinerUResponse();
String md = ""; minerUResponse.setFiles(file);
MinerUResult minerUResult = minerUAPI.parsePDF(minerUResponse);
if (minerUResult == null) {
throw new BusinessException("解析失败 minerU 返回为空");
}
String md = StringUtils.EMPTY;
for (String key : minerUResult.getResults().keySet()) {
MdContent mdContent = minerUResult.getResults().get(key);
md = mdContent.getMd_content();
}
BizFileOcrCacheEntity updateEntity = bizFileOcrCacheService.get(message.getId()); BizFileOcrCacheEntity updateEntity = bizFileOcrCacheService.get(message.getId());
updateEntity.setParseStatus("complete"); updateEntity.setParseStatus("complete");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment