data-extractor by claude-office-skills/skills
npx skills add https://github.com/claude-office-skills/skills --skill data-extractor
此技能使用 unstructured 从任何文档格式中提取结构化数据,这是一个用于处理 PDF、Word 文档、电子邮件、HTML 等的统一库。无论输入格式如何,都能获得一致的结构化输出。
示例提示:
from unstructured.partition.auto import partition
# Auto-detect the file type and partition the document into elements.
elements = partition("document.pdf")
# Walk the extracted elements and print what each one carries.
for element in elements:
    print(f"Type: {type(element).__name__}")
    print(f"Text: {element.text}")
    print(f"Metadata: {element.metadata}")
| 格式 | 函数 | 备注 |
|---|---|---|
| PDF | partition_pdf | 原生 + 扫描件 |
| Word | partition_docx | 完整结构 |
| PowerPoint | partition_pptx | 幻灯片和备注 |
| Excel | partition_xlsx | 工作表和表格 |
| 电子邮件 | partition_email | 正文和附件 |
| HTML | partition_html | 标签保留 |
| Markdown | partition_md | 结构保留 |
| 纯文本 | partition_text | 基本解析 |
| 图像 | partition_image | OCR 提取 |
from unstructured.documents.elements import (
Title,
NarrativeText,
Text,
ListItem,
Table,
Image,
Header,
Footer,
PageBreak,
Address,
EmailAddress,
)
# Every element exposes the same set of attributes
element.text # Raw text content
element.metadata # Rich metadata
element.category # Element type
element.id # Unique identifier
from unstructured.partition.auto import partition
# Process any file type
elements = partition(
filename="document.pdf",
strategy="auto", # or "fast", "hi_res", "ocr_only"
include_metadata=True,
include_page_breaks=True,
)
# Filter by type (Title and Table are element classes from unstructured.documents.elements)
titles = [e for e in elements if isinstance(e, Title)]
tables = [e for e in elements if isinstance(e, Table)]
# PDF with options
from unstructured.partition.pdf import partition_pdf
elements = partition_pdf(
filename="document.pdf",
strategy="hi_res", # High-quality extraction
infer_table_structure=True, # Detect tables
include_page_breaks=True,
languages=["en"], # OCR language
)
# Word documents
from unstructured.partition.docx import partition_docx
elements = partition_docx(
filename="document.docx",
include_metadata=True,
)
# HTML
from unstructured.partition.html import partition_html
elements = partition_html(
filename="page.html",
include_metadata=True,
)
from unstructured.partition.auto import partition
elements = partition("report.pdf", infer_table_structure=True)
# Print every element that was classified as a table.
for element in elements:
    if element.category != "Table":
        continue
    print("找到表格:")
    print(element.text)
    # metadata.text_as_html holds the structured form of the table; print it when it is set.
    if hasattr(element, 'metadata') and element.metadata.text_as_html:
        print("HTML:", element.metadata.text_as_html)
from unstructured.partition.auto import partition
elements = partition("document.pdf")
# Show the commonly used metadata fields of every element.
for element in elements:
    meta = element.metadata
    print(f"页面: {meta.page_number}")
    print(f"文件名: {meta.filename}")
    print(f"文件类型: {meta.filetype}")
    print(f"坐标: {meta.coordinates}")
    print(f"语言: {meta.languages}")
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title
from unstructured.chunking.basic import chunk_elements
# Partition the document into elements first; chunking works on that list.
elements = partition("document.pdf")
# Option 1: chunk by title (semantic chunks).
chunks = chunk_by_title(
    elements,
    max_characters=1000,
    combine_text_under_n_chars=200,
)
# Option 2: basic chunking. This rebinds `chunks`, so the loop below
# prints the result of this second call.
chunks = chunk_elements(
    elements,
    max_characters=500,
    overlap=50,
)
# Preview the first 100 characters of every chunk.
for chunk in chunks:
    print(f"分块 ({len(chunk.text)} 字符):")
    print(chunk.text[:100] + "...")
from unstructured.partition.auto import partition
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def process_document(file_path):
    """Partition one document and summarize the outcome as a dict.

    Args:
        file_path: Path of the document; converted with ``str()`` before use.

    Returns:
        On success, a dict with ``file``, ``status`` (``'success'``),
        ``elements`` (number of extracted elements) and ``text`` (the element
        texts joined by blank lines). If anything raises, a dict with
        ``file``, ``status`` (``'error'``) and ``error`` (the exception
        message) is returned instead of propagating the exception.
    """
    source = str(file_path)
    try:
        elements = partition(source)
        return {
            'file': source,
            'status': 'success',
            'elements': len(elements),
            'text': '\n\n'.join(e.text for e in elements),
        }
    except Exception as e:
        # Deliberately broad: a failing file is reported as an error record
        # so that a batch run over many files can keep going.
        return {
            'file': source,
            'status': 'error',
            'error': str(e),
        }
def batch_process(input_dir, max_workers=4):
    """Run ``process_document`` over every regular file directly inside a directory.

    Args:
        input_dir: Directory whose immediate entries are processed.
        max_workers: Size of the thread pool used for the fan-out.

    Returns:
        A list with one ``process_document`` result per file, in sorted path
        order; an empty list when the directory contains no files.
    """
    # glob('*') also yields sub-directories, which are not documents; keep
    # regular files only and sort them so the result order is deterministic.
    files = sorted(p for p in Path(input_dir).glob('*') if p.is_file())
    if not files:
        return []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(process_document, files))
from unstructured.partition.auto import partition
from unstructured.staging.base import elements_to_json, elements_to_dicts
elements = partition("document.pdf")
# Serialize the elements to a JSON string
json_str = elements_to_json(elements)
# Convert the elements to a list of dicts
dicts = elements_to_dicts(elements)
# Load the dicts into a pandas DataFrame
import pandas as pd
df = pd.DataFrame(dicts)
def document_to_json(file_path, output_path=None):
    """Convert a document into a JSON-serializable structure.

    Args:
        file_path: Path of the document to partition.
        output_path: Optional path; when given, the structure is also written
            there as indented UTF-8 JSON.

    Returns:
        A dict with ``source`` (the input path as a string) and ``elements``,
        a list of dicts holding each element's ``type`` (class name),
        ``text`` and ``metadata`` (``page`` and ``coordinates``, the latter
        ``None`` when the element has no coordinates).
    """
    import json

    from unstructured.partition.auto import partition

    # Stored as a string so that a pathlib.Path argument stays serializable.
    source = str(file_path)
    elements = partition(source)

    output = {
        'source': source,
        'elements': [],
    }
    for element in elements:
        coordinates = element.metadata.coordinates
        output['elements'].append({
            'type': type(element).__name__,
            'text': element.text,
            'metadata': {
                'page': element.metadata.page_number,
                'coordinates': coordinates.to_dict() if coordinates else None,
            },
        })

    if output_path:
        # Explicit UTF-8 plus ensure_ascii=False keeps non-ASCII document
        # text readable in the file regardless of the platform default encoding.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
    return output
from unstructured.partition.email import partition_email
def parse_email(email_path):
    """Extract headers and body elements from an email file.

    Args:
        email_path: Path of the email file handed to ``partition_email``.

    Returns:
        A dict with ``subject``, ``from`` and ``to`` taken from element
        metadata, and ``body``, a list of ``{'type', 'text'}`` dicts with one
        entry per element. ``date`` and ``attachments`` are part of the
        result shape but are not populated by this function; they keep their
        initial ``None`` / ``[]`` values.
    """
    elements = partition_email(email_path)
    email_data = {
        'subject': None,
        'from': None,
        'to': [],
        'date': None,
        'body': [],
        'attachments': [],
    }
    for element in elements:
        meta = element.metadata
        # Header values are read from each element's metadata; a later
        # element with a non-empty value overwrites an earlier one.
        if meta.subject:
            email_data['subject'] = meta.subject
        if meta.sent_from:
            email_data['from'] = meta.sent_from
        if meta.sent_to:
            email_data['to'] = meta.sent_to
        # Every element contributes one body entry.
        email_data['body'].append({
            'type': type(element).__name__,
            'text': element.text,
        })
    return email_data
from unstructured.partition.pdf import partition_pdf
from unstructured.chunking.title import chunk_by_title
def extract_paper(pdf_path):
    """Extract title, sections and tables from a research-paper PDF.

    Args:
        pdf_path: Path of the PDF handed to ``partition_pdf``.

    Returns:
        A dict with ``title`` (text of the first non-empty Title element, or
        ``None``), ``sections`` (a list of ``{'title', 'content'}`` dicts,
        one per Title element), ``tables`` (a list of ``{'page', 'content',
        'html'}`` dicts) and the keys ``abstract`` and ``references``, which
        this function leaves at their initial ``None`` / ``[]`` values.
    """
    elements = partition_pdf(
        filename=pdf_path,
        strategy="hi_res",
        infer_table_structure=True,
        include_page_breaks=True,
    )
    paper = {
        'title': None,
        'abstract': None,
        'sections': [],
        'tables': [],
        'references': [],
    }
    # Sections are built from the partitioned elements directly. Chunks
    # returned by chunk_by_title are composite/table chunks rather than Title
    # elements, so testing a chunk's category against "Title" never matched
    # and left the section list empty.
    for element in elements:
        if element.category == "Title":
            if not paper['title']:
                paper['title'] = element.text
            paper['sections'].append({
                'title': element.text,
                'content': '',
            })
            continue
        if element.category == "Table":
            paper['tables'].append({
                'page': element.metadata.page_number,
                'content': element.text,
                'html': getattr(element.metadata, 'text_as_html', None),
            })
        # Any non-title text (table text included) belongs to the most recent
        # section; text before the first title and empty elements are skipped.
        if paper['sections'] and element.text:
            paper['sections'][-1]['content'] += element.text + '\n'
    return paper
# Example: extract a paper, then print its title and the table and section counts.
paper = extract_paper('research_paper.pdf')
print(f"标题: {paper['title']}")
print(f"表格: {len(paper['tables'])}")
print(f"章节: {len(paper['sections'])}")
from unstructured.partition.auto import partition
import re
def extract_invoice_data(file_path):
    """Extract key fields from an invoice document using regular expressions.

    Args:
        file_path: Path of the invoice handed to ``partition``.

    Returns:
        A dict with ``invoice_number`` and ``date`` (strings, or ``None``
        when no match is found), ``total`` (float, or ``None``) and
        ``tables`` (text of every Table element). ``vendor`` and
        ``line_items`` are part of the result shape but are not populated
        here; they keep their initial ``None`` / ``[]`` values.
    """
    elements = partition(file_path, strategy="hi_res")
    # All patterns are searched in the concatenated text of the document.
    full_text = '\n'.join(e.text for e in elements)
    invoice = {
        'invoice_number': None,
        'date': None,
        'total': None,
        'vendor': None,
        'line_items': [],
        'tables': [],
    }

    inv_match = re.search(r'Invoice\s*#?\s*:?\s*(\w+[-\w]*)', full_text, re.I)
    if inv_match:
        invoice['invoice_number'] = inv_match.group(1)

    date_match = re.search(r'Date\s*:?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', full_text, re.I)
    if date_match:
        invoice['date'] = date_match.group(1)

    # \b and the lookbehind keep "Subtotal" / "Sub-total" / "Sub total" from
    # being taken for the total. The amount must start with a digit so a
    # stray "," can never reach float().
    total_match = re.search(
        r'(?<!sub[\s-])\bTotal\s*:?\s*\$?(\d[\d,]*\.?\d*)', full_text, re.I
    )
    if total_match:
        invoice['total'] = float(total_match.group(1).replace(',', ''))

    invoice['tables'].extend(
        element.text for element in elements if element.category == "Table"
    )
    return invoice
# Example: parse an invoice and print its number and total.
invoice = extract_invoice_data('invoice.pdf')
print(f"发票号: {invoice['invoice_number']}")
print(f"总计: ${invoice['total']}")
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title
from pathlib import Path
import json
def build_corpus(input_dir, output_path):
    """Build a chunked corpus from the documents in a directory.

    Files directly inside ``input_dir`` matching ``*.pdf``, ``*.docx``,
    ``*.html``, ``*.txt`` or ``*.md`` are partitioned and chunked by title.
    A file that fails is reported on stdout and skipped.

    Args:
        input_dir: Directory to scan (not recursive).
        output_path: File the corpus is written to as indented UTF-8 JSON.

    Returns:
        The corpus: a list of dicts with ``id`` (``<file stem>_<chunk
        index>``), ``source``, ``type`` (chunk class name), ``text`` and
        ``page``.
    """
    input_path = Path(input_dir)
    corpus = []
    patterns = ['*.pdf', '*.docx', '*.html', '*.txt', '*.md']
    files = []
    for pattern in patterns:
        files.extend(input_path.glob(pattern))

    for doc_path in files:
        print(f"处理中: {doc_path.name}")
        try:
            elements = partition(str(doc_path))
            chunks = chunk_by_title(elements, max_characters=1000)
            for i, chunk in enumerate(chunks):
                corpus.append({
                    'id': f"{doc_path.stem}_{i}",
                    'source': str(doc_path),
                    'type': type(chunk).__name__,
                    'text': chunk.text,
                    'page': chunk.metadata.page_number or None,
                })
        except Exception as e:
            # Best effort: one unreadable document must not abort the build.
            print(f" 错误: {e}")

    # Explicit UTF-8 plus ensure_ascii=False keeps non-ASCII text readable in
    # the file regardless of the platform default encoding.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(corpus, f, indent=2, ensure_ascii=False)
    print(f"语料库构建完成: 从 {len(files)} 个文件中提取了 {len(corpus)} 个分块")
    return corpus
# Example: index the supported documents under ./documents into corpus.json.
corpus = build_corpus('./documents', 'corpus.json')
# Basic installation
pip install unstructured
# With all dependencies
pip install "unstructured[all-docs]"
# For PDF processing
pip install "unstructured[pdf]"
# For specific formats
pip install "unstructured[docx,pptx,xlsx]"
每周安装数
32
仓库
GitHub 星标数
5
首次出现
5 天前
安全审计
安装于
claude-code 27
opencode 12
gemini-cli 12
github-copilot 12
amp 12
cline 12
This skill enables extraction of structured data from any document format using unstructured - a unified library for processing PDFs, Word docs, emails, HTML, and more. Get consistent, structured output regardless of input format.
Example prompts:
from unstructured.partition.auto import partition
# Auto-detect the file type and partition the document into elements.
elements = partition("document.pdf")
# Walk the extracted elements and print what each one carries.
for element in elements:
    print(f"Type: {type(element).__name__}")
    print(f"Text: {element.text}")
    print(f"Metadata: {element.metadata}")
| Format | Function | Notes |
|---|---|---|
| PDF | partition_pdf | Native + scanned |
| Word | partition_docx | Full structure |
| PowerPoint | partition_pptx | Slides & notes |
| Excel | partition_xlsx | Sheets & tables |
| Email | partition_email | Body & attachments |
| HTML | partition_html | Tags preserved |
from unstructured.documents.elements import (
Title,
NarrativeText,
Text,
ListItem,
Table,
Image,
Header,
Footer,
PageBreak,
Address,
EmailAddress,
)
# Every element exposes the same set of attributes
element.text # Raw text content
element.metadata # Rich metadata (an object with fields such as page_number and filename)
element.category # Element type name, e.g. "Table" or "Title"
element.id # Unique identifier
from unstructured.partition.auto import partition
# Process any file type
elements = partition(
filename="document.pdf",
strategy="auto", # or "fast", "hi_res", "ocr_only"
include_metadata=True,
include_page_breaks=True,
)
# Filter by type (Title and Table are element classes from unstructured.documents.elements)
titles = [e for e in elements if isinstance(e, Title)]
tables = [e for e in elements if isinstance(e, Table)]
# PDF with options
from unstructured.partition.pdf import partition_pdf
elements = partition_pdf(
filename="document.pdf",
strategy="hi_res", # High-quality extraction
infer_table_structure=True, # Detect tables
include_page_breaks=True,
languages=["en"], # OCR language
)
# Word documents
from unstructured.partition.docx import partition_docx
elements = partition_docx(
filename="document.docx",
include_metadata=True,
)
# HTML
from unstructured.partition.html import partition_html
elements = partition_html(
filename="page.html",
include_metadata=True,
)
from unstructured.partition.auto import partition
elements = partition("report.pdf", infer_table_structure=True)
# Print every element that was classified as a table.
for element in elements:
    if element.category != "Table":
        continue
    print("Table found:")
    print(element.text)
    # metadata.text_as_html holds the structured form of the table; print it when it is set.
    if hasattr(element, 'metadata') and element.metadata.text_as_html:
        print("HTML:", element.metadata.text_as_html)
from unstructured.partition.auto import partition
elements = partition("document.pdf")
# Show the commonly used metadata fields of every element.
for element in elements:
    meta = element.metadata
    print(f"Page: {meta.page_number}")
    print(f"Filename: {meta.filename}")
    print(f"Filetype: {meta.filetype}")
    print(f"Coordinates: {meta.coordinates}")
    print(f"Languages: {meta.languages}")
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title
from unstructured.chunking.basic import chunk_elements
# Partition the document into elements first; chunking works on that list.
elements = partition("document.pdf")
# Option 1: chunk by title (semantic chunks).
chunks = chunk_by_title(
    elements,
    max_characters=1000,
    combine_text_under_n_chars=200,
)
# Option 2: basic chunking. This rebinds `chunks`, so the loop below
# prints the result of this second call.
chunks = chunk_elements(
    elements,
    max_characters=500,
    overlap=50,
)
# Preview the first 100 characters of every chunk.
for chunk in chunks:
    print(f"Chunk ({len(chunk.text)} chars):")
    print(chunk.text[:100] + "...")
from unstructured.partition.auto import partition
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def process_document(file_path):
    """Partition one document and summarize the outcome as a dict.

    Args:
        file_path: Path of the document; converted with ``str()`` before use.

    Returns:
        On success, a dict with ``file``, ``status`` (``'success'``),
        ``elements`` (number of extracted elements) and ``text`` (the element
        texts joined by blank lines). If anything raises, a dict with
        ``file``, ``status`` (``'error'``) and ``error`` (the exception
        message) is returned instead of propagating the exception.
    """
    source = str(file_path)
    try:
        elements = partition(source)
        return {
            'file': source,
            'status': 'success',
            'elements': len(elements),
            'text': '\n\n'.join(e.text for e in elements),
        }
    except Exception as e:
        # Deliberately broad: a failing file is reported as an error record
        # so that a batch run over many files can keep going.
        return {
            'file': source,
            'status': 'error',
            'error': str(e),
        }
def batch_process(input_dir, max_workers=4):
    """Run ``process_document`` over every regular file directly inside a directory.

    Args:
        input_dir: Directory whose immediate entries are processed.
        max_workers: Size of the thread pool used for the fan-out.

    Returns:
        A list with one ``process_document`` result per file, in sorted path
        order; an empty list when the directory contains no files.
    """
    # glob('*') also yields sub-directories, which are not documents; keep
    # regular files only and sort them so the result order is deterministic.
    files = sorted(p for p in Path(input_dir).glob('*') if p.is_file())
    if not files:
        return []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(process_document, files))
from unstructured.partition.auto import partition
from unstructured.staging.base import elements_to_json, elements_to_dicts
elements = partition("document.pdf")
# Serialize the elements to a JSON string
json_str = elements_to_json(elements)
# Convert the elements to a list of dicts
dicts = elements_to_dicts(elements)
# Load the dicts into a pandas DataFrame
import pandas as pd
df = pd.DataFrame(dicts)
def document_to_json(file_path, output_path=None):
    """Convert a document into a JSON-serializable structure.

    Args:
        file_path: Path of the document to partition.
        output_path: Optional path; when given, the structure is also written
            there as indented UTF-8 JSON.

    Returns:
        A dict with ``source`` (the input path as a string) and ``elements``,
        a list of dicts holding each element's ``type`` (class name),
        ``text`` and ``metadata`` (``page`` and ``coordinates``, the latter
        ``None`` when the element has no coordinates).
    """
    import json

    from unstructured.partition.auto import partition

    # Stored as a string so that a pathlib.Path argument stays serializable.
    source = str(file_path)
    elements = partition(source)

    output = {
        'source': source,
        'elements': [],
    }
    for element in elements:
        coordinates = element.metadata.coordinates
        output['elements'].append({
            'type': type(element).__name__,
            'text': element.text,
            'metadata': {
                'page': element.metadata.page_number,
                'coordinates': coordinates.to_dict() if coordinates else None,
            },
        })

    if output_path:
        # Explicit UTF-8 plus ensure_ascii=False keeps non-ASCII document
        # text readable in the file regardless of the platform default encoding.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
    return output
from unstructured.partition.email import partition_email
def parse_email(email_path):
    """Extract headers and body elements from an email file.

    Args:
        email_path: Path of the email file handed to ``partition_email``.

    Returns:
        A dict with ``subject``, ``from`` and ``to`` taken from element
        metadata, and ``body``, a list of ``{'type', 'text'}`` dicts with one
        entry per element. ``date`` and ``attachments`` are part of the
        result shape but are not populated by this function; they keep their
        initial ``None`` / ``[]`` values.
    """
    elements = partition_email(email_path)
    email_data = {
        'subject': None,
        'from': None,
        'to': [],
        'date': None,
        'body': [],
        'attachments': [],
    }
    for element in elements:
        meta = element.metadata
        # Header values are read from each element's metadata; a later
        # element with a non-empty value overwrites an earlier one.
        if meta.subject:
            email_data['subject'] = meta.subject
        if meta.sent_from:
            email_data['from'] = meta.sent_from
        if meta.sent_to:
            email_data['to'] = meta.sent_to
        # Every element contributes one body entry.
        email_data['body'].append({
            'type': type(element).__name__,
            'text': element.text,
        })
    return email_data
from unstructured.partition.pdf import partition_pdf
from unstructured.chunking.title import chunk_by_title
def extract_paper(pdf_path):
    """Extract title, sections and tables from a research-paper PDF.

    Args:
        pdf_path: Path of the PDF handed to ``partition_pdf``.

    Returns:
        A dict with ``title`` (text of the first non-empty Title element, or
        ``None``), ``sections`` (a list of ``{'title', 'content'}`` dicts,
        one per Title element), ``tables`` (a list of ``{'page', 'content',
        'html'}`` dicts) and the keys ``abstract`` and ``references``, which
        this function leaves at their initial ``None`` / ``[]`` values.
    """
    elements = partition_pdf(
        filename=pdf_path,
        strategy="hi_res",
        infer_table_structure=True,
        include_page_breaks=True,
    )
    paper = {
        'title': None,
        'abstract': None,
        'sections': [],
        'tables': [],
        'references': [],
    }
    # Sections are built from the partitioned elements directly. Chunks
    # returned by chunk_by_title are composite/table chunks rather than Title
    # elements, so testing a chunk's category against "Title" never matched
    # and left the section list empty.
    for element in elements:
        if element.category == "Title":
            if not paper['title']:
                paper['title'] = element.text
            paper['sections'].append({
                'title': element.text,
                'content': '',
            })
            continue
        if element.category == "Table":
            paper['tables'].append({
                'page': element.metadata.page_number,
                'content': element.text,
                'html': getattr(element.metadata, 'text_as_html', None),
            })
        # Any non-title text (table text included) belongs to the most recent
        # section; text before the first title and empty elements are skipped.
        if paper['sections'] and element.text:
            paper['sections'][-1]['content'] += element.text + '\n'
    return paper
# Example: extract a paper, then print its title and the table and section counts.
paper = extract_paper('research_paper.pdf')
print(f"Title: {paper['title']}")
print(f"Tables: {len(paper['tables'])}")
print(f"Sections: {len(paper['sections'])}")
from unstructured.partition.auto import partition
import re
def extract_invoice_data(file_path):
    """Extract key fields from an invoice document using regular expressions.

    Args:
        file_path: Path of the invoice handed to ``partition``.

    Returns:
        A dict with ``invoice_number`` and ``date`` (strings, or ``None``
        when no match is found), ``total`` (float, or ``None``) and
        ``tables`` (text of every Table element). ``vendor`` and
        ``line_items`` are part of the result shape but are not populated
        here; they keep their initial ``None`` / ``[]`` values.
    """
    elements = partition(file_path, strategy="hi_res")
    # All patterns are searched in the concatenated text of the document.
    full_text = '\n'.join(e.text for e in elements)
    invoice = {
        'invoice_number': None,
        'date': None,
        'total': None,
        'vendor': None,
        'line_items': [],
        'tables': [],
    }

    inv_match = re.search(r'Invoice\s*#?\s*:?\s*(\w+[-\w]*)', full_text, re.I)
    if inv_match:
        invoice['invoice_number'] = inv_match.group(1)

    date_match = re.search(r'Date\s*:?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', full_text, re.I)
    if date_match:
        invoice['date'] = date_match.group(1)

    # \b and the lookbehind keep "Subtotal" / "Sub-total" / "Sub total" from
    # being taken for the total. The amount must start with a digit so a
    # stray "," can never reach float().
    total_match = re.search(
        r'(?<!sub[\s-])\bTotal\s*:?\s*\$?(\d[\d,]*\.?\d*)', full_text, re.I
    )
    if total_match:
        invoice['total'] = float(total_match.group(1).replace(',', ''))

    invoice['tables'].extend(
        element.text for element in elements if element.category == "Table"
    )
    return invoice
# Example: parse an invoice and print its number and total.
invoice = extract_invoice_data('invoice.pdf')
print(f"Invoice #: {invoice['invoice_number']}")
print(f"Total: ${invoice['total']}")
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title
from pathlib import Path
import json
def build_corpus(input_dir, output_path):
    """Build a chunked corpus from the documents in a directory.

    Files directly inside ``input_dir`` matching ``*.pdf``, ``*.docx``,
    ``*.html``, ``*.txt`` or ``*.md`` are partitioned and chunked by title.
    A file that fails is reported on stdout and skipped.

    Args:
        input_dir: Directory to scan (not recursive).
        output_path: File the corpus is written to as indented UTF-8 JSON.

    Returns:
        The corpus: a list of dicts with ``id`` (``<file stem>_<chunk
        index>``), ``source``, ``type`` (chunk class name), ``text`` and
        ``page``.
    """
    input_path = Path(input_dir)
    corpus = []
    patterns = ['*.pdf', '*.docx', '*.html', '*.txt', '*.md']
    files = []
    for pattern in patterns:
        files.extend(input_path.glob(pattern))

    for doc_path in files:
        print(f"Processing: {doc_path.name}")
        try:
            elements = partition(str(doc_path))
            chunks = chunk_by_title(elements, max_characters=1000)
            for i, chunk in enumerate(chunks):
                corpus.append({
                    'id': f"{doc_path.stem}_{i}",
                    'source': str(doc_path),
                    'type': type(chunk).__name__,
                    'text': chunk.text,
                    'page': chunk.metadata.page_number or None,
                })
        except Exception as e:
            # Best effort: one unreadable document must not abort the build.
            print(f" Error: {e}")

    # Explicit UTF-8 plus ensure_ascii=False keeps non-ASCII text readable in
    # the file regardless of the platform default encoding.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(corpus, f, indent=2, ensure_ascii=False)
    print(f"Corpus built: {len(corpus)} chunks from {len(files)} files")
    return corpus
# Example: index the supported documents under ./documents into corpus.json.
corpus = build_corpus('./documents', 'corpus.json')
# Basic installation (core package only, no extras)
pip install unstructured
# With the dependencies for all document types (the "all-docs" extra)
pip install "unstructured[all-docs]"
# For PDF processing (the "pdf" extra)
pip install "unstructured[pdf]"
# For specific formats: list only the extras you need
pip install "unstructured[docx,pptx,xlsx]"
Weekly Installs
32
Repository
GitHub Stars
5
First Seen
5 days ago
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
claude-code 27
opencode 12
gemini-cli 12
github-copilot 12
amp 12
cline 12
xdrop 文件传输脚本:Bun 环境下安全上传下载工具,支持加密分享
20,700 周安装
| Tags preserved |
| Markdown | partition_md | Structure preserved |
| Plain Text | partition_text | Basic parsing |
| Images | partition_image | OCR extraction |