page-monitoring by jamditis/claude-skills-journalism
npx skills add https://github.com/jamditis/claude-skills-journalism --skill page-monitoring
Patterns for tracking web page changes, detecting content removal, and preserving important pages before they disappear.
| Service | Free Tier | Best For | Storage | Alert Speed |
|---|---|---|---|---|
| Visualping | 5 pages | Visual changes | Standard | Minutes |
| ChangeTower | Yes | Compliance, archiving | 12 years | Minutes |
| Distill.io | 25 pages | Element-level tracking | 12 months | Seconds |
| Wachete | Limited | Login-protected pages | 12 months | Minutes |
| UptimeRobot | 50 monitors | Uptime only | 2 months | Minutes |
// Distill.io allows CSS/XPath selectors for precise monitoring
// Example selectors for common use cases:
// Monitor news article headlines
const newsSelector = '.article-headline, h1.title, .story-title';
// Monitor price changes
const priceSelector = '.price, .product-price, [data-price]';
// Monitor stock/availability
const availabilitySelector = '.in-stock, .availability, .stock-status';
// Monitor specific paragraph or section
const sectionSelector = '#main-content p:first-child';
// Monitor table data
const tableSelector = 'table.data-table tbody tr';
import requests
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from bs4 import BeautifulSoup
class PageMonitor:
"""Simple page change monitor with local storage."""
def __init__(self, storage_dir: Path):
self.storage_dir = storage_dir
self.storage_dir.mkdir(parents=True, exist_ok=True)
self.state_file = storage_dir / 'monitor_state.json'
self.state = self._load_state()
def _load_state(self) -> dict:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return {'pages': {}}
def _save_state(self):
self.state_file.write_text(json.dumps(self.state, indent=2))
def _get_page_hash(self, url: str, selector: str = None) -> tuple[str, str]:
"""Get content hash and content for a page or element."""
response = requests.get(url, timeout=30, headers={
'User-Agent': 'Mozilla/5.0 (PageMonitor/1.0)'
})
response.raise_for_status()
if selector:
soup = BeautifulSoup(response.text, 'html.parser')
element = soup.select_one(selector)
content = element.get_text(strip=True) if element else ''
else:
content = response.text
content_hash = hashlib.sha256(content.encode()).hexdigest()
return content_hash, content
def add_page(self, url: str, name: str, selector: str = None):
"""Add a page to monitor."""
content_hash, content = self._get_page_hash(url, selector)
self.state['pages'][url] = {
'name': name,
'selector': selector,
'last_hash': content_hash,
'last_check': datetime.now().isoformat(),
'last_content': content[:1000], # Store preview
'change_count': 0
}
self._save_state()
print(f"Added: {name} ({url})")
def check_page(self, url: str) -> Optional[dict]:
"""Check single page for changes."""
if url not in self.state['pages']:
return None
page = self.state['pages'][url]
selector = page.get('selector')
try:
new_hash, new_content = self._get_page_hash(url, selector)
except Exception as e:
return {
'url': url,
'name': page['name'],
'status': 'error',
'error': str(e)
}
changed = new_hash != page['last_hash']
result = {
'url': url,
'name': page['name'],
'status': 'changed' if changed else 'unchanged',
'previous_content': page['last_content'],
'new_content': new_content[:1000] if changed else None
}
if changed:
page['last_hash'] = new_hash
page['last_content'] = new_content[:1000]
page['change_count'] += 1
# Archive the change
archive_file = self.storage_dir / f"{hashlib.md5(url.encode()).hexdigest()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
archive_file.write_text(new_content)
page['last_check'] = datetime.now().isoformat()
self._save_state()
return result
def check_all(self) -> list[dict]:
"""Check all monitored pages."""
results = []
for url in self.state['pages']:
result = self.check_page(url)
if result:
results.append(result)
return results
# Usage
monitor = PageMonitor(Path('./page_monitor_data'))
# Add pages to monitor
monitor.add_page(
'https://example.com/important-page',
'Important Page',
selector='.main-content' # Optional: monitor specific element
)
# Check for changes
results = monitor.check_all()
for result in results:
if result['status'] == 'changed':
print(f"CHANGED: {result['name']}")
print(f" Previous: {result['previous_content'][:100]}...")
print(f" New: {result['new_content'][:100]}...")
import requests
from typing import List, Optional
class UptimeRobotClient:
"""UptimeRobot API client for monitoring page availability."""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.uptimerobot.com/v2"
    def _request(self, endpoint: str, params: dict = None) -> dict:
        # Ask for JSON explicitly so response.json() below always works
        data = {'api_key': self.api_key, 'format': 'json'}
        if params:
            data.update(params)
        response = requests.post(f"{self.base_url}/{endpoint}", data=data)
        response.raise_for_status()
        return response.json()
def get_monitors(self) -> List[dict]:
"""Get all monitors."""
result = self._request('getMonitors')
return result.get('monitors', [])
def create_monitor(self, friendly_name: str, url: str,
monitor_type: int = 1) -> dict:
"""Create a new monitor.
Types: 1=HTTP(s), 2=Keyword, 3=Ping, 4=Port
"""
return self._request('newMonitor', {
'friendly_name': friendly_name,
'url': url,
'type': monitor_type
})
def get_monitor_uptime(self, monitor_id: int,
custom_uptime_ratios: str = "7-30-90") -> dict:
"""Get uptime statistics for a monitor."""
return self._request('getMonitors', {
'monitors': monitor_id,
'custom_uptime_ratios': custom_uptime_ratios
})
def pause_monitor(self, monitor_id: int) -> dict:
"""Pause a monitor."""
return self._request('editMonitor', {
'id': monitor_id,
'status': 0
})
def resume_monitor(self, monitor_id: int) -> dict:
"""Resume a monitor."""
return self._request('editMonitor', {
'id': monitor_id,
'status': 1
})
# Usage
client = UptimeRobotClient('your-api-key')
# Create monitors for important pages
client.create_monitor('News Homepage', 'https://example-news.com')
client.create_monitor('API Status', 'https://api.example.com/health')
# Check all monitors
for monitor in client.get_monitors():
status = 'UP' if monitor['status'] == 2 else 'DOWN'
print(f"{monitor['friendly_name']}: {status}")
import requests
from bs4 import BeautifulSoup
from feedgen.feed import FeedGenerator
from datetime import datetime, timezone
from urllib.parse import urljoin
import hashlib
class RSSGenerator:
"""Generate RSS feeds from web pages."""
def __init__(self, feed_id: str, title: str, link: str):
self.fg = FeedGenerator()
self.fg.id(feed_id)
self.fg.title(title)
self.fg.link(href=link)
self.fg.description(f'Auto-generated feed for {title}')
def add_from_page(self, url: str, item_selector: str,
title_selector: str, link_selector: str,
description_selector: str = None):
"""Parse a page and add items to feed.
Args:
url: Page URL to parse
item_selector: CSS selector for each item container
title_selector: CSS selector for title (relative to item)
link_selector: CSS selector for link (relative to item)
description_selector: Optional CSS selector for description
"""
response = requests.get(url, timeout=30)
soup = BeautifulSoup(response.text, 'html.parser')
items = soup.select(item_selector)
for item in items[:20]: # Limit to 20 items
title_elem = item.select_one(title_selector)
link_elem = item.select_one(link_selector)
if not title_elem or not link_elem:
continue
title = title_elem.get_text(strip=True)
link = link_elem.get('href', '')
            # Resolve relative links (with or without a leading slash) against the page URL
            if link:
                link = urljoin(url, link)
fe = self.fg.add_entry()
fe.id(hashlib.md5(link.encode()).hexdigest())
fe.title(title)
fe.link(href=link)
if description_selector:
desc_elem = item.select_one(description_selector)
if desc_elem:
fe.description(desc_elem.get_text(strip=True))
            fe.published(datetime.now(timezone.utc))  # feedgen requires timezone-aware datetimes
def generate_rss(self) -> str:
"""Generate RSS XML string."""
return self.fg.rss_str(pretty=True).decode()
def save_rss(self, filepath: str):
"""Save RSS feed to file."""
self.fg.rss_file(filepath)
# Example: Generate feed for a news site without RSS
rss = RSSGenerator(
'https://example.com/news',
'Example News Feed',
'https://example.com/news'
)
rss.add_from_page(
'https://example.com/news',
item_selector='.news-item',
title_selector='h2 a',
link_selector='h2 a',
description_selector='.summary'
)
# Save the feed
rss.save_rss('example_feed.xml')
# RSS-Bridge generates feeds for sites without them
# Supports Twitter, Instagram, YouTube, and many others
# Docker installation
docker pull rssbridge/rss-bridge
docker run -d -p 3000:80 rssbridge/rss-bridge
# Access at http://localhost:3000
# Select a bridge, enter parameters, get RSS feed URL
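Each bridge exposes its output through a feed URL built from query parameters; the exact parameters depend on the bridge you pick in the UI, so the URL below is only illustrative. A minimal sketch for pulling a bridge-generated feed into the monitoring workflow with feedparser:

import feedparser  # pip install feedparser

# Illustrative URL only -- copy the real feed URL from the RSS-Bridge web UI
BRIDGE_FEED = 'http://localhost:3000/?action=display&bridge=CssSelectorBridge&format=Atom'

feed = feedparser.parse(BRIDGE_FEED)
for entry in feed.entries[:10]:
    print(entry.get('title', ''), entry.get('link', ''))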
# Twarc requires Twitter API credentials
# Installation
# pip install twarc
# Configure
# twarc2 configure
import subprocess
import json
from pathlib import Path
class TwitterArchiver:
"""Archive Twitter searches and timelines."""
def __init__(self, output_dir: Path):
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
def search(self, query: str, max_results: int = 100) -> Path:
"""Search tweets and save to file."""
output_file = self.output_dir / f"search_{query.replace(' ', '_')}.jsonl"
subprocess.run([
'twarc2', 'search',
'--max-results', str(max_results),
query,
str(output_file)
], check=True)
return output_file
def get_timeline(self, username: str, max_results: int = 100) -> Path:
"""Get user timeline."""
output_file = self.output_dir / f"timeline_{username}.jsonl"
subprocess.run([
'twarc2', 'timeline',
'--max-results', str(max_results),
username,
str(output_file)
], check=True)
return output_file
def parse_archive(self, filepath: Path) -> list[dict]:
"""Parse archived tweets."""
tweets = []
with open(filepath) as f:
for line in f:
data = json.loads(line)
if 'data' in data:
tweets.extend(data['data'])
return tweets
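Usage, assuming twarc2 is installed and configured as above (the query and username are placeholders):

archiver = TwitterArchiver(Path('./twitter_archive'))

# Archive recent tweets matching a search, then load them for analysis
archive_path = archiver.search('newsroom layoffs', max_results=100)
tweets = archiver.parse_archive(archive_path)
print(f"Archived {len(tweets)} tweets to {archive_path}")

# Archive a specific account's timeline
timeline_path = archiver.get_timeline('example_newsroom', max_results=200)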
import requests
from datetime import datetime
class AlertManager:
"""Send alerts when monitored pages change."""
def __init__(self, slack_webhook: str = None,
discord_webhook: str = None,
email_config: dict = None):
self.slack_webhook = slack_webhook
self.discord_webhook = discord_webhook
self.email_config = email_config
def send_slack(self, message: str, channel: str = None):
"""Send Slack notification."""
if not self.slack_webhook:
return
payload = {'text': message}
if channel:
payload['channel'] = channel
requests.post(self.slack_webhook, json=payload)
def send_discord(self, message: str):
"""Send Discord notification."""
if not self.discord_webhook:
return
requests.post(self.discord_webhook, json={'content': message})
def send_email(self, subject: str, body: str, to: str):
"""Send email notification."""
if not self.email_config:
return
import smtplib
from email.mime.text import MIMEText
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.email_config['from']
msg['To'] = to
with smtplib.SMTP(self.email_config['smtp_host'],
self.email_config['smtp_port']) as server:
server.starttls()
server.login(self.email_config['username'],
self.email_config['password'])
server.send_message(msg)
def alert_change(self, page_name: str, url: str,
old_content: str, new_content: str):
"""Send change alert to all configured channels."""
message = f"""
Page Changed: {page_name}
URL: {url}
Time: {datetime.now().isoformat()}
Previous content (preview):
{old_content[:200]}...
New content (preview):
{new_content[:200]}...
"""
if self.slack_webhook:
self.send_slack(message)
if self.discord_webhook:
self.send_discord(message)
# Edit crontab
crontab -e
# Check pages every 15 minutes
*/15 * * * * /usr/bin/python3 /path/to/monitor_script.py >> /var/log/monitor.log 2>&1
# Check critical pages every 5 minutes
*/5 * * * * /usr/bin/python3 /path/to/critical_monitor.py >> /var/log/critical.log 2>&1
# Daily summary report at 8 AM
0 8 * * * /usr/bin/python3 /path/to/daily_report.py
#!/usr/bin/env python3
"""Page monitoring script for cron execution."""
import sys
from pathlib import Path
from datetime import datetime
# Add project to path
sys.path.insert(0, str(Path(__file__).parent))
from monitor import PageMonitor
from alerts import AlertManager
def main():
# Initialize
monitor = PageMonitor(Path('./data'))
alerts = AlertManager(
slack_webhook='https://hooks.slack.com/services/...',
discord_webhook='https://discord.com/api/webhooks/...'
)
# Check all pages
results = monitor.check_all()
# Process results
changes = [r for r in results if r['status'] == 'changed']
errors = [r for r in results if r['status'] == 'error']
# Alert on changes
for change in changes:
alerts.alert_change(
change['name'],
change['url'],
change['previous_content'],
change['new_content']
)
print(f"[{datetime.now()}] CHANGE: {change['name']}")
# Alert on errors
for error in errors:
alerts.send_slack(f"Monitor error for {error['name']}: {error['error']}")
print(f"[{datetime.now()}] ERROR: {error['name']} - {error['error']}")
# Summary
print(f"[{datetime.now()}] Checked {len(results)} pages, "
f"{len(changes)} changes, {len(errors)} errors")
if __name__ == '__main__':
main()
from pathlib import Path
from multiarchiver import MultiArchiver  # helper that submits a URL to multiple archive services (defined elsewhere)
class ArchivingMonitor(PageMonitor):
"""Page monitor that archives content when changes detected."""
def __init__(self, storage_dir: Path):
super().__init__(storage_dir)
self.archiver = MultiArchiver()
def check_page(self, url: str) -> dict:
"""Check page and archive if changed."""
result = super().check_page(url)
if result and result['status'] == 'changed':
# Archive to multiple services
archive_results = self.archiver.archive_url(url)
successful_archives = [
r.archived_url for r in archive_results
if r.success
]
result['archives'] = successful_archives
# Log archive URLs
print(f"Archived {url} to:")
for archive_url in successful_archives:
print(f" - {archive_url}")
return result
## News/Current Events Monitoring
### Pages to monitor:
- Breaking news sections
- Press release pages
- Government announcement pages
- Company newsrooms
### Monitoring frequency:
- Breaking news: Every 5 minutes
- Press releases: Every 15-30 minutes
- General news: Every hour
### Archive strategy:
- Archive immediately on detection
- Use both the Wayback Machine and Archive.today (see the sketch after this list)
- Save local copy with timestamp
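A minimal sketch of "archive immediately on detection": the Wayback Machine's Save Page Now endpoint accepts a plain GET to https://web.archive.org/save/<url> (rate-limited, and the snapshot URL is usually reported in the Content-Location response header), while Archive.today has no stable public API and is left as a manual step.

import requests
from datetime import datetime
from pathlib import Path

def archive_on_detection(url: str, content: str, archive_dir: Path) -> dict:
    """Save a timestamped local copy and request a Wayback Machine snapshot."""
    archive_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # Timestamped local copy
    local_copy = archive_dir / f"{stamp}.html"
    local_copy.write_text(content)

    # Wayback Machine "Save Page Now" -- a plain GET, subject to rate limits
    wayback = requests.get(f"https://web.archive.org/save/{url}", timeout=60)

    return {
        'local_copy': str(local_copy),
        'wayback_status': wayback.status_code,
        'wayback_snapshot': wayback.headers.get('Content-Location', '')
    }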
## Academic/Research Monitoring
### Pages to monitor:
- Preprint servers (arXiv, SSRN)
- Journal table of contents
- Conference proceedings
- Researcher profiles
### Monitoring frequency:
- Daily for active topics
- Weekly for general monitoring
### Recommended tools:
- Google Scholar alerts (free, built-in)
- Semantic Scholar alerts
- RSS feeds where available (see the feed-polling sketch after this list)
- Custom monitors for specific pages
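Where a feed exists, polling it is simpler and gentler than HTML diffing. A minimal sketch with feedparser; the arXiv feed URL pattern is an assumption to verify against arXiv's current documentation.

import feedparser

# Feed URL pattern is an assumption -- check the preprint server's docs for the real one
ARXIV_FEED = 'http://export.arxiv.org/rss/cs.CL'

seen_ids: set[str] = set()

def poll_feed(feed_url: str) -> list[dict]:
    """Return entries not seen on previous polls."""
    new_entries = []
    for entry in feedparser.parse(feed_url).entries:
        entry_id = entry.get('id') or entry.get('link', '')
        if entry_id and entry_id not in seen_ids:
            seen_ids.add(entry_id)
            new_entries.append({'title': entry.get('title', ''),
                                'link': entry.get('link', '')})
    return new_entries

for item in poll_feed(ARXIV_FEED):
    print(f"NEW: {item['title']} -> {item['link']}")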
## Competitor Monitoring
### Pages to monitor:
- Pricing pages
- Product pages
- Job postings
- Press releases
- Executive bios
### Monitoring frequency:
- Pricing: Daily
- Products: Daily
- Jobs: Weekly
- Press: Daily
### Legal considerations:
- Don't violate terms of service
- Don't circumvent access controls
- Public pages only
- Don't scrape at high frequency
## Before monitoring a page:
- [ ] Is the page publicly accessible?
- [ ] Are you respecting robots.txt? (see the robotparser sketch after this checklist)
- [ ] Is monitoring frequency reasonable?
- [ ] Do you have a legitimate purpose?
- [ ] Are you storing data securely?
- [ ] Do you have alerts configured?
- [ ] Is archiving set up for important pages?
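The robots.txt question can be answered programmatically with the standard library; a minimal check:

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

def allowed_by_robots(url: str, user_agent: str = 'PageMonitor/1.0') -> bool:
    """Check whether robots.txt permits fetching this URL."""
    parsed = urlparse(url)
    robots = RobotFileParser()
    robots.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    try:
        robots.read()
    except Exception:
        return True  # no readable robots.txt: treat as allowed, but use judgment
    return robots.can_fetch(user_agent, url)

# Skip pages the site asks crawlers to avoid
if not allowed_by_robots('https://example.com/private-area/'):
    print("robots.txt disallows this path -- do not monitor it")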
## Maintenance:
- [ ] Review monitors monthly
- [ ] Remove stale monitors
- [ ] Update selectors if pages change
- [ ] Check alert delivery
- [ ] Verify archives are working
import time
import requests
from functools import wraps
def rate_limit(min_interval: float = 1.0):
"""Decorator to rate limit function calls."""
last_call = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_call[0]
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
last_call[0] = time.time()
return func(*args, **kwargs)
return wrapper
return decorator
# Usage
@rate_limit(min_interval=2.0) # Max once per 2 seconds
def check_page(url: str):
return requests.get(url)