social-media-intelligence by jamditis/claude-skills-journalism
npx skills add https://github.com/jamditis/claude-skills-journalism --skill social-media-intelligence

用于新闻工作的社交媒体监测、分析与调查的系统化方法。
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import hashlib
class Platform(Enum):
    """Social networks supported by the monitoring toolkit.

    Each member's string value doubles as the canonical platform key used
    in search queries, content hashes, and appearance records.
    """

    TWITTER = "twitter"
    FACEBOOK = "facebook"
    INSTAGRAM = "instagram"
    TIKTOK = "tiktok"
    YOUTUBE = "youtube"
    REDDIT = "reddit"
    THREADS = "threads"
    BLUESKY = "bluesky"
    MASTODON = "mastodon"
@dataclass
class SocialPost:
    """A single captured social-media post plus its preservation metadata."""

    platform: Platform   # source network
    post_id: str         # platform-native identifier
    author: str          # handle/username as captured
    content: str         # raw post text
    timestamp: datetime  # when the post was published
    url: str             # canonical link to the post
    engagement: Dict[str, int] = field(default_factory=dict)  # e.g. likes/shares
    media_urls: List[str] = field(default_factory=list)       # attached media
    archived_urls: List[str] = field(default_factory=list)    # archive copies
    content_hash: str = ""                                    # set by __post_init__

    def __post_init__(self):
        """Fingerprint the post for duplicate detection.

        The digest covers platform + content so identical text seen on two
        different networks is not collapsed together. MD5 is used only as a
        fast non-cryptographic fingerprint here.
        """
        fingerprint_source = f"{self.platform.value}:{self.content}"
        self.content_hash = hashlib.md5(fingerprint_source.encode()).hexdigest()
@dataclass
class MonitoringQuery:
    """A reusable description of what to watch for across platforms."""

    keywords: List[str]        # free-text terms (OR-combined)
    platforms: List[Platform]  # where to run the query
    accounts: List[str] = field(default_factory=list)       # handles to watch
    hashtags: List[str] = field(default_factory=list)       # tags (OR-combined)
    exclude_terms: List[str] = field(default_factory=list)  # terms to negate
    start_date: Optional[datetime] = None                   # earliest post date

    def to_search_string(self, platform: Platform) -> str:
        """Render the query as a generic boolean search string.

        NOTE(review): `platform` is currently unused — the same string is
        produced for every network; per-platform syntax looks like a TODO.
        """
        segments = []
        if self.keywords:
            segments.append(' OR '.join(f'"{k}"' for k in self.keywords))
        if self.hashtags:
            segments.append(' OR '.join(f'#{h}' for h in self.hashtags))
        if self.exclude_terms:
            segments.append(' '.join(f'-{t}' for t in self.exclude_terms))
        return ' '.join(segments)
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from collections import defaultdict
from datetime import datetime, timedelta
class BreakingNewsDetector:
    """Detect sudden spikes in keyword mentions relative to a rolling baseline."""

    def __init__(self, baseline_window_hours: int = 24):
        # Window over which the "normal" hourly mention rate is estimated.
        self.baseline_window = timedelta(hours=baseline_window_hours)
        # keyword -> list of mention timestamps (naive local time, per callers)
        self.mention_history = defaultdict(list)

    def _recent_count(self, keyword: str, now: datetime) -> int:
        """Number of mentions of *keyword* in the hour before *now*."""
        cutoff = now - timedelta(hours=1)
        return sum(1 for t in self.mention_history[keyword] if t > cutoff)

    def add_mention(self, keyword: str, timestamp: datetime):
        """Record a mention and prune history older than twice the baseline window."""
        self.mention_history[keyword].append(timestamp)
        cutoff = datetime.now() - self.baseline_window * 2
        self.mention_history[keyword] = [
            t for t in self.mention_history[keyword] if t > cutoff
        ]

    def is_spiking(self, keyword: str, threshold_multiplier: float = 3.0) -> bool:
        """Return True when last-hour mentions exceed the baseline hourly rate.

        With no baseline data at all, fall back to an absolute threshold so
        brand-new topics can still register.
        """
        now = datetime.now()
        recent = self._recent_count(keyword, now)
        in_window = [
            t for t in self.mention_history[keyword]
            if t > now - self.baseline_window
        ]
        baseline_hourly = len(in_window) / self.baseline_window.total_seconds() * 3600
        if baseline_hourly == 0:
            return recent > 10  # arbitrary threshold for brand-new topics
        return recent > baseline_hourly * threshold_multiplier

    def get_trending(self, top_n: int = 10) -> List[tuple]:
        """Return up to *top_n* (keyword, recent_count) pairs, busiest first.

        Uses one time snapshot for all keywords so the ranking is consistent
        (the original re-read the clock per keyword and duplicated the
        last-hour counting logic inline).
        """
        now = datetime.now()
        spikes = [
            (keyword, self._recent_count(keyword, now))
            for keyword in self.mention_history
            if self.is_spiking(keyword)
        ]
        return sorted(spikes, key=lambda x: x[1], reverse=True)[:top_n]
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
@dataclass
class AccountAnalysis:
    """Profile-level signals for judging whether an account looks authentic."""

    username: str
    platform: Platform
    created_date: Optional[datetime] = None
    follower_count: int = 0
    following_count: int = 0
    post_count: int = 0
    # Authenticity signals (None = not yet assessed)
    profile_photo_is_stock: Optional[bool] = None
    bio_contains_keywords: List[str] = field(default_factory=list)
    posts_primarily_reshares: Optional[bool] = None
    posting_pattern_irregular: Optional[bool] = None
    engagement_ratio_suspicious: Optional[bool] = None

    def calculate_red_flags(self) -> dict:
        """Collect red flags as {flag_name: human-readable detail}."""
        flags = {}
        # Very young accounts are a classic inauthenticity marker.
        if self.created_date:
            account_age = (datetime.now() - self.created_date).days
            if account_age < 30:
                flags['new_account'] = f"创建于 {account_age} 天前"
        # Accounts following far more than they are followed look bot-like.
        if self.following_count > 0:
            follower_ratio = self.follower_count / self.following_count
            if follower_ratio < 0.1:
                flags['low_follower_ratio'] = f"比例: {follower_ratio:.2f}"
        # Sustained superhuman posting rates (age clamped to >= 1 day).
        if self.created_date and self.post_count > 0:
            clamped_age = max(1, (datetime.now() - self.created_date).days)
            daily_rate = self.post_count / clamped_age
            if daily_rate > 50:
                flags['excessive_posting'] = f"{daily_rate:.0f} 条/天"
        if self.profile_photo_is_stock:
            flags['stock_profile_photo'] = "个人资料图片似乎是库存图片"
        return flags

    def authenticity_score(self) -> int:
        """Score 0-100; higher = more likely authentic (20 points off per flag)."""
        penalty_per_flag = 20
        raw = 100 - penalty_per_flag * len(self.calculate_red_flags())
        return max(0, raw)
from collections import defaultdict
from typing import Set, Dict
class AccountNetwork:
    """Directed interaction graph between social accounts."""

    def __init__(self):
        # interactions[from][to] -> number of recorded interactions
        self.interactions = defaultdict(lambda: defaultdict(int))
        self.accounts = {}  # reserved for per-account metadata

    def add_interaction(self, from_account: str, to_account: str,
                        interaction_type: str = "mention"):
        """Record one interaction (the type is currently not stored)."""
        self.interactions[from_account][to_account] += 1

    def find_clusters(self, min_interactions: int = 3) -> List[Set[str]]:
        """Return connected groups (size > 1) of accounts linked by at least
        *min_interactions* interactions in either direction, largest first."""
        # Undirected adjacency restricted to sufficiently strong edges.
        adjacency = defaultdict(set)
        for source, targets in self.interactions.items():
            for target, count in targets.items():
                if count >= min_interactions:
                    adjacency[source].add(target)
                    adjacency[target].add(source)
        # Depth-first search for connected components.
        visited = set()
        clusters = []
        for start in adjacency:
            if start in visited:
                continue
            component = set()
            frontier = [start]
            while frontier:
                node = frontier.pop()
                if node in visited:
                    continue
                visited.add(node)
                component.add(node)
                frontier.extend(adjacency[node] - visited)
            if len(component) > 1:
                clusters.append(component)
        return sorted(clusters, key=len, reverse=True)

    def coordination_score(self, accounts: Set[str]) -> float:
        """Fraction of possible directed pairs in *accounts* that interacted.

        Uses .get() lookups so scoring never inserts empty entries into the
        defaultdict-of-defaultdicts — the original's indexed lookups mutated
        the graph as a side effect of this read-only query.
        """
        if len(accounts) < 2:
            return 0.0
        total_possible = len(accounts) * (len(accounts) - 1)
        actual_connections = 0
        for source in accounts:
            outgoing = self.interactions.get(source)
            if not outgoing:
                continue
            for target in accounts:
                if target != source and outgoing.get(target, 0) > 0:
                    actual_connections += 1
        return actual_connections / total_possible if total_possible > 0 else 0
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional
@dataclass
class Claim:
    """A specific claim being tracked as it spreads across platforms."""

    text: str             # canonical wording of the claim
    first_seen: datetime  # earliest known sighting
    first_seen_url: str   # where it was first observed
    variations: List[str] = field(default_factory=list)    # alternate wordings
    appearances: List[Dict] = field(default_factory=list)  # sighting records

    def add_appearance(self, url: str, platform: Platform,
                       timestamp: datetime, author: str):
        """Record one sighting of this claim."""
        sighting = {
            'url': url,
            'platform': platform.value,
            'timestamp': timestamp,
            'author': author,
        }
        self.appearances.append(sighting)

    def spread_timeline(self) -> List[Dict]:
        """All sightings ordered oldest-first."""
        return sorted(self.appearances, key=lambda s: s['timestamp'])

    def platforms_reached(self) -> Dict[str, int]:
        """Sighting counts keyed by platform name."""
        counts = defaultdict(int)
        for sighting in self.appearances:
            counts[sighting['platform']] += 1
        return dict(counts)

    def velocity(self, window_hours: int = 24) -> float:
        """Sightings per hour over the trailing *window_hours*."""
        if not self.appearances:
            return 0.0
        window_start = datetime.now() - timedelta(hours=window_hours)
        recent = [s for s in self.appearances if s['timestamp'] > window_start]
        return len(recent) / window_hours
from collections import Counter
from datetime import datetime, timedelta
class HashtagAnalyzer:
    """Track which posts carry which hashtags and analyze usage patterns."""

    def __init__(self):
        # lowercased hashtag -> list of SocialPost objects
        self.hashtag_posts = defaultdict(list)

    def add_post(self, hashtags: List[str], post: SocialPost):
        """Index *post* under every tag in *hashtags* (case-insensitive)."""
        for tag in hashtags:
            self.hashtag_posts[tag.lower()].append(post)

    def co_occurrence(self, hashtag: str, top_n: int = 10) -> List[tuple]:
        """Top hashtags that appear in the same post text as *hashtag*.

        NOTE(review): tags are detected as whitespace-separated '#' tokens,
        so trailing punctuation (e.g. '#tag,') stays part of the tag.
        """
        own_tag = f'#{hashtag.lower()}'
        co_tags = Counter()
        for post in self.hashtag_posts.get(hashtag.lower(), []):
            for word in post.content.split():
                token = word.lower()
                if token.startswith('#') and token != own_tag:
                    co_tags[token] += 1
        return co_tags.most_common(top_n)

    def posting_pattern(self, hashtag: str) -> Dict:
        """Summarize when, and by how many authors, this hashtag is posted."""
        posts = self.hashtag_posts.get(hashtag.lower(), [])
        return {
            'by_hour': dict(Counter(p.timestamp.hour for p in posts)),
            'by_day': dict(Counter(p.timestamp.strftime('%A') for p in posts)),
            'total_posts': len(posts),
            'unique_authors': len({p.author for p in posts}),
        }
import requests
from datetime import datetime
from typing import Optional
class SocialArchiver:
    """Archive social-media URLs to public archiving services before deletion."""

    def __init__(self):
        # original URL -> details of where/when it was archived
        self.archived = {}

    def archive_to_wayback(self, url: str) -> Optional[str]:
        """Ask the Internet Archive to save *url*; return the archive URL or None."""
        try:
            response = requests.get(f"https://web.archive.org/save/{url}", timeout=30)
            if response.status_code == 200:
                self.archived[url] = {
                    'wayback': response.url,
                    'archived_at': datetime.now().isoformat(),
                }
                return response.url
        except Exception as e:
            # Best-effort service: report and fall through to None.
            print(f"归档失败: {e}")
        return None

    def archive_to_archive_today(self, url: str) -> Optional[str]:
        """Submit *url* to archive.today; return the resulting URL or None."""
        try:
            response = requests.post(
                'https://archive.today/submit/',
                data={'url': url},
                timeout=60,
            )
            if response.status_code == 200:
                return response.url
        except Exception as e:
            print(f"Archive.today 失败: {e}")
        return None

    def full_archive(self, url: str) -> dict:
        """Archive to every supported service; report which ones succeeded."""
        report = {
            'original_url': url,
            'archived_at': datetime.now().isoformat(),
            'archives': {},
        }
        for service_name, archive_fn in (
            ('wayback', self.archive_to_wayback),
            ('archive_today', self.archive_to_archive_today),
        ):
            archived_url = archive_fn(url)
            if archived_url:
                report['archives'][service_name] = archived_url
        return report
## 协调性虚假行为指标
### 时间模式
- [ ] 多个账户在几分钟内发布相同内容
- [ ] 跨账户的同步发布时间
- [ ] 突发活动后进入休眠
- [ ] 发帖速度快于人类打字速度
### 内容模式
- [ ] 跨账户内容完全相同或高度相似
- [ ] 多个账户分享相同的图片/媒体
- [ ] 相同的拼写错误或格式错误
- [ ] 可见的复制粘贴痕迹
### 账户模式
- [ ] 账户创建时间相近
- [ ] 相似的命名规则(姓名+数字)
- [ ] 通用或库存个人资料照片
- [ ] 个人内容极少,主要为转发
- [ ] 关注相同的账户
- [ ] 彼此互动比例异常
### 网络模式
- [ ] 在网络分析中形成密集集群
- [ ] 放大相同的外部来源
- [ ] 针对相同的账户或话题标签
- [ ] 可见的跨平台协调
def coordination_likelihood(posts: List[SocialPost]) -> dict:
    """Score (0-100) how likely *posts* represent coordinated activity.

    Returns {'score', 'signals', 'posts_analyzed', 'unique_authors'};
    fewer than two posts yields a minimal zero-score result.
    """
    if len(posts) < 2:
        return {'score': 0, 'signals': []}
    signals = []
    score = 0
    # Signal 1: heavy duplication — fewer than half the posts are unique.
    contents = [p.content for p in posts]
    unique_contents = set(contents)
    if len(unique_contents) < len(contents) * 0.5:
        signals.append("内容重复率高")
        score += 30
    # Signal 2: bursts — many consecutive posts under a minute apart.
    # BUG FIX: the original used timedelta.seconds, which wraps per day
    # (a gap of 1 day + 30 s reported as 30 s, a false "rapid post");
    # total_seconds() is the true gap length.
    timestamps = sorted(p.timestamp for p in posts)
    rapid_posts = sum(
        1 for earlier, later in zip(timestamps, timestamps[1:])
        if (later - earlier).total_seconds() < 60
    )
    if rapid_posts > len(posts) * 0.3:
        signals.append("可疑的时间聚类")
        score += 25
    # Signal 3: few distinct authors relative to post volume.
    # NOTE(review): the `> 5` floor means this only fires once the sample has
    # more than five authors — confirm that threshold is intended.
    authors = set(p.author for p in posts)
    if len(authors) > 5 and len(contents) / len(authors) > 2:
        signals.append("作者少,相似帖子多")
        score += 20
    return {
        'score': min(100, score),
        'signals': signals,
        'posts_analyzed': len(posts),
        'unique_authors': len(authors),
    }
| 平台 | 监测工具 | 备注 |
|---|---|---|
| Twitter/X | TweetDeck, Brandwatch | API 限制日益增加 |
| Facebook | CrowdTangle (受限) | 目前仅限学术访问 |
| Instagram | Later, Brandwatch | 无公开搜索 API |
| TikTok | Exolyt, Pentos | 历史数据有限 |
| Reddit | Pushshift, Arctic Shift | 归档访问权限各异 |
| YouTube | YouTube Data API | 元数据访问良好 |
| Bluesky | Firehose API | 开放,实时访问 |
| 字段 | 值 |
|---|---|
| Version | 1.0.0 |
| Created | 2025-12-26 |
| Author | Claude Skills for Journalism |
| Domain | Journalism, OSINT |
| Complexity | Advanced |
每周安装量
80
仓库
GitHub 星标数
84
首次出现
Feb 6, 2026
安全审计
安装于
codex78
cursor77
gemini-cli77
opencode77
github-copilot76
amp74
Systematic approaches for monitoring, analyzing, and investigating social media for journalism.
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import hashlib
class Platform(Enum):
    """Supported social networks.

    Member values are the canonical lowercase platform keys used throughout
    the toolkit (hashes, search strings, appearance records).
    """

    TWITTER = "twitter"
    FACEBOOK = "facebook"
    INSTAGRAM = "instagram"
    TIKTOK = "tiktok"
    YOUTUBE = "youtube"
    REDDIT = "reddit"
    THREADS = "threads"
    BLUESKY = "bluesky"
    MASTODON = "mastodon"
@dataclass
class SocialPost:
    """One captured social-media post with engagement and archival metadata."""

    platform: Platform   # source network
    post_id: str         # platform-native identifier
    author: str          # handle/username as captured
    content: str         # raw post text
    timestamp: datetime  # publication time
    url: str             # canonical link to the post
    engagement: Dict[str, int] = field(default_factory=dict)  # e.g. likes/shares
    media_urls: List[str] = field(default_factory=list)       # attached media
    archived_urls: List[str] = field(default_factory=list)    # archive copies
    content_hash: str = ""                                    # set by __post_init__

    def __post_init__(self):
        """Compute a duplicate-detection fingerprint over platform + content.

        MD5 is used purely as a fast non-cryptographic fingerprint.
        """
        digest_input = f"{self.platform.value}:{self.content}".encode()
        self.content_hash = hashlib.md5(digest_input).hexdigest()
@dataclass
class MonitoringQuery:
    """Describes what to monitor: terms, tags, accounts, and platforms."""

    keywords: List[str]        # free-text terms (OR-combined)
    platforms: List[Platform]  # where to run the query
    accounts: List[str] = field(default_factory=list)       # handles to watch
    hashtags: List[str] = field(default_factory=list)       # tags (OR-combined)
    exclude_terms: List[str] = field(default_factory=list)  # terms to negate
    start_date: Optional[datetime] = None                   # earliest post date

    def to_search_string(self, platform: Platform) -> str:
        """Build a generic boolean search string from the query's parts.

        NOTE(review): `platform` is not consulted — the output is identical
        for every network; platform-specific syntax appears unimplemented.
        """
        pieces = []
        if self.keywords:
            pieces.append(' OR '.join(f'"{k}"' for k in self.keywords))
        if self.hashtags:
            pieces.append(' OR '.join(f'#{h}' for h in self.hashtags))
        if self.exclude_terms:
            pieces.append(' '.join(f'-{t}' for t in self.exclude_terms))
        return ' '.join(pieces)
from collections import defaultdict
from datetime import datetime, timedelta
class BreakingNewsDetector:
    """Detect sudden spikes in keyword mentions relative to a rolling baseline."""

    def __init__(self, baseline_window_hours: int = 24):
        # Window over which the "normal" hourly mention rate is estimated.
        self.baseline_window = timedelta(hours=baseline_window_hours)
        # keyword -> list of mention timestamps (naive local time, per callers)
        self.mention_history = defaultdict(list)

    def _recent_count(self, keyword: str, now: datetime) -> int:
        """Number of mentions of *keyword* in the hour before *now*."""
        cutoff = now - timedelta(hours=1)
        return sum(1 for t in self.mention_history[keyword] if t > cutoff)

    def add_mention(self, keyword: str, timestamp: datetime):
        """Record a mention and prune history older than twice the baseline window."""
        self.mention_history[keyword].append(timestamp)
        cutoff = datetime.now() - self.baseline_window * 2
        self.mention_history[keyword] = [
            t for t in self.mention_history[keyword] if t > cutoff
        ]

    def is_spiking(self, keyword: str, threshold_multiplier: float = 3.0) -> bool:
        """Return True when last-hour mentions exceed the baseline hourly rate.

        With no baseline data at all, fall back to an absolute threshold so
        brand-new topics can still register.
        """
        now = datetime.now()
        recent = self._recent_count(keyword, now)
        in_window = [
            t for t in self.mention_history[keyword]
            if t > now - self.baseline_window
        ]
        baseline_hourly = len(in_window) / self.baseline_window.total_seconds() * 3600
        if baseline_hourly == 0:
            return recent > 10  # Arbitrary threshold for new topics
        return recent > baseline_hourly * threshold_multiplier

    def get_trending(self, top_n: int = 10) -> List[tuple]:
        """Return up to *top_n* (keyword, recent_count) pairs, busiest first.

        Uses one time snapshot for all keywords so the ranking is consistent
        (the original re-read the clock per keyword and duplicated the
        last-hour counting logic inline).
        """
        now = datetime.now()
        spikes = [
            (keyword, self._recent_count(keyword, now))
            for keyword in self.mention_history
            if self.is_spiking(keyword)
        ]
        return sorted(spikes, key=lambda x: x[1], reverse=True)[:top_n]
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
@dataclass
class AccountAnalysis:
    """Profile-level signals for judging whether an account looks authentic."""

    username: str
    platform: Platform
    created_date: Optional[datetime] = None
    follower_count: int = 0
    following_count: int = 0
    post_count: int = 0
    # Authenticity signals (None = not yet assessed)
    profile_photo_is_stock: Optional[bool] = None
    bio_contains_keywords: List[str] = field(default_factory=list)
    posts_primarily_reshares: Optional[bool] = None
    posting_pattern_irregular: Optional[bool] = None
    engagement_ratio_suspicious: Optional[bool] = None

    def calculate_red_flags(self) -> dict:
        """Collect red flags as {flag_name: human-readable detail}."""
        flags = {}
        # Very young accounts are a classic inauthenticity marker.
        if self.created_date:
            account_age = (datetime.now() - self.created_date).days
            if account_age < 30:
                flags['new_account'] = f"Created {account_age} days ago"
        # Accounts following far more than they are followed look bot-like.
        if self.following_count > 0:
            follower_ratio = self.follower_count / self.following_count
            if follower_ratio < 0.1:
                flags['low_follower_ratio'] = f"Ratio: {follower_ratio:.2f}"
        # Sustained superhuman posting rates (age clamped to >= 1 day).
        if self.created_date and self.post_count > 0:
            clamped_age = max(1, (datetime.now() - self.created_date).days)
            daily_rate = self.post_count / clamped_age
            if daily_rate > 50:
                flags['excessive_posting'] = f"{daily_rate:.0f} posts/day"
        if self.profile_photo_is_stock:
            flags['stock_profile_photo'] = "Profile appears to be stock image"
        return flags

    def authenticity_score(self) -> int:
        """Score 0-100; higher = more likely authentic (20 points off per flag)."""
        penalty_per_flag = 20
        raw = 100 - penalty_per_flag * len(self.calculate_red_flags())
        return max(0, raw)
from collections import defaultdict
from typing import Set, Dict
class AccountNetwork:
    """Directed interaction graph between social accounts."""

    def __init__(self):
        # interactions[from][to] -> number of recorded interactions
        self.interactions = defaultdict(lambda: defaultdict(int))
        self.accounts = {}  # reserved for per-account metadata

    def add_interaction(self, from_account: str, to_account: str,
                        interaction_type: str = "mention"):
        """Record one interaction (the type is currently not stored)."""
        self.interactions[from_account][to_account] += 1

    def find_clusters(self, min_interactions: int = 3) -> List[Set[str]]:
        """Return connected groups (size > 1) of accounts linked by at least
        *min_interactions* interactions in either direction, largest first."""
        # Undirected adjacency restricted to sufficiently strong edges.
        adjacency = defaultdict(set)
        for source, targets in self.interactions.items():
            for target, count in targets.items():
                if count >= min_interactions:
                    adjacency[source].add(target)
                    adjacency[target].add(source)
        # Depth-first search for connected components.
        visited = set()
        clusters = []
        for start in adjacency:
            if start in visited:
                continue
            component = set()
            frontier = [start]
            while frontier:
                node = frontier.pop()
                if node in visited:
                    continue
                visited.add(node)
                component.add(node)
                frontier.extend(adjacency[node] - visited)
            if len(component) > 1:
                clusters.append(component)
        return sorted(clusters, key=len, reverse=True)

    def coordination_score(self, accounts: Set[str]) -> float:
        """Fraction of possible directed pairs in *accounts* that interacted.

        Uses .get() lookups so scoring never inserts empty entries into the
        defaultdict-of-defaultdicts — the original's indexed lookups mutated
        the graph as a side effect of this read-only query.
        """
        if len(accounts) < 2:
            return 0.0
        total_possible = len(accounts) * (len(accounts) - 1)
        actual_connections = 0
        for source in accounts:
            outgoing = self.interactions.get(source)
            if not outgoing:
                continue
            for target in accounts:
                if target != source and outgoing.get(target, 0) > 0:
                    actual_connections += 1
        return actual_connections / total_possible if total_possible > 0 else 0
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional
@dataclass
class Claim:
    """A specific claim being tracked as it spreads across platforms."""

    text: str             # canonical wording of the claim
    first_seen: datetime  # earliest known sighting
    first_seen_url: str   # where it was first observed
    variations: List[str] = field(default_factory=list)    # alternate wordings
    appearances: List[Dict] = field(default_factory=list)  # sighting records

    def add_appearance(self, url: str, platform: Platform,
                       timestamp: datetime, author: str):
        """Record one sighting of this claim."""
        sighting = {
            'url': url,
            'platform': platform.value,
            'timestamp': timestamp,
            'author': author,
        }
        self.appearances.append(sighting)

    def spread_timeline(self) -> List[Dict]:
        """All sightings ordered oldest-first."""
        return sorted(self.appearances, key=lambda s: s['timestamp'])

    def platforms_reached(self) -> Dict[str, int]:
        """Sighting counts keyed by platform name."""
        counts = defaultdict(int)
        for sighting in self.appearances:
            counts[sighting['platform']] += 1
        return dict(counts)

    def velocity(self, window_hours: int = 24) -> float:
        """Sightings per hour over the trailing *window_hours*."""
        if not self.appearances:
            return 0.0
        window_start = datetime.now() - timedelta(hours=window_hours)
        recent = [s for s in self.appearances if s['timestamp'] > window_start]
        return len(recent) / window_hours
from collections import Counter
from datetime import datetime, timedelta
class HashtagAnalyzer:
    """Track which posts carry which hashtags and analyze usage patterns."""

    def __init__(self):
        # lowercased hashtag -> list of SocialPost objects
        self.hashtag_posts = defaultdict(list)

    def add_post(self, hashtags: List[str], post: SocialPost):
        """Index *post* under every tag in *hashtags* (case-insensitive)."""
        for tag in hashtags:
            self.hashtag_posts[tag.lower()].append(post)

    def co_occurrence(self, hashtag: str, top_n: int = 10) -> List[tuple]:
        """Top hashtags that appear in the same post text as *hashtag*.

        NOTE(review): tags are detected as whitespace-separated '#' tokens,
        so trailing punctuation (e.g. '#tag,') stays part of the tag.
        """
        own_tag = f'#{hashtag.lower()}'
        co_tags = Counter()
        for post in self.hashtag_posts.get(hashtag.lower(), []):
            for word in post.content.split():
                token = word.lower()
                if token.startswith('#') and token != own_tag:
                    co_tags[token] += 1
        return co_tags.most_common(top_n)

    def posting_pattern(self, hashtag: str) -> Dict:
        """Summarize when, and by how many authors, this hashtag is posted."""
        posts = self.hashtag_posts.get(hashtag.lower(), [])
        return {
            'by_hour': dict(Counter(p.timestamp.hour for p in posts)),
            'by_day': dict(Counter(p.timestamp.strftime('%A') for p in posts)),
            'total_posts': len(posts),
            'unique_authors': len({p.author for p in posts}),
        }
import requests
from datetime import datetime
from typing import Optional
class SocialArchiver:
    """Archive social-media URLs to public archiving services before deletion."""

    def __init__(self):
        # original URL -> details of where/when it was archived
        self.archived = {}

    def archive_to_wayback(self, url: str) -> Optional[str]:
        """Ask the Internet Archive to save *url*; return the archive URL or None."""
        try:
            response = requests.get(f"https://web.archive.org/save/{url}", timeout=30)
            if response.status_code == 200:
                self.archived[url] = {
                    'wayback': response.url,
                    'archived_at': datetime.now().isoformat(),
                }
                return response.url
        except Exception as e:
            # Best-effort service: report and fall through to None.
            print(f"Archive failed: {e}")
        return None

    def archive_to_archive_today(self, url: str) -> Optional[str]:
        """Submit *url* to archive.today; return the resulting URL or None."""
        try:
            response = requests.post(
                'https://archive.today/submit/',
                data={'url': url},
                timeout=60,
            )
            if response.status_code == 200:
                return response.url
        except Exception as e:
            print(f"Archive.today failed: {e}")
        return None

    def full_archive(self, url: str) -> dict:
        """Archive to every supported service; report which ones succeeded."""
        report = {
            'original_url': url,
            'archived_at': datetime.now().isoformat(),
            'archives': {},
        }
        for service_name, archive_fn in (
            ('wayback', self.archive_to_wayback),
            ('archive_today', self.archive_to_archive_today),
        ):
            archived_url = archive_fn(url)
            if archived_url:
                report['archives'][service_name] = archived_url
        return report
## Coordinated inauthentic behavior indicators
### Timing patterns
- [ ] Multiple accounts posting same content within minutes
- [ ] Synchronized posting times across accounts
- [ ] Burst activity followed by dormancy
- [ ] Posts appear faster than human typing speed
### Content patterns
- [ ] Identical or near-identical text across accounts
- [ ] Same images/media shared by multiple accounts
- [ ] Identical typos or formatting errors
- [ ] Copy-paste artifacts visible
### Account patterns
- [ ] Accounts created around same time
- [ ] Similar naming conventions (name + numbers)
- [ ] Generic or stock profile photos
- [ ] Minimal personal content, mostly shares
- [ ] Follow the same accounts
- [ ] Engage with each other disproportionately
### Network patterns
- [ ] Form dense clusters in network analysis
- [ ] Amplify same external sources
- [ ] Target same accounts or hashtags
- [ ] Cross-platform coordination visible
def coordination_likelihood(posts: List[SocialPost]) -> dict:
    """Score (0-100) how likely *posts* represent coordinated activity.

    Returns {'score', 'signals', 'posts_analyzed', 'unique_authors'};
    fewer than two posts yields a minimal zero-score result.
    """
    if len(posts) < 2:
        return {'score': 0, 'signals': []}
    signals = []
    score = 0
    # Signal 1: heavy duplication — fewer than half the posts are unique.
    contents = [p.content for p in posts]
    unique_contents = set(contents)
    if len(unique_contents) < len(contents) * 0.5:
        signals.append("High content duplication")
        score += 30
    # Signal 2: bursts — many consecutive posts under a minute apart.
    # BUG FIX: the original used timedelta.seconds, which wraps per day
    # (a gap of 1 day + 30 s reported as 30 s, a false "rapid post");
    # total_seconds() is the true gap length.
    timestamps = sorted(p.timestamp for p in posts)
    rapid_posts = sum(
        1 for earlier, later in zip(timestamps, timestamps[1:])
        if (later - earlier).total_seconds() < 60
    )
    if rapid_posts > len(posts) * 0.3:
        signals.append("Suspicious timing clusters")
        score += 25
    # Signal 3: few distinct authors relative to post volume.
    # NOTE(review): the `> 5` floor means this only fires once the sample has
    # more than five authors — confirm that threshold is intended.
    authors = set(p.author for p in posts)
    if len(authors) > 5 and len(contents) / len(authors) > 2:
        signals.append("Few authors, many similar posts")
        score += 20
    return {
        'score': min(100, score),
        'signals': signals,
        'posts_analyzed': len(posts),
        'unique_authors': len(authors),
    }
| Platform | Monitoring Tool | Notes |
|---|---|---|
| Twitter/X | TweetDeck, Brandwatch | API increasingly restricted |
| Facebook | CrowdTangle (limited) | Academic access only now |
| Instagram | Later, Brandwatch | No public API for search |
| TikTok | Exolyt, Pentos | Limited historical data |
| Reddit | Pushshift, Arctic Shift | Archive access varies |
| YouTube | YouTube Data API | Good metadata access |
| Bluesky | Firehose API | Open, real-time access |
| Field | Value |
|---|---|
| Version | 1.0.0 |
| Created | 2025-12-26 |
| Author | Claude Skills for Journalism |
| Domain | Journalism, OSINT |
| Complexity | Advanced |
Weekly Installs
80
Repository
GitHub Stars
84
First Seen
Feb 6, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
codex78
cursor77
gemini-cli77
opencode77
github-copilot76
amp74
Excel财务建模规范与xlsx文件处理指南:专业格式、零错误公式与数据分析
46,700 周安装