重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
digital-archive by jamditis/claude-skills-journalism
npx skills add https://github.com/jamditis/claude-skills-journalism --skill digital-archive
利用人工智能分析和知识图谱构建生产级数字档案的模式。
┌─────────────────┐    ┌──────────────────┐    ┌────────────────┐
│  OCR Pipeline   │    │   Web Scraping   │    │  Social Media  │
│  (newspapers)   │    │    (articles)    │    │ (transcripts)  │
└────────┬────────┘    └────────┬─────────┘    └───────┬────────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │    Unified Schema     │
                    │     (35+ fields)      │
                    └───────────┬───────────┘
                                │
         ┌──────────────────────┼──────────────────────┐
         │                      │                      │
┌────────▼────────┐  ┌──────────▼──────────┐   ┌───────▼───────┐
│  AI Enrichment  │  │  Entity Extraction  │   │  PDF Archive  │
│    (Gemini)     │  │  (Knowledge Graph)  │   │  (WCAG 2.1)   │
└────────┬────────┘  └──────────┬──────────┘   └───────┬───────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │     Google Sheets     │
                    │  (primary database)   │
                    └───────────┬───────────┘
                                │
                    ┌───────────▼───────────┐
                    │    Frontend Export    │
                    │      (JSON/CSV)       │
                    └───────────────────────┘
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from dataclasses import dataclass, field
from datetime import date
from typing import Optional
from enum import Enum
class ContentType(Enum):
    """Kinds of archived content; each member's value is its display string."""

    ARTICLE = "Article"
    VIDEO = "Video"
    AUDIO = "Audio"
    SOCIAL = "Social Post"
    NEWSPAPER = "Newspaper Article"
class ThematicCategory(Enum):
    """Thematic categories a record can be filed under.

    The member values are the same strings listed under
    ``TAXONOMY["thematic_categories"]``.
    """

    PRESS_CRITICISM = "Press & Media Criticism"
    JOURNALISM_THEORY = "Journalism Theory"
    POLITICS = "Politics & Democracy"
    TECHNOLOGY = "Technology & Digital Media"
    EDUCATION = "Journalism Education"
    AUDIENCE = "Audience & Public Engagement"
class HistoricalEra(Enum):
    """Era buckets for a record; each value is a ``start-end`` range string."""

    ERA_1990s = "1990-1999"
    ERA_2000_04 = "2000-2004"
    ERA_2005_09 = "2005-2009"
    ERA_2010_15 = "2010-2015"
    ERA_2016_20 = "2016-2020"
    ERA_2021_PRESENT = "2021-present"
@dataclass
class ArchiveRecord:
    """A single archived item with its content, enrichment and bookkeeping.

    Only ``id``, ``url`` and ``title`` are required; every other field has a
    default value.
    """

    # --- Core identifiers ---
    id: str  # Format: SOURCE-00001
    url: str
    title: str

    # --- Content ---
    author: Optional[str] = None
    publication_date: Optional[date] = None
    publication: Optional[str] = None
    content_type: ContentType = ContentType.ARTICLE
    text: str = ""

    # --- AI-enriched fields ---
    summary: Optional[str] = None
    pull_quote: Optional[str] = None
    categories: list[ThematicCategory] = field(default_factory=list)
    key_concepts: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    era: Optional[HistoricalEra] = None
    scope: Optional[str] = None  # Theoretical, Commentary, Case Study, etc.

    # --- Entity references ---
    entities_mentioned: list[str] = field(default_factory=list)
    related_to: list[str] = field(default_factory=list)
    responds_to: list[str] = field(default_factory=list)

    # --- Archive metadata ---
    pdf_url: Optional[str] = None
    transcript_url: Optional[str] = None
    verified: bool = False
    processing_status: str = "pending"
    last_updated: Optional[date] = None
def generate_record_id(source: str, sequence: int) -> str:
    """Generate a record ID of the form ``PREFIX-00001``.

    Args:
        source: Source name. Matching is case-insensitive and ignores
            leading/trailing whitespace.
        sequence: Sequence number, zero-padded to at least five digits.

    Returns:
        The ID string; sources without a known prefix use ``MISC``.
    """
    prefixes = {
        'nytimes': 'NYT',
        'columbia journalism review': 'CJR',
        'pressthink': 'PT',
        'twitter': 'TW',
        'youtube': 'YT',
        'newspaper': 'NEWS',
    }
    # Strip as well as lower-case so a padded value such as " NYTimes "
    # (e.g. from a CSV cell) does not silently fall through to MISC.
    key = source.strip().lower()
    prefix = prefixes.get(key, 'MISC')
    return f"{prefix}-{sequence:05d}"
import google.generativeai as genai
import json
from typing import Optional
# Controlled vocabulary embedded in the categorization prompt (as JSON) and
# used to filter the model's answer. Key and item order is kept stable
# because the dict is serialized verbatim into the prompt.
TAXONOMY = {
    "thematic_categories": [
        "Press & Media Criticism",
        "Journalism Theory",
        "Politics & Democracy",
        "Technology & Digital Media",
        "Journalism Education",
        "Audience & Public Engagement",
    ],
    "key_concepts": [
        "The View from Nowhere",
        "Verification vs. Assertion",
        "Citizens vs. Consumers",
        "Public Journalism",
        "The Rosen Test",
        "Savvy vs. Naive",
        "Professional vs. Amateur",
        "Production vs. Distribution",
        "Trust vs. Transparency",
        "Horse Race Coverage",
        "Both Sides Journalism",
        "Audience Atomization",
        "The Church of the Savvy",
    ],
    "scope_types": [
        "Theoretical",
        "Commentary",
        "Historical",
        "Case Study",
        "Pedagogical",
        "Personal Reflection",
    ],
}
class ArchiveCategorizer:
    """Categorize archive records with a generative model against ``TAXONOMY``."""

    def __init__(self, model: str = 'gemini-2.0-flash'):
        """Create the model client.

        Args:
            model: Model name passed to ``genai.GenerativeModel``.
        """
        self.model = genai.GenerativeModel(model)

    def categorize(self, record: ArchiveRecord) -> dict:
        """Ask the model to categorize ``record`` and return the parsed reply.

        The prompt includes the title, author, date, the first 8000 characters
        of the text, and the taxonomy as JSON. ``categories`` and
        ``key_concepts`` in the reply are filtered down to values present in
        ``TAXONOMY``; all other keys are returned as the model produced them.

        Raises:
            ValueError: If the reply is not valid JSON or not a JSON object.
        """
        prompt = f"""Analyze this archival content and categorize it according to the taxonomy.
CONTENT:
Title: {record.title}
Author: {record.author or 'Unknown'}
Date: {record.publication_date or 'Unknown'}
Text (first 8000 chars):
{record.text[:8000]}
TAXONOMY:
{json.dumps(TAXONOMY, indent=2)}
Respond with JSON containing:
{{
"categories": ["category1", "category2"], // 1-3 from thematic_categories
"key_concepts": ["concept1", "concept2"], // 0-5 from key_concepts list
"scope": "scope_type", // one from scope_types
"era": "YYYY-YYYY", // decade range
"tags": ["tag1", "tag2", "tag3", "tag4", "tag5"], // 5 contextual keywords
"summary": "2-3 sentence summary",
"pull_quote": "Most impactful quote from the text"
}}
IMPORTANT:
- Only use categories/concepts from the taxonomy
- Tags should be lowercase, hyphenated keywords
- Summary should capture the main argument
- Pull quote must be an exact excerpt from the text
"""
        response = self.model.generate_content(prompt)
        result = self._parse_response(response.text)

        # Validate against taxonomy. ``or []`` covers a JSON ``null``, which
        # ``.get(key, [])`` would hand back as None and fail to iterate.
        allowed_categories = TAXONOMY['thematic_categories']
        allowed_concepts = TAXONOMY['key_concepts']
        result['categories'] = [
            c for c in (result.get('categories') or [])
            if c in allowed_categories
        ]
        result['key_concepts'] = [
            c for c in (result.get('key_concepts') or [])
            if c in allowed_concepts
        ]
        return result

    def _parse_response(self, text: str) -> dict:
        """Extract a JSON object from the reply, handling markdown code blocks.

        Raises:
            ValueError: If the payload is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``) or decodes to
                something other than an object.
        """
        # Remove markdown code blocks if present
        if '```json' in text:
            text = text.split('```json')[1].split('```')[0]
        elif '```' in text:
            text = text.split('```')[1].split('```')[0]
        parsed = json.loads(text.strip())
        # Callers use dict methods on the result; reject lists/scalars here
        # with a clear error instead of an AttributeError further down.
        if not isinstance(parsed, dict):
            raise ValueError(
                f"Expected a JSON object from the model, got {type(parsed).__name__}"
            )
        return parsed

    def validate_response(self, result: dict, text: str) -> bool:
        """Detect AI hallucination patterns.

        Args:
            result: Parsed model reply.
            text: Source text the reply was generated from.

        Returns:
            ``False`` when fewer than three distinct tags are present, when a
            non-empty pull quote is not found (case-insensitively) in
            ``text``, or when the summary contains a known generic phrase;
            ``True`` otherwise.
        """
        # ``or`` fallbacks treat a JSON ``null`` like a missing key.
        tags = result.get('tags') or []
        # Check for uniform response signature (all same values)
        if len(set(tags)) < 3:
            return False

        # Check pull quote exists in text
        pull_quote = result.get('pull_quote') or ''
        if pull_quote and pull_quote.lower() not in text.lower():
            return False

        # Check summary isn't generic
        generic_phrases = ['this article discusses', 'the author explores', 'this piece examines']
        summary = (result.get('summary') or '').lower()
        if any(phrase in summary for phrase in generic_phrases):
            return False
        return True
from dataclasses import dataclass
from typing import Literal
# Allowed entity kinds.
EntityType = Literal[
    'Person',
    'Organization',
    'Work',
    'Concept',
    'Event',
    'Location',
]

# Allowed relationship labels between two entities.
RelationshipType = Literal[
    'Mentions',
    'Criticizes',
    'Cites',
    'Discusses',
    'Expands On',
    'Supports',
    'Founded By',
    'Pioneered',
    'Inspired By',
    'Affiliated With',
    'Published In',
    'Originated By',
    'Occurred At',
    'Owns',
    'Owned By',
]
@dataclass
class Entity:
    """A named entity tracked across archive records."""

    id: str  # Type-prefixed identifier (P-..., O-..., W-..., etc.)
    name: str
    type: EntityType
    aliases: list[str]  # Alternative names/spellings
    prominence: float  # 0-10 based on discussion depth
    mention_count: int = 0
    first_mentioned_in: str = ""  # Record ID
@dataclass
class Relationship:
    """A typed, directed link between two entities, attributed to a record."""

    source_entity_id: str
    target_entity_id: str
    relationship_type: RelationshipType
    source_record_id: str  # Which record established this relationship
    confidence: float = 1.0
class EntityRegistry:
    """Deduplication and normalization for entities."""

    # Lower-cased variant -> canonical name.
    NORMALIZATIONS = {
        'nyt': 'The New York Times',
        'new york times': 'The New York Times',
        'ny times': 'The New York Times',
        'washington post': 'The Washington Post',
        'wapo': 'The Washington Post',
        'cnn': 'CNN',
        'fox': 'Fox News',
        'fox news channel': 'Fox News',
    }

    def __init__(self):
        """Start with an empty registry."""
        self.entities: dict[str, Entity] = {}  # entity id -> Entity
        self.name_to_id: dict[str, str] = {}  # lower-cased canonical name -> id

    def normalize_name(self, name: str) -> str:
        """Normalize entity name to canonical form.

        Known variants (case-insensitive, whitespace-trimmed) map to their
        canonical name; anything else is returned trimmed.
        """
        name_lower = name.lower().strip()
        return self.NORMALIZATIONS.get(name_lower, name.strip())

    def find_or_create(self, name: str, entity_type: EntityType) -> Entity:
        """Find existing entity or create new one.

        An existing entity is matched on its lower-cased canonical name; its
        ``mention_count`` is incremented and, if ``name`` is a spelling not
        yet recorded, it is added to ``aliases``. Otherwise a new entity is
        created with an ID built from the type's first letter and a
        per-type running number.

        Args:
            name: Entity name as it appeared in the source.
            entity_type: One of the ``EntityType`` values.

        Returns:
            The matched or newly created entity.
        """
        normalized = self.normalize_name(name)
        key = normalized.lower()

        # Check if already exists
        existing_id = self.name_to_id.get(key)
        if existing_id is not None:
            entity = self.entities[existing_id]
            entity.mention_count += 1
            # Same rule as at creation time (raw name differing from the
            # canonical one is an alias), so spellings first seen on later
            # mentions are no longer lost.
            if name != entity.name and name not in entity.aliases:
                entity.aliases.append(name)
            return entity

        # Create new entity
        type_prefix = entity_type[0].upper()  # P, O, W, C, E, L
        count = sum(1 for e in self.entities.values() if e.type == entity_type)
        entity_id = f"{type_prefix}-{count + 1:04d}"
        entity = Entity(
            id=entity_id,
            name=normalized,
            type=entity_type,
            aliases=[name] if name != normalized else [],
            prominence=0.0,
            mention_count=1,
        )
        self.entities[entity_id] = entity
        self.name_to_id[key] = entity_id
        return entity
class EntityExtractor:
    """Extract entities and relationships from a record with a generative model."""

    def __init__(self, registry: EntityRegistry):
        """Bind the shared registry and create the model client.

        Args:
            registry: Registry used to deduplicate extracted entities.
        """
        self.registry = registry
        self.model = genai.GenerativeModel('gemini-2.0-flash')

    @staticmethod
    def _parse_json(text: str) -> dict:
        """Decode the model reply, tolerating a markdown code fence around it.

        Uses the same fence handling as ``ArchiveCategorizer._parse_response``;
        a bare ``json.loads`` fails whenever the reply is wrapped in a
        markdown code fence.
        """
        if '```json' in text:
            text = text.split('```json')[1].split('```')[0]
        elif '```' in text:
            text = text.split('```')[1].split('```')[0]
        return json.loads(text.strip())

    def extract(self, record: ArchiveRecord) -> tuple[list[Entity], list[Relationship]]:
        """Extract entities and relationships from ``record``.

        The prompt contains the title and the first 10000 characters of the
        text. Entities are resolved through the registry; a relationship is
        kept only when both endpoints resolve to entities from this reply.

        Returns:
            ``(entities, relationships)`` for this record.

        Raises:
            ValueError: If the reply is not valid JSON.
            KeyError: If an entity or relationship entry lacks a required key.
        """
        prompt = f"""Extract named entities and relationships from this archival content.
CONTENT:
Title: {record.title}
Text: {record.text[:10000]}
ENTITY TYPES:
- Person: journalists, politicians, academics, media figures
- Organization: news outlets, media companies, academic institutions
- Work: articles, books, blog posts, studies, reports
- Concept: journalism theories, media criticism frameworks
- Event: conferences, elections, media crises
- Location: geographic locations relevant to media context
RELATIONSHIP TYPES:
- Mentions, Criticizes, Cites, Discusses, Expands On, Supports
- Founded By, Pioneered, Inspired By
- Affiliated With, Published In, Originated By, Occurred At
- Owns, Owned By
Respond with JSON:
{{
"entities": [
{{"name": "Entity Name", "type": "Person|Organization|...", "prominence": 1-10}}
],
"relationships": [
{{"source": "Entity Name", "target": "Entity Name", "type": "Relationship Type"}}
]
}}
IMPORTANT:
- Prominence: 1-3 = mentioned briefly, 4-6 = discussed, 7-10 = central focus
- Only extract entities actually discussed, not just mentioned in passing
- Relationships must connect entities that appear in the same text
"""
        response = self.model.generate_content(prompt)
        data = self._parse_json(response.text)

        entities = []
        entity_name_to_obj = {}

        # Process entities
        for e in data.get('entities', []):
            entity = self.registry.find_or_create(e['name'], e['type'])
            entity.prominence = max(entity.prominence, e.get('prominence', 5))
            # Nothing else in the pipeline fills this field; stamp it the
            # first time the entity is seen.
            if not entity.first_mentioned_in:
                entity.first_mentioned_in = record.id
            entities.append(entity)
            entity_name_to_obj[e['name'].lower()] = entity
            # Also index by canonical name so a relationship that refers to
            # the normalized spelling still resolves.
            entity_name_to_obj.setdefault(entity.name.lower(), entity)

        # Process relationships
        relationships = []
        for r in data.get('relationships', []):
            source = entity_name_to_obj.get(r['source'].lower())
            target = entity_name_to_obj.get(r['target'].lower())
            if source and target:
                relationships.append(Relationship(
                    source_entity_id=source.id,
                    target_entity_id=target.id,
                    relationship_type=r['type'],
                    source_record_id=record.id,
                ))
        return entities, relationships
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image
from reportlab.lib.units import inch
from pathlib import Path
class ArchivePDFGenerator:
    """Render archive records as PDF files using ReportLab."""

    def __init__(self, output_dir: Path):
        """Prepare the output directory and paragraph styles.

        Args:
            output_dir: Directory for generated PDFs; created if missing.
        """
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.styles = getSampleStyleSheet()

        # Custom styles
        self.styles.add(ParagraphStyle(
            'ArchiveTitle',
            parent=self.styles['Heading1'],
            fontSize=16,
            spaceAfter=12,
        ))
        self.styles.add(ParagraphStyle(
            'ArchiveMeta',
            parent=self.styles['Normal'],
            fontSize=10,
            textColor='#666666',
            spaceAfter=6,
        ))

    @staticmethod
    def _markup_safe(value) -> str:
        """Return ``value`` as text with ``&``, ``<`` and ``>`` escaped.

        ``Paragraph`` interprets its text as XML-like markup, so scraped
        content must be escaped before it is embedded.
        """
        from xml.sax.saxutils import escape
        return escape(str(value))

    def generate(self, record: ArchiveRecord) -> Path:
        """Write ``<record.id>.pdf`` into the output directory.

        The document contains the title, a metadata block (author, date,
        source, URL, archive ID), the summary when present, and the full
        text split into paragraphs on blank lines.

        Returns:
            Path of the generated PDF.
        """
        safe = self._markup_safe
        output_path = self.output_dir / f"{record.id}.pdf"
        doc = SimpleDocTemplate(
            str(output_path),
            pagesize=letter,
            # Document properties are plain strings, not markup: no escaping.
            title=record.title,
            author=record.author or 'Unknown',
            subject=f"Archive record {record.id}",
        )
        story = []

        # Title
        story.append(Paragraph(safe(record.title), self.styles['ArchiveTitle']))

        # Metadata block: the <b> tags are our markup, the values are escaped.
        meta_lines = [
            f"<b>Author:</b> {safe(record.author or 'Unknown')}",
            f"<b>Date:</b> {safe(record.publication_date or 'Unknown')}",
            f"<b>Source:</b> {safe(record.publication or 'Unknown')}",
            f"<b>URL:</b> {safe(record.url)}",
            f"<b>Archive ID:</b> {safe(record.id)}",
        ]
        for line in meta_lines:
            story.append(Paragraph(line, self.styles['ArchiveMeta']))
        story.append(Spacer(1, 0.25 * inch))

        # Summary (if available)
        if record.summary:
            story.append(Paragraph("<b>Summary:</b>", self.styles['Heading2']))
            story.append(Paragraph(safe(record.summary), self.styles['Normal']))
            story.append(Spacer(1, 0.25 * inch))

        # Main content
        story.append(Paragraph("<b>Full Text:</b>", self.styles['Heading2']))

        # Split into paragraphs and add
        for para in record.text.split('\n\n'):
            stripped = para.strip()
            if stripped:
                story.append(Paragraph(safe(stripped), self.styles['Normal']))
                story.append(Spacer(1, 0.1 * inch))

        # Build PDF
        doc.build(story)
        return output_path
from dataclasses import dataclass
from typing import Callable
@dataclass
class ValidationResult:
    """Outcome of one validation check on a record field."""

    field: str  # Name of the record field the check refers to
    valid: bool
    message: str
    severity: Literal["error", "warning", "info"]
class ArchiveValidator:
    """Validate archive records for completeness and consistency."""

    REQUIRED_FIELDS = ['id', 'url', 'title', 'text']
    CRITICAL_FIELDS = ['publication_date', 'author', 'summary']
    OPTIONAL_FIELDS = ['categories', 'tags', 'pull_quote']

    def validate(self, record: ArchiveRecord) -> list[ValidationResult]:
        """Run all checks and return the failures found.

        Checks: required fields (error), critical fields (warning), text
        shorter than 100 characters (warning), ``publication_date`` lacking a
        working ``isoformat()`` (error), and categories that are neither a
        ``ThematicCategory`` member nor one of its values (warning).

        Returns:
            One ``ValidationResult`` per failed check; empty when all pass.
        """
        results = []

        # Required fields
        for name in self.REQUIRED_FIELDS:
            if not getattr(record, name, None):
                results.append(ValidationResult(
                    field=name,
                    valid=False,
                    message=f"Required field '{name}' is missing",
                    severity='error',
                ))

        # Critical fields (should have but not blocking)
        for name in self.CRITICAL_FIELDS:
            if not getattr(record, name, None):
                results.append(ValidationResult(
                    field=name,
                    valid=False,
                    message=f"Critical field '{name}' is missing",
                    severity='warning',
                ))

        # Content length check
        if record.text and len(record.text) < 100:
            results.append(ValidationResult(
                field='text',
                valid=False,
                message=f"Text unusually short ({len(record.text)} chars)",
                severity='warning',
            ))

        # Date format validation
        if record.publication_date:
            try:
                # Ensure date is valid
                _ = record.publication_date.isoformat()
            except (AttributeError, ValueError):
                results.append(ValidationResult(
                    field='publication_date',
                    valid=False,
                    message="Invalid date format",
                    severity='error',
                ))

        # Category validation. ``cat not in ThematicCategory`` raises
        # TypeError for plain strings on Python 3.8-3.11, and the workflow
        # stores categories as strings; a value lookup accepts both enum
        # members and their string values on every version.
        for cat in record.categories:
            try:
                ThematicCategory(cat)
            except ValueError:
                results.append(ValidationResult(
                    field='categories',
                    valid=False,
                    message=f"Unknown category: {cat}",
                    severity='warning',
                ))
        return results

    def is_complete(self, record: ArchiveRecord) -> bool:
        """Return True when validation reports no error-severity results.

        Warnings (including missing critical fields) do not affect the result.
        """
        return not any(r.severity == 'error' for r in self.validate(record))
class ArchiveWorkflow:
    """Orchestrate the complete archive processing pipeline."""

    def __init__(self, config: Config):
        """Build the pipeline components from ``config``.

        Reads ``config.PDF_DIR``, ``config.CREDENTIALS_PATH`` and
        ``config.PROGRESS_FILE``.
        """
        self.scraper = ScrapingCascade()
        self.categorizer = ArchiveCategorizer()
        self.entity_registry = EntityRegistry()
        self.entity_extractor = EntityExtractor(self.entity_registry)
        self.pdf_generator = ArchivePDFGenerator(config.PDF_DIR)
        self.sheets_service = SheetsService(config.CREDENTIALS_PATH)
        self.validator = ArchiveValidator()
        self.progress = ProgressTracker(config.PROGRESS_FILE)
        # run_batch() passes self.worksheet to the sheets service, but it was
        # never assigned, so every row failed with AttributeError.
        # NOTE(review): the config attribute name is an assumption - confirm
        # where the worksheet handle should come from.
        self.worksheet = getattr(config, 'WORKSHEET', None)
        # Relationships returned by the extractor were previously discarded;
        # keep them so they can be exported later.
        self.relationships: list[Relationship] = []

    def process_url(self, url: str, record_id: str) -> ArchiveRecord:
        """Process a single URL through the complete pipeline.

        Steps: scrape, build the record, AI categorization, entity
        extraction, PDF generation, validation.

        Raises:
            ValueError: If the scraper returns a falsy result.
        """
        # 1. Scrape content
        result = self.scraper.fetch(url)
        if not result:
            raise ValueError(f"Failed to scrape: {url}")

        # 2. Create initial record
        record = ArchiveRecord(
            id=record_id,
            url=url,
            title=result.title,
            text=result.content,
        )

        # 3. AI categorization (values are stored as returned by the model)
        categories = self.categorizer.categorize(record)
        record.summary = categories.get('summary')
        record.pull_quote = categories.get('pull_quote')
        record.categories = categories.get('categories', [])
        record.key_concepts = categories.get('key_concepts', [])
        record.tags = categories.get('tags', [])
        record.era = categories.get('era')
        record.scope = categories.get('scope')

        # 4. Entity extraction
        entities, relationships = self.entity_extractor.extract(record)
        record.entities_mentioned = [e.id for e in entities]
        self.relationships.extend(relationships)

        # 5. Generate PDF
        pdf_path = self.pdf_generator.generate(record)
        record.pdf_url = str(pdf_path)

        # 6. Validate (is_complete runs validate() itself, so the separate
        # unused validate() call was dropped)
        record.verified = self.validator.is_complete(record)
        record.processing_status = 'completed'
        return record

    def run_batch(self, input_csv: Path):
        """Process all URLs from input CSV.

        Rows already marked processed are skipped. A failure on one row is
        logged through the progress tracker and does not stop the batch.
        """
        for row in read_input(input_csv):
            if self.progress.is_processed(row.id):
                continue
            try:
                record = self.process_url(row.url, row.id)
                self.sheets_service.append_row(self.worksheet, record_to_row(record))
                self.progress.mark_processed(row.id)
            except Exception as e:
                # Batch boundary: record the failure and move to the next row.
                self.progress.log_error(row.id, str(e))
import json
from pathlib import Path
def export_for_frontend(records: list[ArchiveRecord], output_dir: Path,
                        entity_registry=None, relationships=None):
    """Export archive data in frontend-friendly formats.

    Writes ``archive-data.json`` and ``archive_records.csv`` for the records,
    plus ``entities.json`` / ``relationships.json`` when that data is
    available.

    Args:
        records: Dataclass instances to export.
        output_dir: Destination directory; created if missing.
        entity_registry: Object exposing an ``entities`` dict of dataclass
            instances. When omitted, a module-level ``entity_registry`` is
            used if one exists; otherwise ``entities.json`` is not written.
        relationships: Iterable of dataclass instances. When omitted, a
            module-level ``all_relationships`` is used if one exists;
            otherwise ``relationships.json`` is not written.
    """
    # These names were used by the original body without being imported.
    from dataclasses import asdict
    from datetime import datetime

    import pandas as pd

    # The original read these two from module globals; keep that as a
    # fallback so existing callers behave the same.
    if entity_registry is None:
        entity_registry = globals().get('entity_registry')
    if relationships is None:
        relationships = globals().get('all_relationships')

    output_dir.mkdir(parents=True, exist_ok=True)
    record_dicts = [asdict(r) for r in records]

    # Main archive JSON
    archive_data = {
        'metadata': {
            'total_records': len(records),
            'last_updated': datetime.now().isoformat(),
            'schema_version': '2.0'
        },
        'records': record_dicts
    }
    # default=str stringifies values json cannot encode natively
    # (dates, enums, ...).
    (output_dir / 'archive-data.json').write_text(
        json.dumps(archive_data, indent=2, default=str),
        encoding='utf-8',
    )

    # Entity export
    if entity_registry is not None:
        entities_data = [asdict(e) for e in entity_registry.entities.values()]
        (output_dir / 'entities.json').write_text(
            json.dumps(entities_data, indent=2),
            encoding='utf-8',
        )

    # Relationships export
    if relationships is not None:
        relationships_data = [asdict(r) for r in relationships]
        (output_dir / 'relationships.json').write_text(
            json.dumps(relationships_data, indent=2),
            encoding='utf-8',
        )

    # CSV exports for spreadsheet compatibility
    records_df = pd.DataFrame(record_dicts)
    records_df.to_csv(output_dir / 'archive_records.csv', index=False)
每周安装量
52
代码仓库
GitHub 星标数
84
首次出现
2026年1月21日
安全审计
安装于
codex: 47
gemini-cli: 47
opencode: 46
cursor: 43
github-copilot: 42
amp: 41
Patterns for building production-quality digital archives with AI-powered analysis and knowledge graph construction.
┌─────────────────┐    ┌──────────────────┐    ┌────────────────┐
│  OCR Pipeline   │    │   Web Scraping   │    │  Social Media  │
│  (newspapers)   │    │    (articles)    │    │ (transcripts)  │
└────────┬────────┘    └────────┬─────────┘    └───────┬────────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │    Unified Schema     │
                    │     (35+ fields)      │
                    └───────────┬───────────┘
                                │
         ┌──────────────────────┼──────────────────────┐
         │                      │                      │
┌────────▼────────┐  ┌──────────▼──────────┐   ┌───────▼───────┐
│  AI Enrichment  │  │  Entity Extraction  │   │  PDF Archive  │
│    (Gemini)     │  │  (Knowledge Graph)  │   │  (WCAG 2.1)   │
└────────┬────────┘  └──────────┬──────────┘   └───────┬───────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │     Google Sheets     │
                    │  (primary database)   │
                    └───────────┬───────────┘
                                │
                    ┌───────────▼───────────┐
                    │    Frontend Export    │
                    │      (JSON/CSV)       │
                    └───────────────────────┘
from dataclasses import dataclass, field
from datetime import date
from typing import Optional
from enum import Enum
class ContentType(Enum):
    """Kinds of archived content; each member's value is its display string."""

    ARTICLE = "Article"
    VIDEO = "Video"
    AUDIO = "Audio"
    SOCIAL = "Social Post"
    NEWSPAPER = "Newspaper Article"
class ThematicCategory(Enum):
    """Thematic categories a record can be filed under.

    The member values are the same strings listed under
    ``TAXONOMY["thematic_categories"]``.
    """

    PRESS_CRITICISM = "Press & Media Criticism"
    JOURNALISM_THEORY = "Journalism Theory"
    POLITICS = "Politics & Democracy"
    TECHNOLOGY = "Technology & Digital Media"
    EDUCATION = "Journalism Education"
    AUDIENCE = "Audience & Public Engagement"
class HistoricalEra(Enum):
    """Era buckets for a record; each value is a ``start-end`` range string."""

    ERA_1990s = "1990-1999"
    ERA_2000_04 = "2000-2004"
    ERA_2005_09 = "2005-2009"
    ERA_2010_15 = "2010-2015"
    ERA_2016_20 = "2016-2020"
    ERA_2021_PRESENT = "2021-present"
@dataclass
class ArchiveRecord:
    """A single archived item with its content, enrichment and bookkeeping.

    Only ``id``, ``url`` and ``title`` are required; every other field has a
    default value.
    """

    # --- Core identifiers ---
    id: str  # Format: SOURCE-00001
    url: str
    title: str

    # --- Content ---
    author: Optional[str] = None
    publication_date: Optional[date] = None
    publication: Optional[str] = None
    content_type: ContentType = ContentType.ARTICLE
    text: str = ""

    # --- AI-enriched fields ---
    summary: Optional[str] = None
    pull_quote: Optional[str] = None
    categories: list[ThematicCategory] = field(default_factory=list)
    key_concepts: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    era: Optional[HistoricalEra] = None
    scope: Optional[str] = None  # Theoretical, Commentary, Case Study, etc.

    # --- Entity references ---
    entities_mentioned: list[str] = field(default_factory=list)
    related_to: list[str] = field(default_factory=list)
    responds_to: list[str] = field(default_factory=list)

    # --- Archive metadata ---
    pdf_url: Optional[str] = None
    transcript_url: Optional[str] = None
    verified: bool = False
    processing_status: str = "pending"
    last_updated: Optional[date] = None
def generate_record_id(source: str, sequence: int) -> str:
    """Generate a record ID of the form ``PREFIX-00001``.

    Args:
        source: Source name. Matching is case-insensitive and ignores
            leading/trailing whitespace.
        sequence: Sequence number, zero-padded to at least five digits.

    Returns:
        The ID string; sources without a known prefix use ``MISC``.
    """
    prefixes = {
        'nytimes': 'NYT',
        'columbia journalism review': 'CJR',
        'pressthink': 'PT',
        'twitter': 'TW',
        'youtube': 'YT',
        'newspaper': 'NEWS',
    }
    # Strip as well as lower-case so a padded value such as " NYTimes "
    # (e.g. from a CSV cell) does not silently fall through to MISC.
    key = source.strip().lower()
    prefix = prefixes.get(key, 'MISC')
    return f"{prefix}-{sequence:05d}"
import google.generativeai as genai
import json
from typing import Optional
# Controlled vocabulary embedded in the categorization prompt (as JSON) and
# used to filter the model's answer. Key and item order is kept stable
# because the dict is serialized verbatim into the prompt.
TAXONOMY = {
    "thematic_categories": [
        "Press & Media Criticism",
        "Journalism Theory",
        "Politics & Democracy",
        "Technology & Digital Media",
        "Journalism Education",
        "Audience & Public Engagement",
    ],
    "key_concepts": [
        "The View from Nowhere",
        "Verification vs. Assertion",
        "Citizens vs. Consumers",
        "Public Journalism",
        "The Rosen Test",
        "Savvy vs. Naive",
        "Professional vs. Amateur",
        "Production vs. Distribution",
        "Trust vs. Transparency",
        "Horse Race Coverage",
        "Both Sides Journalism",
        "Audience Atomization",
        "The Church of the Savvy",
    ],
    "scope_types": [
        "Theoretical",
        "Commentary",
        "Historical",
        "Case Study",
        "Pedagogical",
        "Personal Reflection",
    ],
}
class ArchiveCategorizer:
    """Categorize archive records with a generative model against ``TAXONOMY``."""

    def __init__(self, model: str = 'gemini-2.0-flash'):
        """Create the model client.

        Args:
            model: Model name passed to ``genai.GenerativeModel``.
        """
        self.model = genai.GenerativeModel(model)

    def categorize(self, record: ArchiveRecord) -> dict:
        """Ask the model to categorize ``record`` and return the parsed reply.

        The prompt includes the title, author, date, the first 8000 characters
        of the text, and the taxonomy as JSON. ``categories`` and
        ``key_concepts`` in the reply are filtered down to values present in
        ``TAXONOMY``; all other keys are returned as the model produced them.

        Raises:
            ValueError: If the reply is not valid JSON or not a JSON object.
        """
        prompt = f"""Analyze this archival content and categorize it according to the taxonomy.
CONTENT:
Title: {record.title}
Author: {record.author or 'Unknown'}
Date: {record.publication_date or 'Unknown'}
Text (first 8000 chars):
{record.text[:8000]}
TAXONOMY:
{json.dumps(TAXONOMY, indent=2)}
Respond with JSON containing:
{{
"categories": ["category1", "category2"], // 1-3 from thematic_categories
"key_concepts": ["concept1", "concept2"], // 0-5 from key_concepts list
"scope": "scope_type", // one from scope_types
"era": "YYYY-YYYY", // decade range
"tags": ["tag1", "tag2", "tag3", "tag4", "tag5"], // 5 contextual keywords
"summary": "2-3 sentence summary",
"pull_quote": "Most impactful quote from the text"
}}
IMPORTANT:
- Only use categories/concepts from the taxonomy
- Tags should be lowercase, hyphenated keywords
- Summary should capture the main argument
- Pull quote must be an exact excerpt from the text
"""
        response = self.model.generate_content(prompt)
        result = self._parse_response(response.text)

        # Validate against taxonomy. ``or []`` covers a JSON ``null``, which
        # ``.get(key, [])`` would hand back as None and fail to iterate.
        allowed_categories = TAXONOMY['thematic_categories']
        allowed_concepts = TAXONOMY['key_concepts']
        result['categories'] = [
            c for c in (result.get('categories') or [])
            if c in allowed_categories
        ]
        result['key_concepts'] = [
            c for c in (result.get('key_concepts') or [])
            if c in allowed_concepts
        ]
        return result

    def _parse_response(self, text: str) -> dict:
        """Extract a JSON object from the reply, handling markdown code blocks.

        Raises:
            ValueError: If the payload is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``) or decodes to
                something other than an object.
        """
        # Remove markdown code blocks if present
        if '```json' in text:
            text = text.split('```json')[1].split('```')[0]
        elif '```' in text:
            text = text.split('```')[1].split('```')[0]
        parsed = json.loads(text.strip())
        # Callers use dict methods on the result; reject lists/scalars here
        # with a clear error instead of an AttributeError further down.
        if not isinstance(parsed, dict):
            raise ValueError(
                f"Expected a JSON object from the model, got {type(parsed).__name__}"
            )
        return parsed

    def validate_response(self, result: dict, text: str) -> bool:
        """Detect AI hallucination patterns.

        Args:
            result: Parsed model reply.
            text: Source text the reply was generated from.

        Returns:
            ``False`` when fewer than three distinct tags are present, when a
            non-empty pull quote is not found (case-insensitively) in
            ``text``, or when the summary contains a known generic phrase;
            ``True`` otherwise.
        """
        # ``or`` fallbacks treat a JSON ``null`` like a missing key.
        tags = result.get('tags') or []
        # Check for uniform response signature (all same values)
        if len(set(tags)) < 3:
            return False

        # Check pull quote exists in text
        pull_quote = result.get('pull_quote') or ''
        if pull_quote and pull_quote.lower() not in text.lower():
            return False

        # Check summary isn't generic
        generic_phrases = ['this article discusses', 'the author explores', 'this piece examines']
        summary = (result.get('summary') or '').lower()
        if any(phrase in summary for phrase in generic_phrases):
            return False
        return True
from dataclasses import dataclass
from typing import Literal
# Allowed entity kinds.
EntityType = Literal[
    'Person',
    'Organization',
    'Work',
    'Concept',
    'Event',
    'Location',
]

# Allowed relationship labels between two entities.
RelationshipType = Literal[
    'Mentions',
    'Criticizes',
    'Cites',
    'Discusses',
    'Expands On',
    'Supports',
    'Founded By',
    'Pioneered',
    'Inspired By',
    'Affiliated With',
    'Published In',
    'Originated By',
    'Occurred At',
    'Owns',
    'Owned By',
]
@dataclass
class Entity:
    """A named entity tracked across archive records."""

    id: str  # Type-prefixed identifier (P-..., O-..., W-..., etc.)
    name: str
    type: EntityType
    aliases: list[str]  # Alternative names/spellings
    prominence: float  # 0-10 based on discussion depth
    mention_count: int = 0
    first_mentioned_in: str = ""  # Record ID
@dataclass
class Relationship:
    """A typed, directed link between two entities, attributed to a record."""

    source_entity_id: str
    target_entity_id: str
    relationship_type: RelationshipType
    source_record_id: str  # Which record established this relationship
    confidence: float = 1.0
class EntityRegistry:
    """Deduplication and normalization for entities."""

    # Lower-cased variant -> canonical name.
    NORMALIZATIONS = {
        'nyt': 'The New York Times',
        'new york times': 'The New York Times',
        'ny times': 'The New York Times',
        'washington post': 'The Washington Post',
        'wapo': 'The Washington Post',
        'cnn': 'CNN',
        'fox': 'Fox News',
        'fox news channel': 'Fox News',
    }

    def __init__(self):
        """Start with an empty registry."""
        self.entities: dict[str, Entity] = {}  # entity id -> Entity
        self.name_to_id: dict[str, str] = {}  # lower-cased canonical name -> id

    def normalize_name(self, name: str) -> str:
        """Normalize entity name to canonical form.

        Known variants (case-insensitive, whitespace-trimmed) map to their
        canonical name; anything else is returned trimmed.
        """
        name_lower = name.lower().strip()
        return self.NORMALIZATIONS.get(name_lower, name.strip())

    def find_or_create(self, name: str, entity_type: EntityType) -> Entity:
        """Find existing entity or create new one.

        An existing entity is matched on its lower-cased canonical name; its
        ``mention_count`` is incremented and, if ``name`` is a spelling not
        yet recorded, it is added to ``aliases``. Otherwise a new entity is
        created with an ID built from the type's first letter and a
        per-type running number.

        Args:
            name: Entity name as it appeared in the source.
            entity_type: One of the ``EntityType`` values.

        Returns:
            The matched or newly created entity.
        """
        normalized = self.normalize_name(name)
        key = normalized.lower()

        # Check if already exists
        existing_id = self.name_to_id.get(key)
        if existing_id is not None:
            entity = self.entities[existing_id]
            entity.mention_count += 1
            # Same rule as at creation time (raw name differing from the
            # canonical one is an alias), so spellings first seen on later
            # mentions are no longer lost.
            if name != entity.name and name not in entity.aliases:
                entity.aliases.append(name)
            return entity

        # Create new entity
        type_prefix = entity_type[0].upper()  # P, O, W, C, E, L
        count = sum(1 for e in self.entities.values() if e.type == entity_type)
        entity_id = f"{type_prefix}-{count + 1:04d}"
        entity = Entity(
            id=entity_id,
            name=normalized,
            type=entity_type,
            aliases=[name] if name != normalized else [],
            prominence=0.0,
            mention_count=1,
        )
        self.entities[entity_id] = entity
        self.name_to_id[key] = entity_id
        return entity
class EntityExtractor:
    """Extract entities and relationships from a record with a generative model."""

    def __init__(self, registry: EntityRegistry):
        """Bind the shared registry and create the model client.

        Args:
            registry: Registry used to deduplicate extracted entities.
        """
        self.registry = registry
        self.model = genai.GenerativeModel('gemini-2.0-flash')

    @staticmethod
    def _parse_json(text: str) -> dict:
        """Decode the model reply, tolerating a markdown code fence around it.

        Uses the same fence handling as ``ArchiveCategorizer._parse_response``;
        a bare ``json.loads`` fails whenever the reply is wrapped in a
        markdown code fence.
        """
        if '```json' in text:
            text = text.split('```json')[1].split('```')[0]
        elif '```' in text:
            text = text.split('```')[1].split('```')[0]
        return json.loads(text.strip())

    def extract(self, record: ArchiveRecord) -> tuple[list[Entity], list[Relationship]]:
        """Extract entities and relationships from ``record``.

        The prompt contains the title and the first 10000 characters of the
        text. Entities are resolved through the registry; a relationship is
        kept only when both endpoints resolve to entities from this reply.

        Returns:
            ``(entities, relationships)`` for this record.

        Raises:
            ValueError: If the reply is not valid JSON.
            KeyError: If an entity or relationship entry lacks a required key.
        """
        prompt = f"""Extract named entities and relationships from this archival content.
CONTENT:
Title: {record.title}
Text: {record.text[:10000]}
ENTITY TYPES:
- Person: journalists, politicians, academics, media figures
- Organization: news outlets, media companies, academic institutions
- Work: articles, books, blog posts, studies, reports
- Concept: journalism theories, media criticism frameworks
- Event: conferences, elections, media crises
- Location: geographic locations relevant to media context
RELATIONSHIP TYPES:
- Mentions, Criticizes, Cites, Discusses, Expands On, Supports
- Founded By, Pioneered, Inspired By
- Affiliated With, Published In, Originated By, Occurred At
- Owns, Owned By
Respond with JSON:
{{
"entities": [
{{"name": "Entity Name", "type": "Person|Organization|...", "prominence": 1-10}}
],
"relationships": [
{{"source": "Entity Name", "target": "Entity Name", "type": "Relationship Type"}}
]
}}
IMPORTANT:
- Prominence: 1-3 = mentioned briefly, 4-6 = discussed, 7-10 = central focus
- Only extract entities actually discussed, not just mentioned in passing
- Relationships must connect entities that appear in the same text
"""
        response = self.model.generate_content(prompt)
        data = self._parse_json(response.text)

        entities = []
        entity_name_to_obj = {}

        # Process entities
        for e in data.get('entities', []):
            entity = self.registry.find_or_create(e['name'], e['type'])
            entity.prominence = max(entity.prominence, e.get('prominence', 5))
            # Nothing else in the pipeline fills this field; stamp it the
            # first time the entity is seen.
            if not entity.first_mentioned_in:
                entity.first_mentioned_in = record.id
            entities.append(entity)
            entity_name_to_obj[e['name'].lower()] = entity
            # Also index by canonical name so a relationship that refers to
            # the normalized spelling still resolves.
            entity_name_to_obj.setdefault(entity.name.lower(), entity)

        # Process relationships
        relationships = []
        for r in data.get('relationships', []):
            source = entity_name_to_obj.get(r['source'].lower())
            target = entity_name_to_obj.get(r['target'].lower())
            if source and target:
                relationships.append(Relationship(
                    source_entity_id=source.id,
                    target_entity_id=target.id,
                    relationship_type=r['type'],
                    source_record_id=record.id,
                ))
        return entities, relationships
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image
from reportlab.lib.units import inch
from pathlib import Path
class ArchivePDFGenerator:
    """Render archive records as PDF files using ReportLab."""

    def __init__(self, output_dir: Path):
        """Prepare the output directory and paragraph styles.

        Args:
            output_dir: Directory for generated PDFs; created if missing.
        """
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.styles = getSampleStyleSheet()

        # Custom styles
        self.styles.add(ParagraphStyle(
            'ArchiveTitle',
            parent=self.styles['Heading1'],
            fontSize=16,
            spaceAfter=12,
        ))
        self.styles.add(ParagraphStyle(
            'ArchiveMeta',
            parent=self.styles['Normal'],
            fontSize=10,
            textColor='#666666',
            spaceAfter=6,
        ))

    @staticmethod
    def _markup_safe(value) -> str:
        """Return ``value`` as text with ``&``, ``<`` and ``>`` escaped.

        ``Paragraph`` interprets its text as XML-like markup, so scraped
        content must be escaped before it is embedded.
        """
        from xml.sax.saxutils import escape
        return escape(str(value))

    def generate(self, record: ArchiveRecord) -> Path:
        """Write ``<record.id>.pdf`` into the output directory.

        The document contains the title, a metadata block (author, date,
        source, URL, archive ID), the summary when present, and the full
        text split into paragraphs on blank lines.

        Returns:
            Path of the generated PDF.
        """
        safe = self._markup_safe
        output_path = self.output_dir / f"{record.id}.pdf"
        doc = SimpleDocTemplate(
            str(output_path),
            pagesize=letter,
            # Document properties are plain strings, not markup: no escaping.
            title=record.title,
            author=record.author or 'Unknown',
            subject=f"Archive record {record.id}",
        )
        story = []

        # Title
        story.append(Paragraph(safe(record.title), self.styles['ArchiveTitle']))

        # Metadata block: the <b> tags are our markup, the values are escaped.
        meta_lines = [
            f"<b>Author:</b> {safe(record.author or 'Unknown')}",
            f"<b>Date:</b> {safe(record.publication_date or 'Unknown')}",
            f"<b>Source:</b> {safe(record.publication or 'Unknown')}",
            f"<b>URL:</b> {safe(record.url)}",
            f"<b>Archive ID:</b> {safe(record.id)}",
        ]
        for line in meta_lines:
            story.append(Paragraph(line, self.styles['ArchiveMeta']))
        story.append(Spacer(1, 0.25 * inch))

        # Summary (if available)
        if record.summary:
            story.append(Paragraph("<b>Summary:</b>", self.styles['Heading2']))
            story.append(Paragraph(safe(record.summary), self.styles['Normal']))
            story.append(Spacer(1, 0.25 * inch))

        # Main content
        story.append(Paragraph("<b>Full Text:</b>", self.styles['Heading2']))

        # Split into paragraphs and add
        for para in record.text.split('\n\n'):
            stripped = para.strip()
            if stripped:
                story.append(Paragraph(safe(stripped), self.styles['Normal']))
                story.append(Spacer(1, 0.1 * inch))

        # Build PDF
        doc.build(story)
        return output_path
from dataclasses import dataclass
from typing import Callable
@dataclass
class ValidationResult:
    """Outcome of one validation check on a record field."""

    field: str  # Name of the record field the check refers to
    valid: bool
    message: str
    severity: Literal["error", "warning", "info"]
class ArchiveValidator:
    """Validate archive records for completeness and consistency."""

    REQUIRED_FIELDS = ['id', 'url', 'title', 'text']
    CRITICAL_FIELDS = ['publication_date', 'author', 'summary']
    OPTIONAL_FIELDS = ['categories', 'tags', 'pull_quote']

    def validate(self, record: ArchiveRecord) -> list[ValidationResult]:
        """Run all checks and return the failures found.

        Checks: required fields (error), critical fields (warning), text
        shorter than 100 characters (warning), ``publication_date`` lacking a
        working ``isoformat()`` (error), and categories that are neither a
        ``ThematicCategory`` member nor one of its values (warning).

        Returns:
            One ``ValidationResult`` per failed check; empty when all pass.
        """
        results = []

        # Required fields
        for name in self.REQUIRED_FIELDS:
            if not getattr(record, name, None):
                results.append(ValidationResult(
                    field=name,
                    valid=False,
                    message=f"Required field '{name}' is missing",
                    severity='error',
                ))

        # Critical fields (should have but not blocking)
        for name in self.CRITICAL_FIELDS:
            if not getattr(record, name, None):
                results.append(ValidationResult(
                    field=name,
                    valid=False,
                    message=f"Critical field '{name}' is missing",
                    severity='warning',
                ))

        # Content length check
        if record.text and len(record.text) < 100:
            results.append(ValidationResult(
                field='text',
                valid=False,
                message=f"Text unusually short ({len(record.text)} chars)",
                severity='warning',
            ))

        # Date format validation
        if record.publication_date:
            try:
                # Ensure date is valid
                _ = record.publication_date.isoformat()
            except (AttributeError, ValueError):
                results.append(ValidationResult(
                    field='publication_date',
                    valid=False,
                    message="Invalid date format",
                    severity='error',
                ))

        # Category validation. ``cat not in ThematicCategory`` raises
        # TypeError for plain strings on Python 3.8-3.11, and the workflow
        # stores categories as strings; a value lookup accepts both enum
        # members and their string values on every version.
        for cat in record.categories:
            try:
                ThematicCategory(cat)
            except ValueError:
                results.append(ValidationResult(
                    field='categories',
                    valid=False,
                    message=f"Unknown category: {cat}",
                    severity='warning',
                ))
        return results

    def is_complete(self, record: ArchiveRecord) -> bool:
        """Return True when validation reports no error-severity results.

        Warnings (including missing critical fields) do not affect the result.
        """
        return not any(r.severity == 'error' for r in self.validate(record))
class ArchiveWorkflow:
    """Orchestrate the complete archive processing pipeline."""

    def __init__(self, config: Config):
        """Build the pipeline components from ``config``.

        Reads ``config.PDF_DIR``, ``config.CREDENTIALS_PATH`` and
        ``config.PROGRESS_FILE``.
        """
        self.scraper = ScrapingCascade()
        self.categorizer = ArchiveCategorizer()
        self.entity_registry = EntityRegistry()
        self.entity_extractor = EntityExtractor(self.entity_registry)
        self.pdf_generator = ArchivePDFGenerator(config.PDF_DIR)
        self.sheets_service = SheetsService(config.CREDENTIALS_PATH)
        self.validator = ArchiveValidator()
        self.progress = ProgressTracker(config.PROGRESS_FILE)
        # run_batch() passes self.worksheet to the sheets service, but it was
        # never assigned, so every row failed with AttributeError.
        # NOTE(review): the config attribute name is an assumption - confirm
        # where the worksheet handle should come from.
        self.worksheet = getattr(config, 'WORKSHEET', None)
        # Relationships returned by the extractor were previously discarded;
        # keep them so they can be exported later.
        self.relationships: list[Relationship] = []

    def process_url(self, url: str, record_id: str) -> ArchiveRecord:
        """Process a single URL through the complete pipeline.

        Steps: scrape, build the record, AI categorization, entity
        extraction, PDF generation, validation.

        Raises:
            ValueError: If the scraper returns a falsy result.
        """
        # 1. Scrape content
        result = self.scraper.fetch(url)
        if not result:
            raise ValueError(f"Failed to scrape: {url}")

        # 2. Create initial record
        record = ArchiveRecord(
            id=record_id,
            url=url,
            title=result.title,
            text=result.content,
        )

        # 3. AI categorization (values are stored as returned by the model)
        categories = self.categorizer.categorize(record)
        record.summary = categories.get('summary')
        record.pull_quote = categories.get('pull_quote')
        record.categories = categories.get('categories', [])
        record.key_concepts = categories.get('key_concepts', [])
        record.tags = categories.get('tags', [])
        record.era = categories.get('era')
        record.scope = categories.get('scope')

        # 4. Entity extraction
        entities, relationships = self.entity_extractor.extract(record)
        record.entities_mentioned = [e.id for e in entities]
        self.relationships.extend(relationships)

        # 5. Generate PDF
        pdf_path = self.pdf_generator.generate(record)
        record.pdf_url = str(pdf_path)

        # 6. Validate (is_complete runs validate() itself, so the separate
        # unused validate() call was dropped)
        record.verified = self.validator.is_complete(record)
        record.processing_status = 'completed'
        return record

    def run_batch(self, input_csv: Path):
        """Process all URLs from input CSV.

        Rows already marked processed are skipped. A failure on one row is
        logged through the progress tracker and does not stop the batch.
        """
        for row in read_input(input_csv):
            if self.progress.is_processed(row.id):
                continue
            try:
                record = self.process_url(row.url, row.id)
                self.sheets_service.append_row(self.worksheet, record_to_row(record))
                self.progress.mark_processed(row.id)
            except Exception as e:
                # Batch boundary: record the failure and move to the next row.
                self.progress.log_error(row.id, str(e))
import json
from pathlib import Path
def export_for_frontend(records: list[ArchiveRecord], output_dir: Path,
                        entity_registry=None, relationships=None):
    """Export archive data in frontend-friendly formats.

    Writes ``archive-data.json`` and ``archive_records.csv`` for the records,
    plus ``entities.json`` / ``relationships.json`` when that data is
    available.

    Args:
        records: Dataclass instances to export.
        output_dir: Destination directory; created if missing.
        entity_registry: Object exposing an ``entities`` dict of dataclass
            instances. When omitted, a module-level ``entity_registry`` is
            used if one exists; otherwise ``entities.json`` is not written.
        relationships: Iterable of dataclass instances. When omitted, a
            module-level ``all_relationships`` is used if one exists;
            otherwise ``relationships.json`` is not written.
    """
    # These names were used by the original body without being imported.
    from dataclasses import asdict
    from datetime import datetime

    import pandas as pd

    # The original read these two from module globals; keep that as a
    # fallback so existing callers behave the same.
    if entity_registry is None:
        entity_registry = globals().get('entity_registry')
    if relationships is None:
        relationships = globals().get('all_relationships')

    output_dir.mkdir(parents=True, exist_ok=True)
    record_dicts = [asdict(r) for r in records]

    # Main archive JSON
    archive_data = {
        'metadata': {
            'total_records': len(records),
            'last_updated': datetime.now().isoformat(),
            'schema_version': '2.0'
        },
        'records': record_dicts
    }
    # default=str stringifies values json cannot encode natively
    # (dates, enums, ...).
    (output_dir / 'archive-data.json').write_text(
        json.dumps(archive_data, indent=2, default=str),
        encoding='utf-8',
    )

    # Entity export
    if entity_registry is not None:
        entities_data = [asdict(e) for e in entity_registry.entities.values()]
        (output_dir / 'entities.json').write_text(
            json.dumps(entities_data, indent=2),
            encoding='utf-8',
        )

    # Relationships export
    if relationships is not None:
        relationships_data = [asdict(r) for r in relationships]
        (output_dir / 'relationships.json').write_text(
            json.dumps(relationships_data, indent=2),
            encoding='utf-8',
        )

    # CSV exports for spreadsheet compatibility
    records_df = pd.DataFrame(record_dicts)
    records_df.to_csv(output_dir / 'archive_records.csv', index=False)
Weekly Installs
52
Repository
GitHub Stars
84
First Seen
Jan 21, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
codex: 47
gemini-cli: 47
opencode: 46
cursor: 43
github-copilot: 42
amp: 41
AI界面设计评审工具 - 全面评估UI/UX设计质量、检测AI生成痕迹与优化用户体验
58,500 周安装
Oracle到PostgreSQL迁移测试项目脚手架 - 集成测试基础设施搭建指南
1,100 周安装
InkOS 多智能体小说创作 CLI 工具:AI 自主写作、审核与修订完整流程
1,000 周安装
Railway 文档助手 - 获取最新 Railway 平台部署、项目、定价等技术文档
1,000 周安装
Railway部署管理指南:列出、查看日志、重新部署与移除部署操作详解
1,000 周安装
Next.js 16.1.1 生产模式指南:App Router、缓存、Turbopack 与安全更新
1,000 周安装
React Sentry 安装配置指南:错误监控、日志记录与性能追踪
1,000 周安装