重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
ingestion-pipeline-doctor-nodejs by posthog/posthog
npx skills add https://github.com/posthog/posthog --skill ingestion-pipeline-doctor-nodejsPostHog 数据摄取管道框架及其规范检查代理的快速参考。
数据摄取管道通过一个类型化、可组合的步骤链来处理事件:
Kafka message
→ messageAware()
→ parse headers/body
→ sequentially() for preprocessing
→ filterMap() to enrich context (e.g., team lookup)
→ teamAware()
→ groupBy(token:distinctId)
→ concurrently() for per-entity processing
→ gather()
→ pipeBatch() for batch operations
→ handleIngestionWarnings()
→ handleResults()
→ handleSideEffects()
→ build()
实际实现请参见 nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts。
| 内容 | 位置 |
|---|---|
| 步骤类型 | nodejs/src/ingestion/pipelines/steps.ts |
| 结果类型 |
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
nodejs/src/ingestion/pipelines/results.ts| 文档测试章节 | nodejs/src/ingestion/pipelines/docs/*.test.ts |
| 合并管道 | nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts |
| 医生代理 | .claude/agents/ingestion/ |
| 测试辅助工具 | nodejs/src/ingestion/pipelines/docs/helpers.ts |
| 关注点 | 代理 | 何时使用 |
|---|---|---|
| 步骤结构 | pipeline-step-doctor | 工厂模式、类型扩展、配置注入、命名 |
| 结果处理 | pipeline-result-doctor | ok/dlq/drop/redirect、副作用、摄取警告 |
| 组合 | pipeline-composition-doctor | 构建器链、并发、分组、分支、重试 |
| 测试 | pipeline-testing-doctor | 测试辅助工具、断言、模拟计时器、文档测试风格 |
步骤 : 工厂函数返回一个命名的内部函数。使用泛型 <T extends Input> 进行类型扩展。不使用 any。通过闭包进行配置。
结果 : 使用 ok()、dlq()、drop()、redirect() 构造函数。副作用作为 promise 放在 ok(value, [effects]) 中。警告作为第三个参数。
组合 : messageAware 包装管道。handleResults 在 messageAware 内部。handleSideEffects 在其后。使用 groupBy + concurrently 处理每个实体的工作。在批处理步骤之前使用 gather。
测试 : 步骤测试直接调用工厂函数。使用 consumeAll()/collectBatches() 辅助工具。使用模拟计时器处理异步。使用类型守卫进行结果断言。不使用 any。
请 Claude "run all pipeline doctors on my recent changes" 以获取涵盖所有 4 个关注领域的全面审查。
每周安装次数
49
代码仓库
GitHub 星标数
32.3K
首次出现
2026年2月27日
安全审计
安装于
opencode48
gemini-cli48
amp48
github-copilot48
codex48
kimi-cli48
Quick reference for PostHog's ingestion pipeline framework and its convention-checking agents.
The ingestion pipeline processes events through a typed, composable step chain:
Kafka message
→ messageAware()
→ parse headers/body
→ sequentially() for preprocessing
→ filterMap() to enrich context (e.g., team lookup)
→ teamAware()
→ groupBy(token:distinctId)
→ concurrently() for per-entity processing
→ gather()
→ pipeBatch() for batch operations
→ handleIngestionWarnings()
→ handleResults()
→ handleSideEffects()
→ build()
See nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts for the real implementation.
| What | Where |
|---|---|
| Step type | nodejs/src/ingestion/pipelines/steps.ts |
| Result types | nodejs/src/ingestion/pipelines/results.ts |
| Doc-test chapters | nodejs/src/ingestion/pipelines/docs/*.test.ts |
| Joined pipeline | nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts |
| Doctor agents | .claude/agents/ingestion/ |
| Test helpers | nodejs/src/ingestion/pipelines/docs/helpers.ts |
| Concern | Agent | When to use |
|---|---|---|
| Step structure | pipeline-step-doctor | Factory pattern, type extension, config injection, naming |
| Result handling | pipeline-result-doctor | ok/dlq/drop/redirect, side effects, ingestion warnings |
| Composition | pipeline-composition-doctor | Builder chain, concurrency, grouping, branching, retries |
| Testing | pipeline-testing-doctor | Test helpers, assertions, fake timers, doc-test style |
Steps : Factory function returning a named inner function. Generic <T extends Input> for type extension. No any. Config via closure.
Results : Use ok(), dlq(), drop(), redirect() constructors. Side effects as promises in ok(value, [effects]). Warnings as third parameter.
Composition : messageAware wraps the pipeline. handleResults inside messageAware. handleSideEffects after. groupBy + concurrently for per-entity work. gather before batch steps.
Testing : Step tests call factory directly. Use consumeAll()/collectBatches() helpers. Fake timers for async. Type guards for result assertions. No any.
Ask Claude to "run all pipeline doctors on my recent changes" to get a comprehensive review across all 4 concern areas.
Weekly Installs
49
Repository
GitHub Stars
32.3K
First Seen
Feb 27, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykPass
Installed on
opencode48
gemini-cli48
amp48
github-copilot48
codex48
kimi-cli48
Google Sheets 销售跟踪自动化:记录交易更新到表格的完整指南
8,200 周安装