kafka-engineer by 404kidwiz/claude-supercode-skills
npx skills add https://github.com/404kidwiz/claude-supercode-skills --skill kafka-engineer
Provides Apache Kafka and event streaming expertise specializing in scalable event-driven architectures and real-time data pipelines. Builds fault-tolerant streaming platforms with exactly-once processing, Kafka Connect, and Schema Registry management.
What is the use case?
│
├─ **Data Integration (ETL)**
│ ├─ DB to DB/Data Lake? → **Kafka Connect** (Zero code)
│ └─ Complex transformations? → **Kafka Streams**
│
├─ **Real-Time Analytics**
│ ├─ SQL-like queries? → **ksqlDB** (Quick aggregation)
│ └─ Complex stateful logic? → **Kafka Streams / Flink**
│
└─ **Microservices Comm**
├─ Event Notification? → **Standard Producer/Consumer**
└─ Event Sourcing? → **State Stores (RocksDB)**
Tuning quick reference:
- Throughput: raise `batch.size` and `linger.ms`, set `compression.type=lz4`.
- Low latency: `linger.ms=0`, `acks=1`.
- Durability: `acks=all`, `min.insync.replicas=2`, `replication.factor=3`.

Red Flags → Escalate to `sre-engineer`:
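These tuning settings can be expressed as client property profiles. A minimal sketch; the numeric values are illustrative, and `min.insync.replicas` / `replication.factor` are really topic- and broker-level settings, shown here only for reference:

```java
import java.util.Properties;

public class ProducerProfiles {
    // Throughput-oriented: batch aggressively and compress.
    static Properties throughput() {
        Properties p = new Properties();
        p.setProperty("batch.size", "65536");      // larger batches (illustrative value)
        p.setProperty("linger.ms", "20");          // wait briefly to fill batches
        p.setProperty("compression.type", "lz4");
        return p;
    }

    // Latency-oriented: send immediately, ack on leader only.
    static Properties lowLatency() {
        Properties p = new Properties();
        p.setProperty("linger.ms", "0");
        p.setProperty("acks", "1");
        return p;
    }

    // Durability-oriented: full in-sync-replica acknowledgement.
    static Properties durable() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("min.insync.replicas", "2"); // broker/topic setting, not a producer property
        p.setProperty("replication.factor", "3");  // set at topic creation
        return p;
    }
}
```

The profiles trade against each other: larger `linger.ms` raises throughput at the cost of latency, and `acks=all` raises durability at the cost of both.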
Goal: Stream changes from PostgreSQL to S3.
Steps:
Source Config (postgres-source.json); host, credentials, and `topic.prefix` are placeholder values:

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db-host",
    "database.port": "5432",
    "database.user": "kafka",
    "database.password": "kafka-secret",
    "database.dbname": "mydb",
    "topic.prefix": "pg",
    "plugin.name": "pgoutput"
  }
}
```
Sink Config (s3-sink.json); bucket, region, and topic names are placeholder values:

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "pg.public.orders",
    "s3.bucket.name": "my-datalake",
    "s3.region": "us-east-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "flush.size": "1000"
  }
}
```
Deploy
```shell
curl -X POST -H "Content-Type: application/json" \
  -d @postgres-source.json http://connect:8083/connectors
curl -X POST -H "Content-Type: application/json" \
  -d @s3-sink.json http://connect:8083/connectors
```

Goal: Enforce schema compatibility.
Steps:
Define Schema (user.avsc)
```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"}
  ]
}
```
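Under BACKWARD compatibility (Schema Registry's default), new fields must carry defaults so old records remain readable. A sketch of a compatible v2 of the `User` schema; the `email` field is illustrative:

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
```

Registering a v2 without the `default` would be rejected by the registry's compatibility check.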
Producer (Java)

Wire the producer's value serializer to `KafkaAvroSerializer` and point it at the registry:

```java
Properties props = new Properties();
// ... bootstrap.servers etc. ...
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");
```

What it looks like:
Why it fails:
Correct approach:
What it looks like:
Why it fails:
Correct approach:
What it looks like:
Why it fails:
Correct approach:
Call `consumer.pause()` when the downstream buffer is full, and `consumer.resume()` once it drains.

Configuration:
Observability:
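The pause/resume rule can be exercised without a broker. A stdlib-only sketch of the backpressure logic; the class name, queue capacity, and resume threshold are illustrative:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BackpressureBuffer {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int capacity;
    private boolean paused = false; // maps to consumer.pause()/resume() state

    BackpressureBuffer(int capacity) { this.capacity = capacity; }

    // Called per polled record; returns false when the caller should pause the consumer.
    boolean offer(String record) {
        if (buffer.size() >= capacity) {
            paused = true;          // real code: consumer.pause(consumer.assignment())
            return false;
        }
        buffer.addLast(record);
        return true;
    }

    // Called as downstream work completes; resumes once half-drained.
    void drainOne() {
        buffer.pollFirst();
        if (paused && buffer.size() <= capacity / 2) {
            paused = false;         // real code: consumer.resume(consumer.assignment())
        }
    }

    boolean isPaused() { return paused; }
}
```

Resuming at half capacity rather than immediately avoids rapid pause/resume flapping. Note that a paused consumer must still call `poll()` regularly to stay in the group.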
Scenario: A financial services company needs real-time fraud detection using Kafka streaming.
Architecture Implementation:
Pipeline Configuration:
| Component | Configuration | Purpose |
|---|---|---|
| Topics | 3 (transactions, alerts, enriched) | Data organization |
| Partitions | 12 (3 brokers × 4) | Parallelism |
| Replication | 3 | High availability |
| Compression | LZ4 | Throughput optimization |
Key Logic:
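One way to sketch the core rule: a per-account spend total over a tumbling event-time window. The window size, threshold, and class name are illustrative, not the company's actual logic:

```java
import java.util.HashMap;
import java.util.Map;

public class FraudWindow {
    private final long windowMs;
    private final double threshold;
    // accountId -> { windowStart, runningTotal }
    private final Map<String, double[]> totals = new HashMap<>();

    FraudWindow(long windowMs, double threshold) {
        this.windowMs = windowMs;
        this.threshold = threshold;
    }

    // True when the account's spend inside the current window exceeds the threshold.
    boolean suspicious(String account, double amount, long eventTimeMs) {
        long windowStart = eventTimeMs - (eventTimeMs % windowMs); // tumbling window boundary
        double[] state = totals.get(account);
        if (state == null || state[0] != windowStart) {
            state = new double[] { windowStart, 0.0 };  // new window: reset the total
            totals.put(account, state);
        }
        state[1] += amount;
        return state[1] > threshold;
    }
}
```

In Kafka Streams the same shape is expressed with `groupByKey().windowedBy(TimeWindows...)` and an aggregation, with the state kept in RocksDB rather than a HashMap.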
Results:
Scenario: Build a resilient order processing system with Kafka for high reliability.
System Design:
Resilience Patterns:
Configuration:
```yaml
# Producer Configuration
acks: all
retries: 3
enable.idempotence: true

# Consumer Configuration
auto.offset.reset: earliest
enable.auto.commit: false
max.poll.records: 500
```
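With `enable.auto.commit: false` the offset is committed only after processing, so a crash between processing and commit redelivers records; processing must therefore be idempotent. A stdlib sketch of the dedup guard (the order-ID key is illustrative):

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentProcessor {
    // In production this set would live in a store that survives restarts
    // (e.g. a database table keyed by order ID), not in memory.
    private final Set<String> processed = new HashSet<>();
    private int sideEffects = 0;

    // Returns true only the first time an order ID is seen.
    boolean process(String orderId) {
        if (!processed.add(orderId)) {
            return false;          // duplicate delivery: skip the side effect
        }
        sideEffects++;             // e.g. charge the card, ship the order
        return true;
    }

    int sideEffectCount() { return sideEffects; }
}
```

This is the consumer-side complement of `enable.idempotence: true`, which only deduplicates producer retries within the broker.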
Results:
Scenario: Process millions of IoT device telemetry messages with Kafka.
Platform Architecture:
Scalability Configuration:
Performance Metrics:
| Metric | Value |
|---|---|
| Throughput | 500,000 messages/sec |
| Latency (P99) | 50ms |
| Consumer lag | < 1 second |
| Storage efficiency | 60% reduction with compression |
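As a rough sanity check on the throughput figure above, partition count can be estimated by dividing the target rate by per-partition capacity. The 40,000 msg/s per-partition figure below is an assumption for illustration, not a measured value:

```java
public class PartitionEstimate {
    // Smallest partition count that covers the target rate (ceiling division).
    static int partitionsFor(int targetMsgsPerSec, int perPartitionMsgsPerSec) {
        return (targetMsgsPerSec + perPartitionMsgsPerSec - 1) / perPartitionMsgsPerSec;
    }

    public static void main(String[] args) {
        // 500,000 msg/s target at an assumed 40,000 msg/s per partition
        System.out.println(PartitionEstimate.partitionsFor(500_000, 40_000)); // prints 13
    }
}
```

Real sizing would also account for consumer-side throughput, since partitions cap consumer-group parallelism.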
Security:
Weekly Installs: 157
GitHub Stars: 43
First Seen: Jan 24, 2026
Security Audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Pass)
Installed on
opencode (136)
codex (127)
gemini-cli (125)
github-copilot (125)
cursor (114)
kimi-cli (103)