From 2ef7f9deeecdc3569c07da2c219f6fd216e3c83b Mon Sep 17 00:00:00 2001 From: AdrianWang Date: Thu, 5 Jun 2025 19:14:08 +0800 Subject: [PATCH] =?UTF-8?q?feat(mcp):=20=E6=B7=BB=E5=8A=A0mineru=E7=9A=84m?= =?UTF-8?q?cp-server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- projects/mcp/.env.example | 5 + projects/mcp/.gitignore | 12 + projects/mcp/DOCKER_README.md | 164 +++++ projects/mcp/Dockerfile | 35 + projects/mcp/README.md | 348 +++++++++ projects/mcp/docker-compose.yml | 14 + projects/mcp/pyproject.toml | 39 + projects/mcp/src/mineru/api.py | 729 ++++++++++++++++++ projects/mcp/src/mineru/cli.py | 73 ++ projects/mcp/src/mineru/config.py | 91 +++ projects/mcp/src/mineru/examples.py | 76 ++ projects/mcp/src/mineru/language.py | 106 +++ projects/mcp/src/mineru/server.py | 1060 +++++++++++++++++++++++++++ 13 files changed, 2752 insertions(+) create mode 100644 projects/mcp/.env.example create mode 100644 projects/mcp/.gitignore create mode 100644 projects/mcp/DOCKER_README.md create mode 100644 projects/mcp/Dockerfile create mode 100644 projects/mcp/README.md create mode 100644 projects/mcp/docker-compose.yml create mode 100644 projects/mcp/pyproject.toml create mode 100644 projects/mcp/src/mineru/api.py create mode 100644 projects/mcp/src/mineru/cli.py create mode 100644 projects/mcp/src/mineru/config.py create mode 100644 projects/mcp/src/mineru/examples.py create mode 100644 projects/mcp/src/mineru/language.py create mode 100644 projects/mcp/src/mineru/server.py diff --git a/projects/mcp/.env.example b/projects/mcp/.env.example new file mode 100644 index 00000000..e282fb92 --- /dev/null +++ b/projects/mcp/.env.example @@ -0,0 +1,5 @@ +MINERU_API_BASE = "https://mineru.net" +MINERU_API_KEY = "eyJ0eXB..." +OUTPUT_DIR=./downloads +USE_LOCAL_API=false +LOCAL_MINERU_API_BASE="http://localhost:8888" \ No newline at end of file diff --git a/projects/mcp/.gitignore b/projects/mcp/.gitignore new file mode 100644 index 00000000..74107893 --- /dev/null +++ b/projects/mcp/.gitignore @@ -0,0 +1,12 @@ +downloads +.env +uv.lock +.venv +src/mineru/__pycache__ +dist +.DS_Store +.cursor +build +*.lock +src/mineru_mcp.egg-info +test \ No newline at end of file diff --git a/projects/mcp/DOCKER_README.md b/projects/mcp/DOCKER_README.md new file mode 100644 index 00000000..36d16753 --- /dev/null +++ b/projects/mcp/DOCKER_README.md @@ -0,0 +1,164 @@ +# MinerU MCP-Server Docker 部署指南 + +## 1. 简介 + +本文档提供了使用 Docker 部署 MinerU MCP-Server 的详细指南。通过 Docker 部署,你可以在任何支持 Docker 的环境中快速启动 MinerU MCP 服务器,无需考虑复杂的环境配置和依赖管理。 + +Docker 部署的主要优势: + +- **一致的运行环境**:确保在任何平台上都有相同的运行环境 +- **简化部署流程**:一键启动,无需手动安装依赖 +- **易于扩展和迁移**:便于在不同环境间迁移和扩展服务 +- **资源隔离**:避免与宿主机其他服务产生冲突 + +## 2. 先决条件 + +在开始之前,请确保你的系统已安装以下软件: + +- [Docker](https://www.docker.com/get-started) (19.03 或更高版本) +- [Docker Compose](https://docs.docker.com/compose/install/) (1.27.0 或更高版本) + +你可以通过以下命令检查它们是否已正确安装: + +```bash +docker --version +docker-compose --version +``` + +同时,你需要: + +- 从 [MinerU 官网](https://mineru.net) 获取的 API 密钥(如果需要使用远程 API) +- 充足的硬盘空间,用于存储转换后的文件 + +## 3. 使用 Docker Compose 部署(推荐) + +Docker Compose 提供了最简单的部署方式,特别适合快速开始使用或开发环境。 + +### 3.1 准备配置文件 + +1. 克隆仓库(如果尚未克隆): + + ```bash + git clone + cd mineru-mcp + ``` + +2. 创建环境变量文件: + + ```bash + cp .env.example .env + ``` + +3. 编辑 `.env` 文件,设置必要的环境变量: + + ``` + MINERU_API_BASE=https://mineru.net + MINERU_API_KEY=你的API密钥 + OUTPUT_DIR=./downloads + USE_LOCAL_API=false + LOCAL_MINERU_API_BASE=http://localhost:8080 + ``` + + 如果你计划使用本地 API,请将 `USE_LOCAL_API` 设置为 `true`,并确保 `LOCAL_MINERU_API_BASE` 指向你的本地 API 服务地址。 + +### 3.2 启动服务 + +在项目根目录下运行: + +```bash +docker-compose up -d +``` + +这将会: +- 构建 Docker 镜像(如果尚未构建) +- 创建并启动容器 +- 在后台运行服务 (`-d` 参数) + +服务将在 `http://localhost:8001` 上启动。你可以通过 MCP 客户端连接此地址。 + +### 3.3 查看日志 + +要查看服务日志,运行: + +```bash +docker-compose logs -f +``` + +按 `Ctrl+C` 退出日志查看。 + +### 3.4 停止服务 + +要停止服务,运行: + +```bash +docker-compose down +``` + +如果你想同时删除构建的镜像,可以使用: + +```bash +docker-compose down --rmi local +``` + +## 4. 手动构建和运行 Docker 镜像 + +如果你需要更多的控制或自定义,你可以手动构建和运行 Docker 镜像。 + +### 4.1 构建镜像 + +在项目根目录下运行: + +```bash +docker build -t mineru-mcp:latest . +``` + +这将根据 Dockerfile 构建一个名为 `mineru-mcp` 的 Docker 镜像,标签为 `latest`。 + +### 4.2 运行容器 + +使用环境变量文件运行容器: + +```bash +docker run -p 8001:8001 --env-file .env mineru-mcp:latest +``` + +或者直接指定环境变量: + +```bash +docker run -p 8001:8001 \ + -e MINERU_API_BASE=https://mineru.net \ + -e MINERU_API_KEY=你的API密钥 \ + -e OUTPUT_DIR=/app/downloads \ + -v $(pwd)/downloads:/app/downloads \ + mineru-mcp:latest +``` + +### 4.3 挂载卷 + +为了持久化存储转换后的文件,你应该挂载宿主机目录到容器的输出目录: + +```bash +docker run -p 8001:8001 --env-file .env \ + -v $(pwd)/downloads:/app/downloads \ + mineru-mcp:latest +``` + +这将挂载当前工作目录下的 `downloads` 文件夹到容器内的 `/app/downloads` 目录。 + +## 5. 环境变量配置 + +Docker 环境中支持的环境变量与标准环境相同: + +| 环境变量 | 说明 | 默认值 | +| ------------------------- | -------------------------------------------------------------- | ------------------------- | +| `MINERU_API_BASE` | MinerU 远程 API 的基础 URL | `https://mineru.net` | +| `MINERU_API_KEY` | MinerU API 密钥,需要从官网申请 | - | +| `OUTPUT_DIR` | 转换后文件的保存路径 | `/app/downloads` | +| `USE_LOCAL_API` | 是否使用本地 API 进行解析(仅适用于 `local_parse_pdf` 工具) | `false` | +| `LOCAL_MINERU_API_BASE` | 本地 API 的基础 URL(当 `USE_LOCAL_API=true` 时有效) | `http://localhost:8080` | + +在 Docker 环境中,你可以: + +- 通过 `--env-file` 指定环境变量文件 +- 通过 `-e` 参数直接指定环境变量 +- 在 `docker-compose.yml` 文件中的 `environment` 部分配置环境变量 diff --git a/projects/mcp/Dockerfile b/projects/mcp/Dockerfile new file mode 100644 index 00000000..c33ea05b --- /dev/null +++ b/projects/mcp/Dockerfile @@ -0,0 +1,35 @@ +FROM python:3.12-slim + +# Set working directory +WORKDIR /app + +# Configure pip to use Alibaba Cloud mirror +RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ + +# Install dependencies +RUN pip install --no-cache-dir poetry + +# Copy project files +COPY pyproject.toml . +COPY README.md . +COPY src/ ./src/ + +# Install the package +RUN poetry config virtualenvs.create false && \ + poetry install + +# Create downloads directory +RUN mkdir -p /app/downloads + +# Set environment variables +ENV OUTPUT_DIR=/app/downloads +# MINERU_API_KEY should be provided at runtime +ENV MINERU_API_BASE=https://mineru.net +ENV USE_LOCAL_API=false +ENV LOCAL_MINERU_API_BASE="" + +# Expose the port that SSE will run on +EXPOSE 8001 + +# Set command to start the service with SSE transport +CMD ["mineru-mcp", "--transport", "sse", "--output-dir", "/app/downloads"] \ No newline at end of file diff --git a/projects/mcp/README.md b/projects/mcp/README.md new file mode 100644 index 00000000..a8937a5b --- /dev/null +++ b/projects/mcp/README.md @@ -0,0 +1,348 @@ +# MinerU MCP-Server + +## 1. 概述 + +这个项目提供了一个 **MinerU MCP 服务器** (`mineru-mcp`),它基于 **FastMCP** 框架构建。其主要功能是作为 **MinerU API** 的接口,用于将文档转换为 Markdown格式。 + +该服务器通过 MCP 协议公开了以下主要工具: + +1. `parse_documents`:统一接口,支持处理本地文件和URL,自动根据配置选择最合适的处理方式,并自动读取转换后的内容 +2. `get_ocr_languages`:获取OCR支持的语言列表 + +这使得其他应用程序或 MCP 客户端能够轻松地集成 MinerU 的 文档 到 Markdown 转换功能。 + +## 2. 核心功能 + +* **文档提取**: 接收文档文件输入(单个或多个 URL、单个或多个本地路径,支持doc、ppt、pdf、图片多种格式),调用 MinerU API 进行内容提取和格式转换,最终生成 Markdown 文件。 +* **批量处理**: 支持同时处理多个文档文件(通过提供由空格、逗号或换行符分隔的 URL 列表或本地文件路径列表)。 +* **OCR 支持**: 可选启用 OCR 功能(默认不开启),以处理扫描版或图片型文档。 +* **多语言支持**: 支持多种语言的识别,可以自动检测文档语言或手动指定。 +* **自动化流程**: 自动处理与 MinerU API 的交互,包括任务提交、状态轮询、结果下载解压、结果文件读取。 +* **本地解析**: 支持调用本地部署的mineru模型直接解析文档,不依赖远程 API,适用于隐私敏感场景或离线环境。 +* **智能路径处理**: 自动识别URL和本地文件路径,根据USE_LOCAL_API配置选择最合适的处理方式。 + +## 3. 安装 + +在开始安装之前,请确保您的系统满足以下基本要求: +* Python >= 3.10 + +### 3.1 使用 pip 安装 (推荐) + +如果你的包已发布到 PyPI 或其他 Python 包索引,可以直接使用 pip 安装: + +```bash +pip install mineru-mcp +``` + +这种方式适用于不需要修改源代码的普通用户。 + +### 3.2 从源码安装 + +如果你需要修改源代码或进行开发,可以从源码安装。 + +克隆仓库并进入项目目录: + +```bash +git clone # 替换为你的仓库 URL +cd mineru-mcp +``` + +推荐使用 `uv` 或 `pip` 配合虚拟环境进行安装: + +**使用 uv (推荐):** + +```bash +# 安装 uv (如果尚未安装) +# pip install uv + +# 创建并激活虚拟环境 +uv venv + +# Linux/macOS +source .venv/bin/activate +# Windows +# .venv\\Scripts\\activate + +# 安装依赖和项目 +uv pip install -e . +``` + +**使用 pip:** + +```bash +# 创建并激活虚拟环境 +python -m venv .venv + +# Linux/macOS +source .venv/bin/activate +# Windows +# .venv\\Scripts\\activate + +# 安装依赖和项目 +pip install -e . +``` + +## 4. 环境变量配置 + +本项目支持通过环境变量进行配置。你可以选择直接设置系统环境变量,或者在项目根目录创建 `.env` 文件(参考 `.env.example` 模板)。 + +### 4.1 支持的环境变量 + +| 环境变量 | 说明 | 默认值 | +| ------------------------- | --------------------------------------------------------------- | ------------------------- | +| `MINERU_API_BASE` | MinerU 远程 API 的基础 URL | `https://mineru.net` | +| `MINERU_API_KEY` | MinerU API 密钥,需要从[官网](https://mineru.net)申请 | - | +| `OUTPUT_DIR` | 转换后文件的保存路径 | `./downloads` | +| `USE_LOCAL_API` | 是否使用本地 API 进行解析 | `false` | +| `LOCAL_MINERU_API_BASE` | 本地 API 的基础 URL(当 `USE_LOCAL_API=true` 时有效) | `http://localhost:8080` | + +### 4.2 远程 API 与本地 API + +本项目支持两种 API 模式: + +* **远程 API**:默认模式,通过 MinerU 官方提供的云服务进行文档解析。优点是无需本地部署复杂的模型和环境,但需要网络连接和 API 密钥。 +* **本地 API**:在本地部署 MinerU 引擎进行文档解析,适用于对数据隐私有高要求或需要离线使用的场景。设置 `USE_LOCAL_API=true` 时生效。 + +### 4.3 获取 API 密钥 + +要获取 `MINERU_API_KEY`,请访问 [MinerU 官网](https://mineru.net) 注册账号并申请 API 密钥。 + +## 5. 使用方法 + +### 5.1 工具概览 + +本项目通过 MCP 协议提供以下工具: + +1. **parse_documents**:统一接口,支持处理本地文件和URL,根据 `USE_LOCAL_API` 配置自动选择合适的处理方式,并自动读取转换后的文件内容 +2. **get_ocr_languages**:获取 OCR 支持的语言列表 + +### 5.2 参数说明 + +#### 5.2.1 parse_documents + +| 参数 | 类型 | 说明 | 默认值 | 适用模式 | +| ------------------- | ------- | ------------------------------------------------------------------- | -------- | -------- | +| `file_sources` | 字符串 | 文件路径或URL,多个可用逗号或换行符分隔 (支持pdf、ppt、pptx、doc、docx以及图片格式jpg、jpeg、png) | - | 全部 | +| `enable_ocr` | 布尔值 | 是否启用 OCR 功能 | `false` | 全部 | +| `language` | 字符串 | 文档语言,默认"ch"中文,可选"en"英文等 | `ch` | 全部 | +| `page_ranges` | 字符串 (可选) | 指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6":表示选取第2页、第4页至第6页;"2--2":表示从第2页一直选取到倒数第二页。(远程API) | `None` | 远程API | + +> **注意**: +> - 当 `USE_LOCAL_API=true` 时,如果提供了URL,这些URL会被过滤掉,只处理本地文件路径 +> - 当 `USE_LOCAL_API=false` 时,会同时处理URL和本地文件路径 + +#### 5.2.2 get_ocr_languages + +无需参数 + +## 6. MCP 客户端集成 + +你可以在任何支持 MCP 协议的客户端中使用 MinerU MCP 服务器。 + +### 6.1 在 Claude 中使用 + +将 MinerU MCP 服务器配置为 Claude 的工具,即可在 Claude 中直接使用文档转 Markdown 功能。配置工具时详情请参考 MCP 工具配置文档。根据不同的安装和使用场景,你可以选择以下两种配置方式: + +#### 6.1.1 源码运行方式 + +如果你是从源码安装并运行 MinerU MCP,可以使用以下配置。这种方式适合你需要修改源码或者进行开发调试的场景: + +```json +{ + "mcpServers": { + "mineru-mcp": { + "command": "uv", + "args": ["--directory", "/Users/adrianwang/Documents/minerU-mcp", "run", "-m", "mineru.cli"], + "env": { + "MINERU_API_BASE": "https://mineru.net", + "MINERU_API_KEY": "ey...", + "OUTPUT_DIR": "./downloads", + "USE_LOCAL_API": "true", + "LOCAL_MINERU_API_BASE": "http://localhost:8080" + } + } + } +} +``` + +这种配置的特点: + +- 使用 `uv` 命令 +- 通过 `--directory` 参数指定源码所在目录 +- 使用 `-m mineru.cli` 运行模块 +- 适合开发调试和定制化需求 + +#### 6.1.2 安装包运行方式 + +如果你是通过 pip 或 uv 安装了 mineru-mcp 包,可以使用以下更简洁的配置。这种方式适合生产环境或日常使用: + +```json +{ + "mcpServers": { + "mineru-mcp": { + "command": "uvx", + "args": ["mineru-mcp"], + "env": { + "MINERU_API_BASE": "https://mineru.net", + "MINERU_API_KEY": "ey...", + "OUTPUT_DIR": "./downloads", + "USE_LOCAL_API": "true", + "LOCAL_MINERU_API_BASE": "http://localhost:8080" + } + } + } +} +``` + +这种配置的特点: + +- 使用 `uvx` 命令直接运行已安装的包 +- 配置更加简洁 +- 不需要指定源码目录 +- 适合稳定的生产环境使用 + +### 6.2 在 FastMCP 客户端中使用 + + +```python +from fastmcp import FastMCP + +# 初始化 FastMCP 客户端 +client = FastMCP(server_url="http://localhost:8001") + +# 使用 parse_documents 工具处理单个文档 +result = await client.tool_call( + tool_name="parse_documents", + params={"file_sources": "/path/to/document.pdf"} +) + +# 混合处理URLs和本地文件 +result = await client.tool_call( + tool_name="parse_documents", + params={"file_sources": "/path/to/file.pdf, https://example.com/document.pdf"} +) + +# 启用OCR +result = await client.tool_call( + tool_name="parse_documents", + params={"file_sources": "/path/to/file.pdf", "enable_ocr": True} +) +``` + +### 6.3 直接运行服务 + +你可以通过设置环境变量并直接运行命令的方式启动 MinerU MCP 服务器,这种方式特别适合快速测试和开发环境。 + +#### 6.3.1 设置环境变量 + +首先,确保设置了必要的环境变量。你可以通过创建 `.env` 文件(参考 `.env.example`)或直接在命令行中设置: + +```bash +# Linux/macOS +export MINERU_API_BASE="https://mineru.net" +export MINERU_API_KEY="your-api-key" +export OUTPUT_DIR="./downloads" +export USE_LOCAL_API="true" # 可选,如果需要本地解析 +export LOCAL_MINERU_API_BASE="http://localhost:8080" # 可选,如果启用本地 API + +# Windows +set MINERU_API_BASE=https://mineru.net +set MINERU_API_KEY=your-api-key +set OUTPUT_DIR=./downloads +set USE_LOCAL_API=true +set LOCAL_MINERU_API_BASE=http://localhost:8080 +``` + +#### 6.3.2 启动服务 + +使用以下命令启动 MinerU MCP 服务器,支持多种传输模式: + +**SSE 传输模式**: +```bash +uv run mineru-mcp --transport sse +``` + +**Streamable HTTP 传输模式**: +```bash +uv run mineru-mcp --transport streamable-http +``` + +或者,如果你使用全局安装: + +```bash +mineru-mcp --transport sse +# 或 +mineru-mcp --transport streamable-http +``` + +服务默认在 `http://localhost:8001` 启动,使用的传输协议取决于你指定的 `--transport` 参数。 + +> **注意**:不同传输模式使用不同的路由路径: +> - SSE 模式:`/sse`(例如:`http://localhost:8001/sse`) +> - Streamable HTTP 模式:`/mcp`(例如:`http://localhost:8001/mcp`) + + +## 7. Docker 部署 + +本项目支持使用 Docker 进行部署,使你能在任何支持 Docker 的环境中快速启动 MinerU MCP 服务器。 + +### 7.1 使用 Docker Compose + +1. 确保你已经安装了 Docker 和 Docker Compose +2. 复制项目根目录中的 `.env.example` 文件为 `.env`,并根据你的需求修改环境变量 +3. 运行以下命令启动服务: + +```bash +docker-compose up -d +``` + +服务默认会在 `http://localhost:8001` 启动。 + +### 7.2 手动构建 Docker 镜像 + +如果需要手动构建 Docker 镜像,可以使用以下命令: + +```bash +docker build -t mineru-mcp:latest . +``` + +然后启动容器: + +```bash +docker run -p 8001:8001 --env-file .env mineru-mcp:latest +``` + +更多 Docker 相关信息,请参考 `DOCKER_README.md` 文件。 + +## 8. 常见问题 + +### 8.1 API 密钥问题 + +**问题**:无法连接 MinerU API 或返回 401 错误。 +**解决方案**:检查你的 API 密钥是否正确设置。在 `.env` 文件中确保 `MINERU_API_KEY` 环境变量包含有效的密钥。 + +### 8.2 如何优雅退出服务 + +**问题**:如何正确地停止 MinerU MCP 服务? +**解决方案**:服务运行时,可以通过按 `Ctrl+C` 来优雅地退出。系统会自动处理正在进行的操作,并确保所有资源得到正确释放。如果一次 `Ctrl+C` 没有响应,可以再次按下 `Ctrl+C` 强制退出。 + +### 8.3 文件路径问题 + +**问题**:使用 `parse_documents` 工具处理本地文件时报找不到文件错误。 +**解决方案**:请确保使用绝对路径,或者相对于服务器运行目录的正确相对路径。 + +### 8.4 MCP 服务调用超时问题 + +**问题**:调用 `parse_documents` 工具时出现 `Error calling tool 'parse_documents': MCP error -32001: Request timed out` 错误。 +**解决方案**:这个问题常见于处理大型文档或网络不稳定的情况。在某些 MCP 客户端(如 Cursor)中,超时后可能导致无法再次调用 MCP 服务,需要重启客户端。最新版本的 Cursor 中可能会显示正在调用 MCP,但实际上没有真正调用成功。建议: +1. **等待官方修复**:这是Cursor客户端的已知问题,建议等待Cursor官方修复 +2. **处理小文件**:尽量只处理少量小文件,避免处理大型文档导致超时 +3. **分批处理**:将多个文件分成多次请求处理,每次只处理一两个文件 +4. 增加超时时间设置(如果客户端支持) +5. 对于超时后无法再次调用的问题,需要重启 MCP 客户端 +6. 如果反复出现超时,请检查网络连接或考虑使用本地 API 模式 + + +## 9. 许可证 + +本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。 diff --git a/projects/mcp/docker-compose.yml b/projects/mcp/docker-compose.yml new file mode 100644 index 00000000..1bd9e533 --- /dev/null +++ b/projects/mcp/docker-compose.yml @@ -0,0 +1,14 @@ +version: '3' + +services: + mineru-mcp: + build: + context: . + dockerfile: Dockerfile + ports: + - "8001:8001" + environment: + - MINERU_API_KEY=${MINERU_API_KEY} + volumes: + - ./downloads:/app/downloads + restart: unless-stopped \ No newline at end of file diff --git a/projects/mcp/pyproject.toml b/projects/mcp/pyproject.toml new file mode 100644 index 00000000..3eebef9f --- /dev/null +++ b/projects/mcp/pyproject.toml @@ -0,0 +1,39 @@ +[project] +name = "mineru-mcp" +version = "0.1.12" +description = "MinerU MCP Server for PDF to Markdown conversion" +authors = [ + {name = "minerU",email = "OpenDataLab@pjlab.org.cn"} +] +readme = "README.md" +license = {text = "MIT"} +requires-python = ">=3.10,<4.0" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] +dependencies = [ + "fastmcp>=2.5.2", + "python-dotenv>=1.0.0", + "requests>=2.31.0", + "aiohttp>=3.9.0", + "httpx>=0.24.0", + "uvicorn>=0.20.0", + "starlette>=0.27.0", +] + +[project.scripts] +mineru-mcp = "mineru.cli:main" + +[tool.poetry] +packages = [{include = "mineru", from = "src"}] + +[[tool.poetry.source]] +name = "aliyun" +url = "https://mirrors.aliyun.com/pypi/simple/" +priority = "primary" + +[build-system] +requires = ["setuptools>=42.0", "wheel"] +build-backend = "setuptools.build_meta" diff --git a/projects/mcp/src/mineru/api.py b/projects/mcp/src/mineru/api.py new file mode 100644 index 00000000..43de2da4 --- /dev/null +++ b/projects/mcp/src/mineru/api.py @@ -0,0 +1,729 @@ +"""MinerU File转Markdown转换的API客户端。""" + +import asyncio +import os +import zipfile +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +import aiohttp +import requests + +from . import config + + +def singleton_func(cls): + instance = {} + + def _singleton(*args, **kwargs): + if cls not in instance: + instance[cls] = cls(*args, **kwargs) + return instance[cls] + + return _singleton + + +@singleton_func +class MinerUClient: + """ + 用于与 MinerU API 交互以将 File 转换为 Markdown 的客户端。 + """ + + def __init__(self, api_base: Optional[str] = None, api_key: Optional[str] = None): + """ + 初始化 MinerU API 客户端。 + + Args: + api_base: MinerU API 的基础 URL (默认: 从环境变量获取) + api_key: 用于向 MinerU 进行身份验证的 API 密钥 (默认: 从环境变量获取) + """ + self.api_base = api_base or config.MINERU_API_BASE + self.api_key = api_key or config.MINERU_API_KEY + + if not self.api_key: + # 提供更友好的错误消息 + raise ValueError( + "错误: MinerU API 密钥 (MINERU_API_KEY) 未设置或为空。\n" + "请确保已设置 MINERU_API_KEY 环境变量,例如:\n" + " export MINERU_API_KEY='your_actual_api_key'\n" + "或者,在项目根目录的 `.env` 文件中定义该变量。" + ) + + async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: + """ + 向 MinerU API 发出请求。 + + Args: + method: HTTP 方法 (GET, POST 等) + endpoint: API 端点路径 (不含基础 URL) + **kwargs: 传递给 aiohttp 请求的其他参数 + + Returns: + dict: API 响应 (JSON 格式) + """ + url = f"{self.api_base}{endpoint}" + headers = { + "Authorization": f"Bearer {self.api_key}", + "Accept": "application/json", + } + + if "headers" in kwargs: + kwargs["headers"].update(headers) + else: + kwargs["headers"] = headers + + # 创建一个不包含授权信息的参数副本,用于日志记录 + log_kwargs = kwargs.copy() + if "headers" in log_kwargs and "Authorization" in log_kwargs["headers"]: + log_kwargs["headers"] = log_kwargs["headers"].copy() + log_kwargs["headers"]["Authorization"] = "Bearer ****" # 隐藏API密钥 + + config.logger.debug(f"API请求: {method} {url}") + config.logger.debug(f"请求参数: {log_kwargs}") + + async with aiohttp.ClientSession() as session: + async with session.request(method, url, **kwargs) as response: + response.raise_for_status() + response_json = await response.json() + + config.logger.debug(f"API响应: {response_json}") + + return response_json + + async def submit_file_url_task( + self, + urls: Union[str, List[Union[str, Dict[str, Any]]], Dict[str, Any]], + enable_ocr: bool = True, + language: str = "ch", + page_ranges: Optional[str] = None, + ) -> Dict[str, Any]: + """ + 提交 File URL 以转换为 Markdown。支持单个URL或多个URL批量处理。 + + Args: + urls: 可以是以下形式之一: + 1. 单个URL字符串 + 2. 多个URL的列表 + 3. 包含URL配置的字典列表,每个字典包含: + - url: File文件URL (必需) + - is_ocr: 是否启用OCR (可选) + - data_id: 文件数据ID (可选) + - page_ranges: 页码范围 (可选) + enable_ocr: 是否为转换启用 OCR(所有文件的默认值) + language: 指定文档语言,默认 ch,中文 + page_ranges: 指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6"表示选取第2页、第4页至第6页;"2--2"表示从第2页到倒数第2页。 + + Returns: + dict: 任务信息,包括batch_id + """ + # 统计URL数量 + url_count = 1 + if isinstance(urls, list): + url_count = len(urls) + config.logger.debug( + f"调用submit_file_url_task: {url_count}个URL, " + + f"ocr={enable_ocr}, " + + f"language={language}" + ) + + # 处理输入,确保我们有一个URL配置列表 + urls_config = [] + + # 转换输入为标准格式 + if isinstance(urls, str): + urls_config.append( + {"url": urls, "is_ocr": enable_ocr, "page_ranges": page_ranges} + ) + + elif isinstance(urls, list): + # 处理URL列表或URL配置列表 + for i, url_item in enumerate(urls): + if isinstance(url_item, str): + # 简单的URL字符串 + urls_config.append( + { + "url": url_item, + "is_ocr": enable_ocr, + "page_ranges": page_ranges, + } + ) + + elif isinstance(url_item, dict): + # 含有详细配置的URL字典 + if "url" not in url_item: + raise ValueError(f"URL配置必须包含 'url' 字段: {url_item}") + + url_is_ocr = url_item.get("is_ocr", enable_ocr) + url_page_ranges = url_item.get("page_ranges", page_ranges) + + url_config = {"url": url_item["url"], "is_ocr": url_is_ocr} + if url_page_ranges is not None: + url_config["page_ranges"] = url_page_ranges + + urls_config.append(url_config) + else: + raise TypeError(f"不支持的URL配置类型: {type(url_item)}") + elif isinstance(urls, dict): + # 单个URL配置字典 + if "url" not in urls: + raise ValueError(f"URL配置必须包含 'url' 字段: {urls}") + + url_is_ocr = urls.get("is_ocr", enable_ocr) + url_page_ranges = urls.get("page_ranges", page_ranges) + + url_config = {"url": urls["url"], "is_ocr": url_is_ocr} + if url_page_ranges is not None: + url_config["page_ranges"] = url_page_ranges + + urls_config.append(url_config) + else: + raise TypeError(f"urls 必须是字符串、列表或字典,而不是 {type(urls)}") + + # 构建API请求payload + files_payload = urls_config # 与submit_file_task不同,这里直接使用URLs配置 + + payload = { + "language": language, + "files": files_payload, + } + + # 调用批量API + response = await self._request( + "POST", "/api/v4/extract/task/batch", json=payload + ) + + # 检查响应 + if "data" not in response or "batch_id" not in response["data"]: + raise ValueError(f"提交批量URL任务失败: {response}") + + batch_id = response["data"]["batch_id"] + + config.logger.info(f"开始处理 {len(urls_config)} 个文件URL") + config.logger.debug(f"批量URL任务提交成功,批次ID: {batch_id}") + + # 返回包含batch_id的响应和URLs信息 + result = { + "data": { + "batch_id": batch_id, + "uploaded_files": [url_config.get("url") for url_config in urls_config], + } + } + + # 对于单个URL的情况,设置file_name以保持与原来返回格式的兼容性 + if len(urls_config) == 1: + url = urls_config[0]["url"] + # 从URL中提取文件名 + file_name = url.split("/")[-1] + result["data"]["file_name"] = file_name + + return result + + async def submit_file_task( + self, + files: Union[str, List[Union[str, Dict[str, Any]]], Dict[str, Any]], + enable_ocr: bool = True, + language: str = "ch", + page_ranges: Optional[str] = None, + ) -> Dict[str, Any]: + """ + 提交本地 File 文件以转换为 Markdown。支持单个文件路径或多个文件配置。 + + Args: + files: 可以是以下形式之一: + 1. 单个文件路径字符串 + 2. 多个文件路径的列表 + 3. 包含文件配置的字典列表,每个字典包含: + - path/name: 文件路径或文件名 + - is_ocr: 是否启用OCR (可选) + - data_id: 文件数据ID (可选) + - page_ranges: 页码范围 (可选) + enable_ocr: 是否为转换启用 OCR(所有文件的默认值) + language: 指定文档语言,默认 ch,中文 + page_ranges: 指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6"表示选取第2页、第4页至第6页;"2--2"表示从第2页到倒数第2页。 + + Returns: + dict: 任务信息,包括batch_id + """ + # 统计文件数量 + file_count = 1 + if isinstance(files, list): + file_count = len(files) + config.logger.debug( + f"调用submit_file_task: {file_count}个文件, " + + f"ocr={enable_ocr}, " + + f"language={language}" + ) + + # 处理输入,确保我们有一个文件配置列表 + files_config = [] + + # 转换输入为标准格式 + if isinstance(files, str): + # 单个文件路径 + file_path = Path(files) + if not file_path.exists(): + raise FileNotFoundError(f"未找到 File 文件: {file_path}") + + files_config.append( + { + "path": file_path, + "name": file_path.name, + "is_ocr": enable_ocr, + "page_ranges": page_ranges, + } + ) + + elif isinstance(files, list): + # 处理文件路径列表或文件配置列表 + for i, file_item in enumerate(files): + if isinstance(file_item, str): + # 简单的文件路径 + file_path = Path(file_item) + if not file_path.exists(): + raise FileNotFoundError(f"未找到 File 文件: {file_path}") + + files_config.append( + { + "path": file_path, + "name": file_path.name, + "is_ocr": enable_ocr, + "page_ranges": page_ranges, + } + ) + + elif isinstance(file_item, dict): + # 含有详细配置的文件字典 + if "path" not in file_item and "name" not in file_item: + raise ValueError( + f"文件配置必须包含 'path' 或 'name' 字段: {file_item}" + ) + + if "path" in file_item: + file_path = Path(file_item["path"]) + if not file_path.exists(): + raise FileNotFoundError(f"未找到 File 文件: {file_path}") + + file_name = file_path.name + else: + file_name = file_item["name"] + file_path = None + + file_is_ocr = file_item.get("is_ocr", enable_ocr) + file_page_ranges = file_item.get("page_ranges", page_ranges) + + file_config = { + "path": file_path, + "name": file_name, + "is_ocr": file_is_ocr, + } + if file_page_ranges is not None: + file_config["page_ranges"] = file_page_ranges + + files_config.append(file_config) + else: + raise TypeError(f"不支持的文件配置类型: {type(file_item)}") + elif isinstance(files, dict): + # 单个文件配置字典 + if "path" not in files and "name" not in files: + raise ValueError(f"文件配置必须包含 'path' 或 'name' 字段: {files}") + + if "path" in files: + file_path = Path(files["path"]) + if not file_path.exists(): + raise FileNotFoundError(f"未找到 File 文件: {file_path}") + + file_name = file_path.name + else: + file_name = files["name"] + file_path = None + + file_is_ocr = files.get("is_ocr", enable_ocr) + file_page_ranges = files.get("page_ranges", page_ranges) + + file_config = { + "path": file_path, + "name": file_name, + "is_ocr": file_is_ocr, + } + if file_page_ranges is not None: + file_config["page_ranges"] = file_page_ranges + + files_config.append(file_config) + else: + raise TypeError(f"files 必须是字符串、列表或字典,而不是 {type(files)}") + + # 步骤1: 构建API请求payload + files_payload = [] + for file_config in files_config: + file_payload = { + "name": file_config["name"], + "is_ocr": file_config["is_ocr"], + } + if "page_ranges" in file_config and file_config["page_ranges"] is not None: + file_payload["page_ranges"] = file_config["page_ranges"] + files_payload.append(file_payload) + + payload = { + "language": language, + "files": files_payload, + } + + # 步骤2: 获取文件上传URL + response = await self._request("POST", "/api/v4/file-urls/batch", json=payload) + + # 检查响应 + if ( + "data" not in response + or "batch_id" not in response["data"] + or "file_urls" not in response["data"] + ): + raise ValueError(f"获取上传URL失败: {response}") + + batch_id = response["data"]["batch_id"] + file_urls = response["data"]["file_urls"] + + if len(file_urls) != len(files_config): + raise ValueError( + f"上传URL数量 ({len(file_urls)}) 与文件数量 ({len(files_config)}) 不匹配" + ) + + config.logger.info(f"开始上传 {len(file_urls)} 个本地文件") + config.logger.debug(f"获取上传URL成功,批次ID: {batch_id}") + + # 步骤3: 上传所有文件 + uploaded_files = [] + + for i, (file_config, upload_url) in enumerate(zip(files_config, file_urls)): + file_path = file_config["path"] + if file_path is None: + raise ValueError(f"文件 {file_config['name']} 没有有效的路径") + + try: + with open(file_path, "rb") as f: + # 重要:不设置Content-Type,让OSS自动处理 + response = requests.put(upload_url, data=f) + + if response.status_code != 200: + raise ValueError( + f"文件上传失败,状态码: {response.status_code}, 响应: {response.text}" + ) + + config.logger.debug(f"文件 {file_path.name} 上传成功") + uploaded_files.append(file_path.name) + except Exception as e: + raise ValueError(f"文件 {file_path.name} 上传失败: {str(e)}") + + config.logger.info(f"文件上传完成,共 {len(uploaded_files)} 个文件") + + # 返回包含batch_id的响应和已上传的文件信息 + result = {"data": {"batch_id": batch_id, "uploaded_files": uploaded_files}} + + # 对于单个文件的情况,保持与原来返回格式的兼容性 + if len(uploaded_files) == 1: + result["data"]["file_name"] = uploaded_files[0] + + return result + + async def get_batch_task_status(self, batch_id: str) -> Dict[str, Any]: + """ + 获取批量转换任务的状态。 + + Args: + batch_id: 批量任务的ID + + Returns: + dict: 批量任务状态信息 + """ + response = await self._request( + "GET", f"/api/v4/extract-results/batch/{batch_id}" + ) + + return response + + async def process_file_to_markdown( + self, + task_fn, + task_arg: Union[str, List[Dict[str, Any]], Dict[str, Any]], + enable_ocr: bool = True, + output_dir: Optional[str] = None, + max_retries: int = 180, + retry_interval: int = 10, + ) -> Union[str, Dict[str, Any]]: + """ + 从开始到结束处理 File 到 Markdown 的转换。 + + Args: + task_fn: 提交任务的函数 (submit_file_url_task 或 submit_file_task) + task_arg: 任务函数的参数,可以是: + - URL字符串 + - 文件路径字符串 + - 包含文件配置的字典 + - 包含多个文件配置的字典列表 + enable_ocr: 是否启用 OCR + output_dir: 结果的输出目录 + max_retries: 最大状态检查重试次数 + retry_interval: 状态检查之间的时间间隔 (秒) + + Returns: + Union[str, Dict[str, Any]]: + - 单文件: 包含提取的 Markdown 文件的目录路径 + - 多文件: { + "results": [ + { + "filename": str, + "status": str, + "content": str, + "error_message": str, + } + ], + "extract_dir": str + } + """ + try: + # 提交任务 - 使用位置参数调用,而不是命名参数 + task_info = await task_fn(task_arg, enable_ocr) + + # 批量任务处理 + batch_id = task_info["data"]["batch_id"] + + # 获取所有上传文件的名称 + uploaded_files = task_info["data"].get("uploaded_files", []) + if not uploaded_files and "file_name" in task_info["data"]: + uploaded_files = [task_info["data"]["file_name"]] + + if not uploaded_files: + raise ValueError("无法获取上传文件的信息") + + config.logger.debug(f"批量任务提交成功。Batch ID: {batch_id}") + + # 跟踪所有文件的处理状态 + files_status = {} # 将使用file_name作为键 + files_download_urls = {} + failed_files = {} # 记录失败的文件和错误信息 + + # 准备输出路径 + output_path = config.ensure_output_dir(output_dir) + + # 轮询任务完成情况 + for i in range(max_retries): + status_info = await self.get_batch_task_status(batch_id) + + config.logger.debug(f"轮训结果:{status_info}") + + if ( + "data" not in status_info + or "extract_result" not in status_info["data"] + ): + config.logger.error(f"获取批量任务状态失败: {status_info}") + await asyncio.sleep(retry_interval) + continue + + # 检查所有文件的状态 + all_done = True + has_progress = False + + for result in status_info["data"]["extract_result"]: + file_name = result.get("file_name") + + if not file_name: + continue + + # 初始化状态,如果之前没有记录 + if file_name not in files_status: + files_status[file_name] = "pending" + + state = result.get("state") + files_status[file_name] = state + + if state == "done": + # 保存下载链接 + full_zip_url = result.get("full_zip_url") + if full_zip_url: + files_download_urls[file_name] = full_zip_url + config.logger.info(f"文件 {file_name} 处理完成") + else: + config.logger.debug( + f"文件 {file_name} 标记为完成但没有下载链接" + ) + all_done = False + elif state in ["failed", "error"]: + err_msg = result.get("err_msg", "未知错误") + failed_files[file_name] = err_msg + config.logger.warning(f"文件 {file_name} 处理失败: {err_msg}") + # 不抛出异常,继续处理其他文件 + else: + all_done = False + # 显示进度信息 + if state == "running" and "extract_progress" in result: + has_progress = True + progress = result["extract_progress"] + extracted = progress.get("extracted_pages", 0) + total = progress.get("total_pages", 0) + if total > 0: + percent = (extracted / total) * 100 + config.logger.info( + f"处理进度: {file_name} " + + f"{extracted}/{total} 页 " + + f"({percent:.1f}%)" + ) + + # 检查是否所有文件都已经处理完成 + expected_file_count = len(uploaded_files) + processed_file_count = len(files_status) + completed_file_count = len(files_download_urls) + len(failed_files) + + # 记录当前状态 + config.logger.debug( + f"文件处理状态: all_done={all_done}, " + + f"files_status数量={processed_file_count}, " + + f"上传文件数量={expected_file_count}, " + + f"下载链接数量={len(files_download_urls)}, " + + f"失败文件数量={len(failed_files)}" + ) + + # 判断是否所有文件都已完成(包括成功和失败的) + if ( + processed_file_count > 0 + and processed_file_count >= expected_file_count + and completed_file_count >= processed_file_count + ): + if files_download_urls or failed_files: + config.logger.info("文件处理完成") + if failed_files: + config.logger.warning( + f"有 {len(failed_files)} 个文件处理失败" + ) + break + else: + # 这种情况不应该发生,但保险起见 + all_done = False + + # 如果没有进度信息,只显示简单的等待消息 + if not has_progress: + config.logger.info(f"等待文件处理完成... ({i+1}/{max_retries})") + + await asyncio.sleep(retry_interval) + else: + # 如果超过最大重试次数,检查是否有部分文件完成 + if not files_download_urls and not failed_files: + raise TimeoutError(f"批量任务 {batch_id} 未在允许的时间内完成") + else: + config.logger.warning( + "警告: 部分文件未在允许的时间内完成," + "继续处理已完成的文件" + ) + + # 创建主提取目录 + extract_dir = output_path / batch_id + extract_dir.mkdir(exist_ok=True) + + # 准备结果列表 + results = [] + + # 下载并解压每个成功的文件的结果 + for file_name, download_url in files_download_urls.items(): + try: + config.logger.debug + (f"下载文件处理结果: {file_name}") + + # 从下载URL中提取zip文件名作为子目录名 + zip_file_name = download_url.split("/")[-1] + # 去掉.zip扩展名 + zip_dir_name = os.path.splitext(zip_file_name)[0] + + file_extract_dir = extract_dir / zip_dir_name + file_extract_dir.mkdir(exist_ok=True) + + # 下载ZIP文件 + zip_path = output_path / f"{batch_id}_{zip_file_name}" + + async with aiohttp.ClientSession() as session: + async with session.get( + download_url, + headers={"Authorization": f"Bearer {self.api_key}"}, + ) as response: + response.raise_for_status() + with open(zip_path, "wb") as f: + f.write(await response.read()) + + # 解压到子文件夹 + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(file_extract_dir) + + # 解压后删除ZIP文件 + zip_path.unlink() + + # 尝试读取Markdown内容 + markdown_content = "" + markdown_files = list(file_extract_dir.glob("*.md")) + if markdown_files: + with open(markdown_files[0], "r", encoding="utf-8") as f: + markdown_content = f.read() + + # 添加成功结果 + results.append( + { + "filename": file_name, + "status": "success", + "content": markdown_content, + "extract_path": str(file_extract_dir), + } + ) + + config.logger.debug( + f"文件 {file_name} 的结果已解压到: {file_extract_dir}" + ) + + except Exception as e: + # 下载失败,添加错误结果 + error_msg = f"下载结果失败: {str(e)}" + config.logger.error(f"文件 {file_name} {error_msg}") + results.append( + { + "filename": file_name, + "status": "error", + "error_message": error_msg, + } + ) + + # 添加处理失败的文件到结果 + for file_name, error_msg in failed_files.items(): + results.append( + { + "filename": file_name, + "status": "error", + "error_message": f"处理失败: {error_msg}", + } + ) + + # 输出处理结果统计 + success_count = len(files_download_urls) + fail_count = len(failed_files) + total_count = success_count + fail_count + + config.logger.info("\n=== 文件处理结果统计 ===") + config.logger.info(f"总文件数: {total_count}") + config.logger.info(f"成功处理: {success_count}") + config.logger.info(f"处理失败: {fail_count}") + + if failed_files: + config.logger.info("\n失败文件详情:") + for file_name, error_msg in failed_files.items(): + config.logger.info(f" - {file_name}: {error_msg}") + + if success_count > 0: + config.logger.info(f"\n结果保存目录: {extract_dir}") + else: + config.logger.info(f"\n输出目录: {extract_dir}") + + # 返回详细结果 + return { + "results": results, + "extract_dir": str(extract_dir), + "success_count": success_count, + "fail_count": fail_count, + "total_count": total_count, + } + + except Exception as e: + config.logger.error(f"处理 File 到 Markdown 失败: {str(e)}") + raise diff --git a/projects/mcp/src/mineru/cli.py b/projects/mcp/src/mineru/cli.py new file mode 100644 index 00000000..417f5a96 --- /dev/null +++ b/projects/mcp/src/mineru/cli.py @@ -0,0 +1,73 @@ +"""MinerU File转Markdown服务的命令行界面。""" + +import sys +import argparse + +from . import config +from . import server + + +def main(): + """命令行界面的入口点。""" + parser = argparse.ArgumentParser(description="MinerU File转Markdown转换服务") + + parser.add_argument( + "--output-dir", "-o", type=str, help="保存转换后文件的目录 (默认: ./downloads)" + ) + + parser.add_argument( + "--transport", + "-t", + type=str, + default="stdio", + help="协议类型 (默认: stdio,可选: sse,streamable-http)", + ) + + parser.add_argument( + "--port", + "-p", + type=int, + default=8001, + help="服务器端口 (默认: 8001, 仅在使用HTTP协议时有效)", + ) + + parser.add_argument( + "--host", + type=str, + default="127.0.0.1", + help="服务器主机地址 (默认: 127.0.0.1, 仅在使用HTTP协议时有效)", + ) + + args = parser.parse_args() + + # 检查参数有效性 + if args.transport == "stdio" and (args.host != "127.0.0.1" or args.port != 8001): + print("警告: 在STDIO模式下,--host和--port参数将被忽略", file=sys.stderr) + + # 验证API密钥 - 移动到这里,以便 --help 等参数可以无密钥运行 + if not config.MINERU_API_KEY: + print( + "错误: 启动服务需要 MINERU_API_KEY 环境变量。" + "\\n请检查是否已设置该环境变量,例如:" + "\\n export MINERU_API_KEY='your_actual_api_key'" + "\\n或者,确保在项目根目录的 `.env` 文件中定义了该变量。" + "\\n\\n您可以使用 --help 查看可用的命令行选项。", + file=sys.stderr, # 将错误消息输出到 stderr + ) + sys.exit(1) + + # 如果提供了输出目录,则进行设置 + if args.output_dir: + server.set_output_dir(args.output_dir) + + # 打印配置信息 + print("MinerU File转Markdown转换服务启动...") + if args.transport in ["sse", "streamable-http"]: + print(f"服务器地址: {args.host}:{args.port}") + print("按 Ctrl+C 可以退出服务") + + server.run_server(mode=args.transport, port=args.port, host=args.host) + + +if __name__ == "__main__": + main() diff --git a/projects/mcp/src/mineru/config.py b/projects/mcp/src/mineru/config.py new file mode 100644 index 00000000..ebfa0b7a --- /dev/null +++ b/projects/mcp/src/mineru/config.py @@ -0,0 +1,91 @@ +"""MinerU File转Markdown转换服务的配置工具。""" + +import os +import logging +from pathlib import Path +from dotenv import load_dotenv + +# 从 .env 文件加载环境变量 +load_dotenv() + +# API 配置 +MINERU_API_BASE = os.getenv("MINERU_API_BASE", "https://mineru.net") +MINERU_API_KEY = os.getenv("MINERU_API_KEY", "") + +# 本地API配置 +USE_LOCAL_API = os.getenv("USE_LOCAL_API", "").lower() in ["true", "1", "yes"] +LOCAL_MINERU_API_BASE = os.getenv("LOCAL_MINERU_API_BASE", "http://localhost:8080") + +# 转换后文件的默认输出目录 +DEFAULT_OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./downloads") + + +# 设置日志系统 +def setup_logging(): + """ + 设置日志系统,根据环境变量配置日志级别。 + + Returns: + logging.Logger: 配置好的日志记录器。 + """ + # 获取环境变量中的日志级别设置 + log_level = os.getenv("MINERU_LOG_LEVEL", "INFO").upper() + debug_mode = os.getenv("MINERU_DEBUG", "").lower() in ["true", "1", "yes"] + + # 如果设置了debug_mode,则覆盖log_level + if debug_mode: + log_level = "DEBUG" + + # 确保log_level是有效的 + valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + if log_level not in valid_levels: + log_level = "INFO" + + # 设置日志格式 + log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + + # 配置日志 + logging.basicConfig(level=getattr(logging, log_level), format=log_format) + + logger = logging.getLogger("mineru") + logger.setLevel(getattr(logging, log_level)) + + # 输出日志级别信息 + logger.info(f"日志级别设置为: {log_level}") + + return logger + + +# 创建默认的日志记录器 +logger = setup_logging() + + +# 如果输出目录不存在,则创建它 +def ensure_output_dir(output_dir=None): + """ + 确保输出目录存在。 + + Args: + output_dir: 输出目录的可选路径。如果为 None,则使用 DEFAULT_OUTPUT_DIR。 + + Returns: + 表示输出目录的 Path 对象。 + """ + output_path = Path(output_dir or DEFAULT_OUTPUT_DIR) + output_path.mkdir(parents=True, exist_ok=True) + return output_path + + +# 验证 API 配置 +def validate_api_config(): + """ + 验证是否已设置所需的 API 配置。 + + Returns: + dict: 配置状态。 + """ + return { + "api_base": MINERU_API_BASE, + "api_key_set": bool(MINERU_API_KEY), + "output_dir": DEFAULT_OUTPUT_DIR, + } diff --git a/projects/mcp/src/mineru/examples.py b/projects/mcp/src/mineru/examples.py new file mode 100644 index 00000000..8bda1582 --- /dev/null +++ b/projects/mcp/src/mineru/examples.py @@ -0,0 +1,76 @@ +"""演示如何使用 MinerU File转Markdown客户端的示例。""" + +import os +import asyncio +from mcp.client import MCPClient + + +async def convert_file_url_example(): + """从 URL 转换 File 的示例。""" + client = MCPClient("http://localhost:8000") + + # 转换单个 File URL + result = await client.call( + "convert_file_url", url="https://example.com/sample.pdf", enable_ocr=True + ) + print(f"转换结果: {result}") + + # 转换多个 File URL + urls = """ + https://example.com/doc1.pdf + https://example.com/doc2.pdf + """ + result = await client.call("convert_file_url", url=urls, enable_ocr=True) + print(f"多个转换结果: {result}") + + +async def convert_file_file_example(): + """转换本地 File 文件的示例。""" + client = MCPClient("http://localhost:8000") + + # 获取测试 File 的绝对路径 + script_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.dirname(os.path.dirname(os.path.dirname(script_dir))) + test_file_path = os.path.join(project_root, "test_files", "test.pdf") + + # 转换单个 File 文件 + result = await client.call( + "convert_file_file", file_path=test_file_path, enable_ocr=True + ) + print(f"文件转换结果: {result}") + + +async def get_api_status_example(): + """获取 API 状态的示例。""" + client = MCPClient("http://localhost:8000") + + # 获取 API 状态 + status = await client.get_resource("status://api") + print(f"API 状态: {status}") + + # 获取使用帮助 + help_text = await client.get_resource("help://usage") + print(f"使用帮助: {help_text[:100]}...") # 显示前 100 个字符 + + +async def main(): + """运行所有示例。""" + print("运行 File 到 Markdown 转换示例...") + + # 检查是否设置了 API_KEY + if not os.environ.get("MINERU_API_KEY"): + print("警告: MINERU_API_KEY 环境变量未设置。") + print("使用以下命令设置: export MINERU_API_KEY=your_api_key") + print("跳过需要 API 访问的示例...") + + # 仅获取 API 状态 + await get_api_status_example() + else: + # 运行所有示例 + await convert_file_url_example() + await convert_file_file_example() + await get_api_status_example() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/projects/mcp/src/mineru/language.py b/projects/mcp/src/mineru/language.py new file mode 100644 index 00000000..755ac1ec --- /dev/null +++ b/projects/mcp/src/mineru/language.py @@ -0,0 +1,106 @@ +"""MinerU支持的语言列表。""" + +from typing import Dict, List + +# 支持的语言列表 +LANGUAGES: List[Dict[str, str]] = [ + {"name": "中文", "description": "Chinese & English", "code": "ch"}, + {"name": "英文", "description": "English", "code": "en"}, + {"name": "法文", "description": "French", "code": "fr"}, + {"name": "德文", "description": "German", "code": "german"}, + {"name": "日文", "description": "Japanese", "code": "japan"}, + {"name": "韩文", "description": "Korean", "code": "korean"}, + {"name": "中文繁体", "description": "Chinese Traditional", "code": "chinese_cht"}, + {"name": "意大利文", "description": "Italian", "code": "it"}, + {"name": "西班牙文", "description": "Spanish", "code": "es"}, + {"name": "葡萄牙文", "description": "Portuguese", "code": "pt"}, + {"name": "俄罗斯文", "description": "Russian", "code": "ru"}, + {"name": "阿拉伯文", "description": "Arabic", "code": "ar"}, + {"name": "印地文", "description": "Hindi", "code": "hi"}, + {"name": "维吾尔", "description": "Uyghur", "code": "ug"}, + {"name": "波斯文", "description": "Persian", "code": "fa"}, + {"name": "乌尔都文", "description": "Urdu", "code": "ur"}, + {"name": "塞尔维亚文(latin)", "description": "Serbian(latin)", "code": "rs_latin"}, + {"name": "欧西坦文", "description": "Occitan", "code": "oc"}, + {"name": "马拉地文", "description": "Marathi", "code": "mr"}, + {"name": "尼泊尔文", "description": "Nepali", "code": "ne"}, + { + "name": "塞尔维亚文(cyrillic)", + "description": "Serbian(cyrillic)", + "code": "rs_cyrillic", + }, + {"name": "毛利文", "description": "Maori", "code": "mi"}, + {"name": "马来文", "description": "Malay", "code": "ms"}, + {"name": "马耳他文", "description": "Maltese", "code": "mt"}, + {"name": "荷兰文", "description": "Dutch", "code": "nl"}, + {"name": "挪威文", "description": "Norwegian", "code": "no"}, + {"name": "波兰文", "description": "Polish", "code": "pl"}, + {"name": "罗马尼亚文", "description": "Romanian", "code": "ro"}, + {"name": "斯洛伐克文", "description": "Slovak", "code": "sk"}, + {"name": "斯洛文尼亚文", "description": "Slovenian", "code": "sl"}, + {"name": "阿尔巴尼亚文", "description": "Albanian", "code": "sq"}, + {"name": "瑞典文", "description": "Swedish", "code": "sv"}, + {"name": "西瓦希里文", "description": "Swahili", "code": "sw"}, + {"name": "塔加洛文", "description": "Tagalog", "code": "tl"}, + {"name": "土耳其文", "description": "Turkish", "code": "tr"}, + {"name": "乌兹别克文", "description": "Uzbek", "code": "uz"}, + {"name": "越南文", "description": "Vietnamese", "code": "vi"}, + {"name": "蒙古文", "description": "Mongolian", "code": "mn"}, + {"name": "车臣文", "description": "Chechen", "code": "che"}, + {"name": "哈里亚纳语", "description": "Haryanvi", "code": "bgc"}, + {"name": "保加利亚文", "description": "Bulgarian", "code": "bg"}, + {"name": "乌克兰文", "description": "Ukranian", "code": "uk"}, + {"name": "白俄罗斯文", "description": "Belarusian", "code": "be"}, + {"name": "泰卢固文", "description": "Telugu", "code": "te"}, + {"name": "阿巴扎文", "description": "Abaza", "code": "abq"}, + {"name": "泰米尔文", "description": "Tamil", "code": "ta"}, + {"name": "南非荷兰文", "description": "Afrikaans", "code": "af"}, + {"name": "阿塞拜疆文", "description": "Azerbaijani", "code": "az"}, + {"name": "波斯尼亚文", "description": "Bosnian", "code": "bs"}, + {"name": "捷克文", "description": "Czech", "code": "cs"}, + {"name": "威尔士文", "description": "Welsh", "code": "cy"}, + {"name": "丹麦文", "description": "Danish", "code": "da"}, + {"name": "爱沙尼亚文", "description": "Estonian", "code": "et"}, + {"name": "爱尔兰文", "description": "Irish", "code": "ga"}, + {"name": "克罗地亚文", "description": "Croatian", "code": "hr"}, + {"name": "匈牙利文", "description": "Hungarian", "code": "hu"}, + {"name": "印尼文", "description": "Indonesian", "code": "id"}, + {"name": "冰岛文", "description": "Icelandic", "code": "is"}, + {"name": "库尔德文", "description": "Kurdish", "code": "ku"}, + {"name": "立陶宛文", "description": "Lithuanian", "code": "lt"}, + {"name": "拉脱维亚文", "description": "Latvian", "code": "lv"}, + {"name": "达尔瓦文", "description": "Dargwa", "code": "dar"}, + {"name": "因古什文", "description": "Ingush", "code": "inh"}, + {"name": "拉克文", "description": "Lak", "code": "lbe"}, + {"name": "莱兹甘文", "description": "Lezghian", "code": "lez"}, + {"name": "塔巴萨兰文", "description": "Tabassaran", "code": "tab"}, + {"name": "比尔哈文", "description": "Bihari", "code": "bh"}, + {"name": "迈蒂利文", "description": "Maithili", "code": "mai"}, + {"name": "昂加文", "description": "Angika", "code": "ang"}, + {"name": "孟加拉文", "description": "Bhojpuri", "code": "bho"}, + {"name": "摩揭陀文", "description": "Magahi", "code": "mah"}, + {"name": "那格浦尔文", "description": "Nagpur", "code": "sck"}, + {"name": "尼瓦尔文", "description": "Newari", "code": "new"}, + {"name": "保加利亚文", "description": "Goan Konkani", "code": "gom"}, + {"name": "梵文", "description": "Sanskrit", "code": "sa"}, + {"name": "阿瓦尔文", "description": "Avar", "code": "ava"}, + {"name": "阿瓦尔文", "description": "Avar", "code": "ava"}, + {"name": "阿迪赫文", "description": "Adyghe", "code": "ady"}, + {"name": "巴利文", "description": "Pali", "code": "pi"}, + {"name": "拉丁文", "description": "Latin", "code": "la"}, +] + +# 构建语言代码到语言信息的映射字典,便于快速查找 +LANGUAGES_DICT: Dict[str, Dict[str, str]] = {lang["code"]: lang for lang in LANGUAGES} + + +def get_language_list() -> List[Dict[str, str]]: + """获取所有支持的语言列表。""" + return LANGUAGES + + +def get_language_by_code(code: str) -> Dict[str, str]: + """根据语言代码获取语言信息。""" + return LANGUAGES_DICT.get( + code, {"name": "未知", "description": "Unknown", "code": code} + ) diff --git a/projects/mcp/src/mineru/server.py b/projects/mcp/src/mineru/server.py new file mode 100644 index 00000000..61f18119 --- /dev/null +++ b/projects/mcp/src/mineru/server.py @@ -0,0 +1,1060 @@ +"""MinerU File转Markdown转换的FastMCP服务器实现。""" + +import json +import re +import traceback +from pathlib import Path +from typing import Annotated, Any, Dict, List, Optional + +import aiohttp +import uvicorn +from fastmcp import FastMCP +from mcp.server.sse import SseServerTransport +from pydantic import Field +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.routing import Mount, Route + +from . import config +from .api import MinerUClient +from .language import get_language_list + +# 初始化 FastMCP 服务器 +mcp = FastMCP( + name="MinerU File to Markdown Conversion", + instructions=""" + 一个将文档转化工具,可以将文档转化成Markdown、Json等格式,支持多种文件格式,包括 + PDF、Word、PPT以及图片格式(JPG、PNG、JPEG)。 + + 系统工具: + parse_documents: 解析文档(支持本地文件和URL,自动读取内容) + get_ocr_languages: 获取OCR支持的语言列表 + """, +) + +# 全局客户端实例 +_client_instance: Optional[MinerUClient] = None + + +def create_starlette_app(mcp_server, *, debug: bool = False) -> Starlette: + """创建用于SSE传输的Starlette应用。 + + Args: + mcp_server: MCP服务器实例 + debug: 是否启用调试模式 + + Returns: + Starlette: 配置好的Starlette应用实例 + """ + sse = SseServerTransport("/messages/") + + async def handle_sse(request: Request) -> None: + """处理SSE连接请求。""" + async with sse.connect_sse( + request.scope, + request.receive, + request._send, + ) as (read_stream, write_stream): + await mcp_server.run( + read_stream, + write_stream, + mcp_server.create_initialization_options(), + ) + + return Starlette( + debug=debug, + routes=[ + Route("/sse", endpoint=handle_sse), + Mount("/messages/", app=sse.handle_post_message), + ], + ) + + +def run_server(mode=None, port=8001, host="127.0.0.1"): + """运行 FastMCP 服务器。 + + Args: + mode: 运行模式,支持stdio、sse、streamable-http + port: 服务器端口,默认为8001,仅在HTTP模式下有效 + host: 服务器主机地址,默认为127.0.0.1,仅在HTTP模式下有效 + """ + # 确保输出目录存在 + config.ensure_output_dir(output_dir) + + # 检查是否设置了 API 密钥 + if not config.MINERU_API_KEY: + config.logger.warning("警告: MINERU_API_KEY 环境变量未设置。") + config.logger.warning("使用以下命令设置: export MINERU_API_KEY=your_api_key") + + # 获取MCP服务器实例 + mcp_server = mcp._mcp_server + + try: + # 运行服务器 + if mode == "sse": + config.logger.info(f"启动SSE服务器: {host}:{port}") + starlette_app = create_starlette_app(mcp_server, debug=True) + uvicorn.run(starlette_app, host=host, port=port) + elif mode == "streamable-http": + config.logger.info(f"启动Streamable HTTP服务器: {host}:{port}") + # 在HTTP模式下传递端口参数 + mcp.run(mode, port=port) + else: + # 默认stdio模式 + config.logger.info("启动STDIO服务器") + mcp.run(mode or "stdio") + except Exception as e: + config.logger.error(f"\n❌ 服务异常退出: {str(e)}") + traceback.print_exc() + finally: + # 清理资源 + cleanup_resources() + + +def cleanup_resources(): + """清理全局资源。""" + global _client_instance + if _client_instance is not None: + try: + # 如果客户端有close方法,调用它 + if hasattr(_client_instance, "close"): + _client_instance.close() + except Exception as e: + config.logger.error(f"清理客户端资源时出错: {str(e)}") + finally: + _client_instance = None + config.logger.info("资源清理完成") + + +def get_client() -> MinerUClient: + """获取 MinerUClient 的单例实例。如果尚未初始化,则进行初始化。""" + global _client_instance + if _client_instance is None: + _client_instance = MinerUClient() # Initialization happens here + return _client_instance + + +# Markdown 文件的输出目录 +output_dir = config.DEFAULT_OUTPUT_DIR + + +def set_output_dir(dir_path: str): + """设置转换后文件的输出目录。""" + global output_dir + output_dir = dir_path + config.ensure_output_dir(output_dir) + return output_dir + + +def parse_list_input(input_str: str) -> List[str]: + """ + 解析可能包含由逗号或换行符分隔的多个项目的字符串输入。 + + Args: + input_str: 可能包含多个项目的字符串 + + Returns: + 解析出的项目列表 + """ + if not input_str: + return [] + + # 按逗号、换行符或空格分割 + items = re.split(r"[,\n\s]+", input_str) + + # 移除空项目并处理带引号的项目 + result = [] + for item in items: + item = item.strip() + # 如果存在引号,则移除 + if (item.startswith('"') and item.endswith('"')) or ( + item.startswith("'") and item.endswith("'") + ): + item = item[1:-1] + + if item: + result.append(item) + + return result + + +async def convert_file_url( + url: str, + enable_ocr: bool = False, + language: str = "ch", + page_ranges: str | None = None, +) -> Dict[str, Any]: + """ + 从URL转换文件到Markdown格式。支持单个或多个URL处理。 + + 返回: + 成功: {"status": "success", "result_path": "输出目录路径"} + 失败: {"status": "error", "error": "错误信息"} + """ + urls_to_process = None + + # 检查是否为字典或字典列表格式的URL配置 + if isinstance(url, dict): + # 单个URL配置字典 + urls_to_process = url + elif isinstance(url, list) and len(url) > 0 and isinstance(url[0], dict): + # URL配置字典列表 + urls_to_process = url + elif isinstance(url, str): + # 检查是否为 JSON 字符串格式的多URL配置 + if url.strip().startswith("[") and url.strip().endswith("]"): + try: + # 尝试解析 JSON 字符串为URL配置列表 + url_configs = json.loads(url) + if not isinstance(url_configs, list): + raise ValueError("JSON URL配置必须是列表格式") + + urls_to_process = url_configs + except json.JSONDecodeError: + # 不是有效的 JSON,继续使用字符串解析方式 + pass + + if urls_to_process is None: + # 解析普通URL列表 + urls = parse_list_input(url) + + if not urls: + raise ValueError("未提供有效的 URL") + + if len(urls) == 1: + # 单个URL处理 + urls_to_process = {"url": urls[0], "is_ocr": enable_ocr} + else: + # 多个URL,转换为URL配置列表 + urls_to_process = [] + for url_item in urls: + urls_to_process.append( + { + "url": url_item, + "is_ocr": enable_ocr, + } + ) + + # 使用submit_file_url_task处理URLs + try: + result_path = await get_client().process_file_to_markdown( + lambda urls, o: get_client().submit_file_url_task( + urls, + o, + language=language, + page_ranges=page_ranges, + ), + urls_to_process, + enable_ocr, + output_dir, + ) + return {"status": "success", "result_path": result_path} + except Exception as e: + return {"status": "error", "error": str(e)} + + +async def convert_file_path( + file_path: str, + enable_ocr: bool = False, + language: str = "ch", + page_ranges: str | None = None, +) -> Dict[str, Any]: + """ + 将本地文件转换为Markdown格式。支持单个或多个文件批量处理。 + + 返回: + 成功: {"status": "success", "result_path": "输出目录路径"} + 失败: {"status": "error", "error": "错误信息"} + """ + + files_to_process = None + + # 检查是否为字典或字典列表格式的文件配置 + if isinstance(file_path, dict): + # 单个文件配置字典 + files_to_process = file_path + elif ( + isinstance(file_path, list) + and len(file_path) > 0 + and isinstance(file_path[0], dict) + ): + # 文件配置字典列表 + files_to_process = file_path + elif isinstance(file_path, str): + # 检查是否为 JSON 字符串格式的多文件配置 + if file_path.strip().startswith("[") and file_path.strip().endswith("]"): + try: + # 尝试解析 JSON 字符串为文件配置列表 + file_configs = json.loads(file_path) + if not isinstance(file_configs, list): + raise ValueError("JSON 文件配置必须是列表格式") + + files_to_process = file_configs + except json.JSONDecodeError: + # 不是有效的 JSON,继续使用字符串解析方式 + pass + + if files_to_process is None: + # 解析普通文件路径列表 + file_paths = parse_list_input(file_path) + + if not file_paths: + raise ValueError("未提供有效的文件路径") + + if len(file_paths) == 1: + # 单个文件处理 + files_to_process = { + "path": file_paths[0], + "is_ocr": enable_ocr, + } + else: + # 多个文件路径,转换为文件配置列表 + files_to_process = [] + for i, path in enumerate(file_paths): + files_to_process.append( + { + "path": path, + "is_ocr": enable_ocr, + } + ) + + # 使用submit_file_task处理文件 + try: + result_path = await get_client().process_file_to_markdown( + lambda files, o: get_client().submit_file_task( + files, + o, + language=language, + page_ranges=page_ranges, + ), + files_to_process, + enable_ocr, + output_dir, + ) + return {"status": "success", "result_path": result_path} + except Exception as e: + return { + "status": "error", + "error": str(e), + "params": { + "file_path": file_path, + "enable_ocr": enable_ocr, + "language": language, + }, + } + + +async def local_parse_file( + file_path: str, + parse_method: str = "auto", +) -> Dict[str, Any]: + """ + 根据环境变量设置使用本地或远程API解析文件。 + + 返回: + 成功: {"status": "success", "result": 处理结果} 或 {"status": "success", "result_path": "输出目录路径"} + 失败: {"status": "error", "error": "错误信息"} + """ + file_path = Path(file_path) + + # 检查文件是否存在 + if not file_path.exists(): + return {"status": "error", "error": f"文件不存在: {file_path}"} + + try: + # 根据环境变量决定使用本地API还是远程API + if config.USE_LOCAL_API: + config.logger.debug(f"使用本地API: {config.LOCAL_MINERU_API_BASE}") + return await _parse_file_local( + file_path=str(file_path), + parse_method=parse_method, + ) + else: + return {"status": "error", "error": "远程API未配置"} + except Exception as e: + config.logger.error(f"解析文件时出错: {str(e)}") + return {"status": "error", "error": str(e)} + + +async def read_converted_file( + file_path: str, +) -> Dict[str, Any]: + """ + 读取解析后的文件内容。主要支持Markdown和其他文本文件格式。 + + 返回: + 成功: {"status": "success", "content": "文件内容"} + 失败: {"status": "error", "error": "错误信息"} + """ + try: + target_file = Path(file_path) + parent_dir = target_file.parent + suffix = target_file.suffix.lower() + + # 支持的文本文件格式 + text_extensions = [".md", ".txt", ".json", ".html", ".tex", ".latex"] + + if suffix not in text_extensions: + return { + "status": "error", + "error": f"不支持的文件格式: {suffix}。目前仅支持以下格式: {', '.join(text_extensions)}", + } + + if not target_file.exists(): + if not parent_dir.exists(): + return {"status": "error", "error": f"目录 {parent_dir} 不存在"} + + # 递归搜索所有子目录下的同后缀文件 + similar_files_paths = [ + str(f) for f in parent_dir.rglob(f"*{suffix}") if f.is_file() + ] + + if similar_files_paths: + if len(similar_files_paths) == 1: + # 如果只找到一个文件,直接读取并返回内容 + alternative_file = similar_files_paths[0] + try: + with open(alternative_file, "r", encoding="utf-8") as f: + content = f.read() + return { + "status": "success", + "content": content, + "message": f"未找到文件 {target_file.name},但找到了 {Path(alternative_file).name},已返回其内容", + } + except Exception as e: + return { + "status": "error", + "error": f"尝试读取替代文件时出错: {str(e)}", + } + else: + # 如果找到多个文件,提供建议列表 + suggestion = f"你是否在找: {', '.join(similar_files_paths)}?" + return { + "status": "error", + "error": f"文件 {target_file.name} 不存在。在 {parent_dir} 及其子目录下找到以下同类型文件。{suggestion}", + } + else: + return { + "status": "error", + "error": f"文件 {target_file.name} 不存在,且在目录 {parent_dir} 及其子目录下未找到其他 {suffix} 文件。", + } + + # 以文本模式读取 + with open(target_file, "r", encoding="utf-8") as f: + content = f.read() + return {"status": "success", "content": content} + + except Exception as e: + config.logger.error(f"读取文件时出错: {str(e)}") + return {"status": "error", "error": str(e)} + + +async def find_and_read_markdown_content(result_path: str) -> Dict[str, Any]: + """ + 在给定的路径中寻找并读取Markdown文件内容。 + 查找所有可能的文件位置,返回所有找到的有效内容。 + + Args: + result_path: 结果目录路径 + + Returns: + Dict[str, Any]: 包含所有文件内容或错误信息的字典 + """ + if not result_path: + return {"status": "warning", "message": "未提供有效的结果路径"} + + base_path = Path(result_path) + if not base_path.exists(): + return {"status": "warning", "message": f"结果路径不存在: {result_path}"} + + # 使用集合来存储文件路径,确保唯一性 + unique_files = set() + + # 添加常见文件名 + common_files = [ + base_path / "full.md", + base_path / "full.txt", + base_path / "output.md", + base_path / "result.md", + ] + for f in common_files: + if f.exists(): + unique_files.add(str(f)) + + # 添加子目录中的常见文件名 + for subdir in base_path.iterdir(): + if subdir.is_dir(): + subdir_files = [ + subdir / "full.md", + subdir / "full.txt", + subdir / "output.md", + subdir / "result.md", + ] + for f in subdir_files: + if f.exists(): + unique_files.add(str(f)) + + # 查找所有的.md和.txt文件 + for md_file in base_path.glob("**/*.md"): + unique_files.add(str(md_file)) + for txt_file in base_path.glob("**/*.txt"): + unique_files.add(str(txt_file)) + + # 将集合转换回Path对象列表 + possible_files = [Path(f) for f in unique_files] + + config.logger.debug(f"找到 {len(possible_files)} 个可能的文件") + + # 收集所有找到的有效文件内容 + found_contents = [] + + # 尝试读取每个可能的文件 + for file_path in possible_files: + if file_path.exists(): + result = await read_converted_file(str(file_path)) + if result["status"] == "success": + config.logger.debug(f"成功读取文件内容: {file_path}") + found_contents.append( + {"file_path": str(file_path), "content": result["content"]} + ) + + # 如果找到了文件内容 + if found_contents: + config.logger.debug(f"在结果目录中找到了 {len(found_contents)} 个可读取的文件") + # 如果只找到一个文件,保持向后兼容的返回格式 + if len(found_contents) == 1: + return { + "status": "success", + "content": found_contents[0]["content"], + "file_path": found_contents[0]["file_path"], + } + # 如果找到多个文件,返回内容列表 + else: + return {"status": "success", "contents": found_contents} + + # 如果没有找到任何有效的文件 + return { + "status": "warning", + "message": f"无法在结果目录中找到可读取的Markdown文件: {result_path}", + } + + +async def _process_conversion_result( + result: Dict[str, Any], source: str, is_url: bool = False +) -> Dict[str, Any]: + """ + 处理转换结果,统一格式化输出。 + + Args: + result: 转换函数返回的结果 + source: 源文件路径或URL + is_url: 是否为URL + + Returns: + 格式化后的结果字典 + """ + filename = source.split("/")[-1] + if is_url and "?" in filename: + filename = filename.split("?")[0] + elif not is_url: + filename = Path(source).name + + base_result = { + "filename": filename, + "source_url" if is_url else "source_path": source, + } + + if result["status"] == "success": + # 获取result_path,可能是字符串或字典 + result_path = result.get("result_path") + + # 记录调试信息 + config.logger.debug(f"处理结果 result_path 类型: {type(result_path)}") + + if result_path: + # 情况1: result_path是字典且包含results字段(批量处理结果) + if isinstance(result_path, dict) and "results" in result_path: + config.logger.debug("检测到批量处理结果格式") + + # 查找与当前源文件匹配的结果 + for item in result_path.get("results", []): + if item.get("filename") == filename or ( + not is_url and Path(source).name == item.get("filename") + ): + # 直接返回匹配项的状态,无论是success还是error + if item.get("status") == "success" and "content" in item: + base_result.update( + { + "status": "success", + "content": item.get("content", ""), + } + ) + # 如果有extract_path,也添加进去 + if "extract_path" in item: + base_result["extract_path"] = item["extract_path"] + return base_result + elif item.get("status") == "error": + # 处理失败的文件,直接返回error状态 + base_result.update( + { + "status": "error", + "error_message": item.get( + "error_message", "文件处理失败" + ), + } + ) + return base_result + + # 如果没有找到匹配的结果,但有extract_dir,尝试从那里读取 + if "extract_dir" in result_path: + config.logger.debug( + f"尝试从extract_dir读取: {result_path['extract_dir']}" + ) + try: + content_result = await find_and_read_markdown_content( + result_path["extract_dir"] + ) + if content_result.get("status") == "success": + base_result.update( + { + "status": "success", + "content": content_result.get("content", ""), + "extract_path": result_path["extract_dir"], + } + ) + return base_result + except Exception as e: + config.logger.error(f"从extract_dir读取内容时出错: {str(e)}") + + # 如果上述方法都失败,返回错误 + base_result.update( + { + "status": "error", + "error_message": "未能在批量处理结果中找到匹配的内容", + } + ) + + # 情况2: result_path是字符串(传统格式) + elif isinstance(result_path, str): + config.logger.debug(f"处理传统格式结果路径: {result_path}") + content_result = await find_and_read_markdown_content(result_path) + if content_result.get("status") == "success": + base_result.update( + { + "status": "success", + "content": content_result.get("content", ""), + "extract_path": result_path, + } + ) + else: + base_result.update( + { + "status": "error", + "error_message": f"无法读取转换结果: {content_result.get('message', '')}", + } + ) + + # 情况3: result_path是其他类型的字典(尝试处理) + elif isinstance(result_path, dict): + config.logger.debug(f"处理其他字典格式: {result_path}") + # 尝试从字典中提取可能的路径 + extract_path = ( + result_path.get("extract_dir") + or result_path.get("path") + or result_path.get("dir") + ) + if extract_path and isinstance(extract_path, str): + try: + content_result = await find_and_read_markdown_content( + extract_path + ) + if content_result.get("status") == "success": + base_result.update( + { + "status": "success", + "content": content_result.get("content", ""), + "extract_path": extract_path, + } + ) + return base_result + except Exception as e: + config.logger.error(f"从extract_path读取内容时出错: {str(e)}") + + # 如果没有找到有效路径,返回错误 + base_result.update( + {"status": "error", "error_message": "转换结果格式无法识别"} + ) + else: + # 情况4: result_path是其他类型(错误) + base_result.update( + { + "status": "error", + "error_message": f"无法识别的result_path类型: {type(result_path)}", + } + ) + else: + base_result.update( + {"status": "error", "error_message": "转换成功但未返回结果路径"} + ) + else: + base_result.update( + {"status": "error", "error_message": result.get("error", "未知错误")} + ) + + return base_result + + +@mcp.tool() +async def parse_documents( + file_sources: Annotated[ + str, + Field( + description="""文件路径或URL,支持以下格式: + - 单个路径或URL: "/path/to/file.pdf" 或 "https://example.com/document.pdf" + - 多个路径或URL(逗号分隔): "/path/to/file1.pdf, /path/to/file2.pdf" 或 + "https://example.com/doc1.pdf, https://example.com/doc2.pdf" + - 混合路径和URL: "/path/to/file.pdf, https://example.com/document.pdf" + (支持pdf、ppt、pptx、doc、docx以及图片格式jpg、jpeg、png)""" + ), + ], + # 通用参数 + enable_ocr: Annotated[bool, Field(description="启用OCR识别,默认False")] = False, + language: Annotated[ + str, Field(description='文档语言,默认"ch"中文,可选"en"英文等') + ] = "ch", + # 远程API参数 + page_ranges: Annotated[ + str | None, + Field( + description='指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6":表示选取第2页、第4页至第6页;"2--2":表示从第2页一直选取到倒数第二页。(远程API),默认None' + ), + ] = None, +) -> Dict[str, Any]: + """ + 统一接口,将文件转换为Markdown格式。支持本地文件和URL,会根据USE_LOCAL_API配置自动选择合适的处理方式。 + + 当USE_LOCAL_API=true时: + - 会过滤掉http/https开头的URL路径 + - 对本地文件使用本地API进行解析 + + 当USE_LOCAL_API=false时: + - 将http/https开头的路径使用convert_file_url处理 + - 将其他路径使用convert_file_path处理 + + 处理完成后,会自动尝试读取转换后的文件内容并返回。 + + 返回: + 成功: {"status": "success", "content": "文件内容"} 或 {"status": "success", "results": [处理结果列表]} + 失败: {"status": "error", "error": "错误信息"} + """ + # 解析路径列表 + sources = parse_list_input(file_sources) + if not sources: + return {"status": "error", "error": "未提供有效的文件路径或URL"} + + # 去重处理,使用字典来保持原始顺序 + sources = list(dict.fromkeys(sources)) + + config.logger.debug(f"去重后的文件路径: {sources}") + + # 记录去重信息 + original_count = len(parse_list_input(file_sources)) + unique_count = len(sources) + if original_count > unique_count: + config.logger.debug( + f"检测到重复路径,已自动去重: {original_count} -> {unique_count}" + ) + + # 将路径分类 + url_paths = [] + file_paths = [] + + for source in sources: + if source.lower().startswith(("http://", "https://")): + url_paths.append(source) + else: + file_paths.append(source) + + results = [] + + # 根据USE_LOCAL_API决定处理方式 + if config.USE_LOCAL_API: + # 在本地API模式下,只处理本地文件路径 + if not file_paths: + return { + "status": "warning", + "message": "在本地API模式下,无法处理URL,且未提供有效的本地文件路径", + } + + config.logger.info(f"使用本地API处理 {len(file_paths)} 个文件") + + # 逐个处理本地文件 + for path in file_paths: + try: + # 跳过不存在的文件 + if not Path(path).exists(): + results.append( + { + "filename": Path(path).name, + "source_path": path, + "status": "error", + "error_message": f"文件不存在: {path}", + } + ) + continue + + result = await local_parse_file( + file_path=path, + parse_method=( + "ocr" if enable_ocr else "txt" + ), # 如果启用OCR,使用ocr,否则使用txt + ) + + # 添加文件名信息 + result_with_filename = { + "filename": Path(path).name, + "source_path": path, + **result, + } + results.append(result_with_filename) + + except Exception as e: + # 处理文件时出现异常,记录错误但继续处理下一个文件 + config.logger.error(f"处理文件 {path} 时出现错误: {str(e)}") + results.append( + { + "filename": Path(path).name, + "source_path": path, + "status": "error", + "error_message": f"处理文件时出现异常: {str(e)}", + } + ) + + else: + # 在远程API模式下,分别处理URL和本地文件路径 + if url_paths: + config.logger.info(f"使用远程API处理 {len(url_paths)} 个文件URL") + + try: + # 调用convert_file_url处理URLs + url_result = await convert_file_url( + url=",".join(url_paths), + enable_ocr=enable_ocr, + language=language, + page_ranges=page_ranges, + ) + + if url_result["status"] == "success": + # 为每个URL生成对应的结果 + for url in url_paths: + result_item = await _process_conversion_result( + url_result, url, is_url=True + ) + results.append(result_item) + else: + # 转换失败,为所有URL添加错误结果 + for url in url_paths: + results.append( + { + "filename": url.split("/")[-1].split("?")[0], + "source_url": url, + "status": "error", + "error_message": url_result.get("error", "URL处理失败"), + } + ) + + except Exception as e: + config.logger.error(f"处理URL时出现错误: {str(e)}") + for url in url_paths: + results.append( + { + "filename": url.split("/")[-1].split("?")[0], + "source_url": url, + "status": "error", + "error_message": f"处理URL时出现异常: {str(e)}", + } + ) + + if file_paths: + config.logger.info(f"使用远程API处理 {len(file_paths)} 个本地文件") + + # 过滤出存在的文件 + existing_files = [] + for file_path in file_paths: + if not Path(file_path).exists(): + results.append( + { + "filename": Path(file_path).name, + "source_path": file_path, + "status": "error", + "error_message": f"文件不存在: {file_path}", + } + ) + else: + existing_files.append(file_path) + + if existing_files: + try: + # 调用convert_file_path处理本地文件 + file_result = await convert_file_path( + file_path=",".join(existing_files), + enable_ocr=enable_ocr, + language=language, + page_ranges=page_ranges, + ) + + config.logger.debug(f"file_result: {file_result}") + + if file_result["status"] == "success": + # 为每个文件生成对应的结果 + for file_path in existing_files: + result_item = await _process_conversion_result( + file_result, file_path, is_url=False + ) + results.append(result_item) + else: + # 转换失败,为所有文件添加错误结果 + for file_path in existing_files: + results.append( + { + "filename": Path(file_path).name, + "source_path": file_path, + "status": "error", + "error_message": file_result.get( + "error", "文件处理失败" + ), + } + ) + + except Exception as e: + config.logger.error(f"处理本地文件时出现错误: {str(e)}") + for file_path in existing_files: + results.append( + { + "filename": Path(file_path).name, + "source_path": file_path, + "status": "error", + "error_message": f"处理文件时出现异常: {str(e)}", + } + ) + + # 处理结果为空的情况 + if not results: + return {"status": "error", "error": "未处理任何文件"} + + # 计算成功和失败的统计信息 + success_count = len([r for r in results if r.get("status") == "success"]) + error_count = len([r for r in results if r.get("status") == "error"]) + total_count = len(results) + + # 只有一个结果时,直接返回该结果(保持向后兼容) + if len(results) == 1: + result = results[0].copy() + # 为了向后兼容,移除新增的字段 + if "filename" in result: + del result["filename"] + if "source_path" in result: + del result["source_path"] + if "source_url" in result: + del result["source_url"] + return result + + # 多个结果时,返回详细的结果列表 + # 根据成功/失败情况决定整体状态 + overall_status = "success" + if success_count == 0: + # 所有文件都失败 + overall_status = "error" + elif error_count > 0: + # 有部分文件失败,但不是全部 + overall_status = "partial_success" + + return { + "status": overall_status, + "results": results, + "summary": { + "total_files": total_count, + "success_count": success_count, + "error_count": error_count, + }, + } + + +@mcp.tool() +async def get_ocr_languages() -> Dict[str, Any]: + """ + 获取 OCR 支持的语言列表。 + + Returns: + Dict[str, Any]: 包含所有支持的OCR语言列表的字典 + """ + try: + # 从language模块获取语言列表 + languages = get_language_list() + return {"status": "success", "languages": languages} + except Exception as e: + return {"status": "error", "error": str(e)} + + +async def _parse_file_local( + file_path: str, + parse_method: str = "auto", +) -> Dict[str, Any]: + """ + 使用本地API解析文件。 + + Args: + file_path: 要解析的文件路径 + parse_method: 解析方法 + output_dir: 输出目录 + + Returns: + Dict[str, Any]: 包含解析结果的字典 + """ + # API URL路径 + api_url = f"{config.LOCAL_MINERU_API_BASE}/file_parse" + + # 使用Path对象确保文件路径正确 + file_path_obj = Path(file_path) + if not file_path_obj.exists(): + raise FileNotFoundError(f"文件不存在: {file_path}") + + # 读取文件二进制数据 + with open(file_path_obj, "rb") as f: + file_data = f.read() + + # 准备用于上传文件的表单数据 + file_type = file_path_obj.suffix.lower() + form_data = aiohttp.FormData() + form_data.add_field( + "file", file_data, filename=file_path_obj.name, content_type=file_type + ) + form_data.add_field("parse_method", parse_method) + + config.logger.debug(f"发送本地API请求到: {api_url}") + config.logger.debug(f"上传文件: {file_path_obj.name} (大小: {len(file_data)} 字节)") + + # 发送请求 + try: + async with aiohttp.ClientSession() as session: + async with session.post(api_url, data=form_data) as response: + if response.status != 200: + error_text = await response.text() + config.logger.error( + f"API返回错误状态码: {response.status}, 错误信息: {error_text}" + ) + raise RuntimeError(f"API返回错误: {response.status}, {error_text}") + + result = await response.json() + + config.logger.debug(f"本地API响应: {result}") + + # 处理响应 + if "error" in result: + return {"status": "error", "error": result["error"]} + + return {"status": "success", "result": result} + except aiohttp.ClientError as e: + error_msg = f"与本地API通信时出错: {str(e)}" + config.logger.error(error_msg) + raise RuntimeError(error_msg)