Gemini 系列大模型的多模态能力在最近两个版本(2.0 Flash / 2.5 Pro)中有了明显的提升。于是本着“干中学”的态度,我写了一个 PDF 转 Markdown 的在线工具:
- 支持扫描件/公式/表格等多种文档内容格式
- 支持多栏论文按阅读顺序解析
- 支持文档内图像提取
- 支持提取并翻译为目标语言
以下为在线地址,无需登录即可使用:
pdf2md.aitranspdf.com
为控制成本,使用时需要填入授权码。下面分享一批授权码:
455ae82d7e4a427b86fd19c733da01d4,
83411d78e2a54cce99a5a4d7394c884d,
f90f8ace02fc43079a44afac727cf39d,
ee509e4d307c4c64b48833e00cbae9fa,
35cdc2be66cb41138bf33b6c94c9a55a,
ad68df9f8fa64c84b1cab4463671d935,
59974330e6d346bca8638221d33da4ce
同时贴上提取部分的 Python 核心代码,欢迎各位大佬指正、优化:
from google import generativeai as genai
import os
from pathlib import Path
from typing import List, Dict
import base64
import mimetypes
import time
from tqdm import tqdm
import tempfile
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import fitz # PyMuPDF
import uuid
import atexit
import json
class PDFToMarkdownConverter:
    """Convert a PDF into Markdown by sending page chunks to a Gemini model.

    The PDF is split into chunks of ``chunk_size`` pages with PyMuPDF, each
    chunk is sent to the model together with a prompt, and the per-chunk
    Markdown is concatenated in chunk order. When an S3 service and a task id
    are supplied, embedded images are extracted and uploaded first so the
    prompt can tell the model which image URLs belong to which pages.
    """

    def __init__(self, api_key: str, api_endpoint: str = None, chunk_size: int = 20, max_retries: int = 3):
        """
        Initialise the converter.

        :param api_key: Gemini API key
        :param api_endpoint: proxy server address (optional)
        :param chunk_size: number of pages per chunk, must be >= 1
        :param max_retries: maximum number of attempts per chunk
        :raises ValueError: if ``chunk_size`` is smaller than 1
        """
        if chunk_size < 1:
            # A zero/negative step would only fail much later inside range().
            raise ValueError("chunk_size must be at least 1")

        # Logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        # Basic configuration
        self.chunk_size = chunk_size
        self.max_retries = max_retries

        # Temporary directory for chunk files, created next to this module
        self.temp_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp')
        os.makedirs(self.temp_dir, exist_ok=True)
        self.logger.info(f"临时文件目录: {self.temp_dir}")

        # API endpoint
        # NOTE(review): whether the SDK actually reads GOOGLE_API_BASE_URL is
        # not visible from this file -- confirm against the proxy setup.
        if api_endpoint:
            os.environ['GOOGLE_API_BASE_URL'] = api_endpoint

        # API configuration
        genai.configure(
            api_key=api_key,
            transport="rest"
        )

        # Model
        self.model = genai.GenerativeModel('gemini-2.0-flash')

        # Remove the temporary directory when the interpreter exits
        atexit.register(self._cleanup_temp_dir)

    def _extract_images_from_pdf(self, pdf_path: str, task_id: str) -> Dict[int, List[Dict]]:
        """
        Extract the embedded images of every page.

        :param pdf_path: path of the PDF file
        :param task_id: task id, used as prefix of the generated image names
        :return: ``{zero-based page index: [image info dict, ...]}``; pages
                 without images are omitted. Each info dict has the keys
                 ``filename``, ``data``, ``ext``, ``rect`` and ``xref``.
        """
        images_by_page = {}
        doc = fitz.open(pdf_path)
        try:
            for page_num, page in enumerate(doc):
                page_images = []
                for img_index, img_info in enumerate(page.get_images(full=True)):
                    xref = img_info[0]
                    base_image = doc.extract_image(xref)
                    image_ext = base_image["ext"]
                    page_images.append({
                        # Unique name: task id + 1-based page and image numbers
                        "filename": f"{task_id}_page{page_num + 1}_img{img_index + 1}.{image_ext}",
                        "data": base_image["image"],
                        "ext": image_ext,
                        # Position of the image on the page
                        "rect": page.get_image_bbox(img_info),
                        "xref": xref,
                    })
                if page_images:
                    images_by_page[page_num] = page_images
        finally:
            # Close the document even when extraction fails midway.
            doc.close()
        return images_by_page

    def _upload_images_to_s3(self, images_by_page: Dict[int, List[Dict]], s3_service) -> Dict[int, List[Dict]]:
        """
        Upload the extracted images through ``s3_service``.

        Each image dict gets a ``url`` key holding the value returned by
        ``s3_service.upload_binary``, or ``None`` when the upload raised.

        :param images_by_page: image info dict indexed by page
        :param s3_service: object exposing ``upload_binary(data, key, content_type)``
        :return: the same (mutated) ``images_by_page`` dict
        """
        for images in images_by_page.values():
            for img in images:
                # "image/<ext>" is wrong for e.g. "jpg" (registered type is
                # image/jpeg), so prefer the stdlib table and fall back only
                # when it does not know an image type for the extension.
                guessed, _ = mimetypes.guess_type(f"file.{img['ext']}")
                if guessed and guessed.startswith("image/"):
                    content_type = guessed
                else:
                    content_type = f"image/{img['ext']}"
                try:
                    img['url'] = s3_service.upload_binary(
                        img['data'],
                        f"images/{img['filename']}",
                        content_type
                    )
                except Exception as e:
                    # Best effort: one failed upload must not abort the conversion.
                    self.logger.error(f"图片上传失败: {str(e)}")
                    img['url'] = None
        return images_by_page

    def _cleanup_temp_dir(self):
        """
        Delete the regular files inside the temporary directory, then the
        directory itself. Failures are logged as warnings, never raised.
        """
        try:
            if os.path.exists(self.temp_dir):
                for file in os.listdir(self.temp_dir):
                    try:
                        file_path = os.path.join(self.temp_dir, file)
                        if os.path.isfile(file_path):
                            os.remove(file_path)
                    except Exception as e:
                        self.logger.warning(f"清理临时文件失败: {str(e)}")
                os.rmdir(self.temp_dir)
        except Exception as e:
            self.logger.warning(f"清理临时目录失败: {str(e)}")

    def _split_pdf(self, pdf_path: str) -> List[str]:
        """
        Split the PDF into files of at most ``chunk_size`` pages each.

        :param pdf_path: path of the PDF file
        :return: paths of the temporary chunk PDFs, in page order
        """
        temp_files = []
        doc = fitz.open(pdf_path)
        try:
            total_pages = doc.page_count
            self.logger.info(f"PDF 总页数: {total_pages}")
            for start in range(0, total_pages, self.chunk_size):
                end = min(start + self.chunk_size, total_pages)
                new_doc = fitz.open()
                try:
                    new_doc.insert_pdf(doc, from_page=start, to_page=end - 1)
                    # UUID keeps names unique across concurrent conversions
                    temp_file_path = os.path.join(self.temp_dir, f'chunk_{uuid.uuid4().hex}.pdf')
                    new_doc.save(temp_file_path)
                finally:
                    new_doc.close()
                temp_files.append(temp_file_path)
        except Exception:
            # The caller never receives the list on failure, so remove the
            # chunks written so far instead of leaking them.
            for temp_file in temp_files:
                try:
                    os.remove(temp_file)
                except OSError:
                    pass
            raise
        finally:
            doc.close()
        return temp_files

    def _read_pdf_file(self, pdf_path: str) -> dict:
        """
        Read a PDF file and base64-encode it.

        :param pdf_path: path of the PDF file
        :return: ``{"mime_type": "application/pdf", "data": <base64 text>}``
        :raises Exception: if the file cannot be read (original error chained)
        """
        try:
            with open(pdf_path, 'rb') as file:
                content = file.read()
            return {
                "mime_type": "application/pdf",
                "data": base64.b64encode(content).decode('utf-8')
            }
        except Exception as e:
            raise Exception(f"PDF 文件读取失败: {str(e)}") from e

    def _build_prompt(self, page_info: str, keywords: List[str],
                      translation_mode: str, target_language: str) -> str:
        """Assemble the prompt text for one chunk in the given mode."""
        joined_keywords = ', '.join(keywords)
        if translation_mode == "single":
            lines = [
                "",
                f"请将这份 PDF 文档{page_info}转换为 Markdown 格式。",
                "转换要求:",
                joined_keywords,
                "请确保:",
                "1. 输出格式为标准 Markdown ,但不要用```markdown 包裹全部内容",
                "2. 保持文档的结构和层级",
                "3. 保持内容的完整性",
                "4. 表格要转换为 Markdown 表格格式",
                "5. 代码块要使用正确的格式",
                "6. 正确使用 Markdown 换行语法(使用两个空格或空行实现换行)",
                "",
            ]
        else:
            lines = [
                "",
                f"请将这份 PDF 文档{page_info}转换为双语对照的 Markdown 格式,不要用```markdown 包裹全部内容。",
                "转换要求:",
                f"1. 将内容翻译成{target_language}",
                "2. 使用对照模式展示原文和译文",
                f"3. {joined_keywords}",
                "输出格式要求:",
                "1. 对于每个章节标题:",
                "原文标题",
                "译文标题",
                "2. 对于正文内容:",
                "原文段落",
                "译文段落",
                "3. 对于表格,输出翻译前后的两个表格的独立:",
                "4. 对于代码块:",
                "保持代码块原样,只翻译注释",
                "请确保:",
                "1. 保持文档的结构和层级",
                "2. 翻译准确且符合目标语言的表达习惯",
                "3. 保持格式的一致性",
                "4. 专业术语的翻译准确",
                "5. 正确使用 Markdown 换行语法(使用两个空格或空行实现换行)",
                "6. 译文用引用的 Markdown 语法包裹以区分",
                "",
            ]
        return "\n".join(lines)

    def _process_chunk(self, pdf_path: str, keywords: List[str], chunk_index: int,
                       translation_mode: str = "single", target_language: str = None) -> Dict:
        """
        Convert a single PDF chunk, retrying on failure.

        This method never raises: after the last failed attempt it returns a
        result with ``success`` set to ``False`` and the error embedded in an
        HTML comment.

        :param pdf_path: path of the chunk PDF
        :param keywords: conversion instructions inserted into the prompt
        :param chunk_index: zero-based index of the chunk
        :param translation_mode: 'single' or 'bilingual'
        :param target_language: target language (bilingual mode)
        :return: dict with the keys ``index``, ``content`` and ``success``
        """
        # Always try at least once so a result dict is returned even when
        # max_retries is 0 or negative.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                pdf_data = self._read_pdf_file(pdf_path)

                # Absolute page range covered by this chunk
                doc = fitz.open(pdf_path)
                try:
                    first_page = chunk_index * self.chunk_size + 1
                    last_page = chunk_index * self.chunk_size + doc.page_count
                finally:
                    doc.close()
                page_info = f"(页码 {first_page} - {last_page})"

                prompt = self._build_prompt(page_info, keywords, translation_mode, target_language)
                response = self.model.generate_content([prompt, pdf_data])
                return {
                    'index': chunk_index,
                    'content': response.text,
                    'success': True
                }
            except Exception as e:
                self.logger.error(f"处理分块{chunk_index}第{attempt + 1}次尝试失败: {str(e)}")
                if attempt == attempts - 1:
                    return {
                        'index': chunk_index,
                        'content': f"<!-- 处理失败: {str(e)} -->",
                        'success': False
                    }
                # Back off before retrying; immediate retries rarely help
                # with rate limits or transient network errors.
                time.sleep(min(2 ** attempt, 10))

    def _image_keywords(self, images_by_page: Dict[int, List[Dict]],
                        first_page: int, last_page: int) -> List[str]:
        """
        Build the image-related prompt hints for pages ``first_page`` (inclusive)
        to ``last_page`` (exclusive), both zero-based.

        Binary data is dropped, images without an uploaded URL are skipped and
        page numbers are emitted 1-based to match the page range in the prompt.

        :return: list of extra keywords, empty when the range has no usable image
        """
        syntax_hint = "对于每个图片,使用 Markdown 图片语法 ![图片描述](url) 插入"

        usable = {}
        for page_num, images in images_by_page.items():
            if not (first_page <= page_num < last_page):
                continue
            linked = [img for img in images if img.get('url')]
            if linked:
                usable[page_num] = linked
        if not usable:
            return []

        detailed = {}
        for page_num, images in usable.items():
            entries = []
            for img in images:
                info = {k: v for k, v in img.items() if k != 'data'}
                if 'rect' in info:
                    rect = info['rect']
                    # Rect objects are not JSON serialisable; turn them into lists.
                    if hasattr(rect, 'to_list'):
                        info['rect'] = rect.to_list()
                    else:
                        info['rect'] = [float(rect[i]) for i in range(4)]
                entries.append(info)
            detailed[page_num + 1] = entries

        try:
            image_info_json = json.dumps(detailed)
            return [
                f"PDF 中包含图片,请在适当位置插入图片链接。图片信息: {image_info_json}",
                syntax_hint,
            ]
        except TypeError as e:
            self.logger.error(f"图片信息 JSON 序列化失败: {str(e)}")
            # Fall back to the two fields the model really needs.
            simple = {
                str(page_num + 1): [
                    {"url": img.get('url', ''), "filename": img.get('filename', '')}
                    for img in images
                ]
                for page_num, images in usable.items()
            }
            image_info_json = json.dumps(simple)
            return [
                f"PDF 中包含图片,请在适当位置插入图片链接。简化图片信息: {image_info_json}",
                syntax_hint,
            ]

    def convert_to_markdown(self, pdf_path: str, keywords: List[str],
                            translation_mode: str = "single",
                            target_language: str = None,
                            parallel: bool = True,
                            progress_callback=None,
                            s3_service=None,
                            task_id: str = None) -> str:
        """
        Convert a PDF to Markdown, optionally including image links.

        :param pdf_path: path of the PDF file
        :param keywords: instructions guiding the conversion (not modified)
        :param translation_mode: 'single' or 'bilingual'
        :param target_language: target language (bilingual mode)
        :param parallel: process the chunks in a thread pool when True
        :param progress_callback: called as ``callback(done, total)`` after each chunk
        :param s3_service: S3 service instance; images are handled only when
                           both this and ``task_id`` are given
        :param task_id: task id used to name the uploaded images
        :return: Markdown text of all chunks joined by blank lines
        :raises Exception: on any failure outside chunk processing (cause chained)
        """
        start_time = time.time()
        self.logger.info(f"开始转换... 模式: {translation_mode}" +
                         (f", 目标语言: {target_language}" if translation_mode == "bilingual" else ""))

        # Work on a copy so the caller's list is not extended on every call.
        base_keywords = list(keywords or [])
        temp_files = []
        try:
            # Extract and upload images (only with an S3 service and a task id)
            images_by_page = {}
            if s3_service and task_id:
                self.logger.info("开始提取 PDF 中的图片...")
                images_by_page = self._extract_images_from_pdf(pdf_path, task_id)
                self.logger.info(f"共提取了 {sum(len(imgs) for imgs in images_by_page.values())} 张图片")
                self.logger.info("开始上传图片到 S3...")
                images_by_page = self._upload_images_to_s3(images_by_page, s3_service)

            # Split the PDF
            temp_files = self._split_pdf(pdf_path)
            chunks_count = len(temp_files)
            self.logger.info(f"PDF 已分割为{chunks_count}个块")

            # Each chunk only gets the image hints of its own pages.
            chunk_keywords = [
                base_keywords + self._image_keywords(
                    images_by_page, idx * self.chunk_size, (idx + 1) * self.chunk_size)
                for idx in range(chunks_count)
            ]

            results = []
            processed_count = 0
            if parallel:
                with ThreadPoolExecutor() as executor:
                    futures = [
                        executor.submit(self._process_chunk, temp_file, chunk_keywords[idx], idx,
                                        translation_mode, target_language)
                        for idx, temp_file in enumerate(temp_files)
                    ]
                    for future in as_completed(futures):
                        processed_count += 1
                        if progress_callback:
                            progress_callback(processed_count, chunks_count)
                        results.append(future.result())
            else:
                for idx, temp_file in enumerate(temp_files):
                    result = self._process_chunk(temp_file, chunk_keywords[idx], idx,
                                                 translation_mode, target_language)
                    processed_count += 1
                    if progress_callback:
                        progress_callback(processed_count, chunks_count)
                    results.append(result)

            # Restore page order (parallel results arrive in completion order)
            results.sort(key=lambda x: x['index'])
            markdown_content = "\n\n".join(result['content'] for result in results)

            # Elapsed time in a human-friendly unit
            duration = time.time() - start_time
            if duration < 60:
                time_str = f"{duration:.2f}秒"
            elif duration < 3600:
                time_str = f"{duration / 60:.2f}分钟"
            else:
                time_str = f"{duration / 3600:.2f}小时"
            self.logger.info(f"转换完成!耗时: {time_str}")

            # Success rate; guard against a document that produced no chunks
            success_count = sum(1 for r in results if r['success'])
            success_rate = success_count / chunks_count * 100 if chunks_count else 0.0
            self.logger.info(f"处理成功率: {success_count}/{chunks_count} " +
                             f"({success_rate:.2f}%)")

            return markdown_content
        except Exception as e:
            error_msg = f"转换过程出错: {str(e)}"
            self.logger.error(error_msg)
            raise Exception(error_msg) from e
        finally:
            # Remove the chunk files of this conversion
            for temp_file in temp_files:
                try:
                    if os.path.exists(temp_file):
                        os.remove(temp_file)
                except Exception as e:
                    self.logger.warning(f"清理临时文件失败: {str(e)}")

    def save_markdown(self, markdown_text: str, output_path: str):
        """
        Write the Markdown text to a UTF-8 file.

        :param markdown_text: Markdown text
        :param output_path: output file path
        :raises Exception: if the file cannot be written (original error chained)
        """
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(markdown_text)
            self.logger.info(f"Markdown 文件已保存至: {output_path}")
        except Exception as e:
            raise Exception(f"文件保存错误: {str(e)}") from e
def main():
    """Demo entry point: convert ``origin.pdf`` and save the result as Markdown.

    All settings are hard-coded placeholders below. In bilingual mode the
    target language is appended to the output file name. Any error is printed
    instead of being propagated.
    """
    # Configuration (placeholders -- replace with real values)
    api_key = "xxxxxxxxxxxxxxx"
    api_endpoint = "https://xxxx.xxxxxxxxxxxxxx.com"  # address of your proxy server

    # File paths
    pdf_path = "origin.pdf"  # replace with the path of your PDF file
    output_path = "output-doc.md"

    # Instructions that guide the conversion; they are joined into the prompt.
    keywords = [
        "You are an expert OCR assistant. Your job is to extract all text from the provided image and convert it into a well-structured, easy-to-read Markdown document that mirrors the intended structure of the original. Follow these precise guidelines:",
        "Use Markdown headings, paragraphs, lists, and tables to match the document's hierarchy and flow.",
        "For tables, use standard Markdown table syntax and merge cells if needed. If a table has a title, include it as plain text above the table.",
        "Render mathematical formulas with LaTeX syntax: use $...$ for inline and $$...$$ for display equations.",
        # The image placeholder was missing from this instruction, leaving the
        # model with no syntax to follow.
        "For images, use the syntax ![descriptive alt text](link) with a clear, descriptive alt text.",
        "Remove unnecessary line breaks so that the text flows naturally without awkward breaks.",
        "Your final Markdown output must be direct text (do not wrap it in code blocks).",
        "Ensure your output is clear, accurate, and faithfully reflects the original image's content and structure."
    ]

    # Conversion mode
    translation_mode = "single"  # "single" or "bilingual"
    target_language = "Chinese"  # language to translate into (bilingual mode only)

    try:
        # In bilingual mode the output name carries the target language
        base_name, ext = os.path.splitext(output_path)
        if translation_mode == "bilingual":
            output_path = f"{base_name}_{target_language}{ext}"

        # Create the converter
        converter = PDFToMarkdownConverter(
            api_key=api_key,
            api_endpoint=api_endpoint,
            chunk_size=3,
            max_retries=3
        )

        # Convert the PDF to Markdown
        print(f"开始转换... 模式: {translation_mode}" +
              (f", 目标语言: {target_language}" if translation_mode == "bilingual" else ""))
        markdown_content = converter.convert_to_markdown(
            pdf_path=pdf_path,
            keywords=keywords,
            translation_mode=translation_mode,
            target_language=target_language,
            parallel=True
        )

        # Save the result
        converter.save_markdown(markdown_content, output_path)
        print(f"转换完成!文件已保存至: {output_path}")
    except Exception as e:
        print(f"转换过程中出现错误: {str(e)}")


if __name__ == "__main__":
    main()
|