Important prerequisite
Installing AI Skills requires network access to GitHub; if you are behind a restrictive network, enable a proxy/VPN with TUN mode before installing — this directly determines whether installation succeeds.
kafka-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill kafka-expert
Expert guidance for Apache Kafka, event streaming, Kafka Streams, and building event-driven architectures.
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # wait for acknowledgment from all in-sync replicas
    retries=3
)

# Send a message
future = producer.send('user-events', {
    'user_id': '123',
    'event': 'login',
    'timestamp': '2024-01-01T00:00:00Z'
})

# Block until the broker acknowledges the write
record_metadata = future.get(timeout=10)
print(f"Topic: {record_metadata.topic}, Partition: {record_metadata.partition}")

producer.flush()
producer.close()
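The message key (omitted above) controls partition placement: kafka-python's default partitioner hashes the key with murmur2, so all events for the same key land on the same partition and keep their order. A simplified, broker-free sketch of that property — the hash function here is illustrative, not Kafka's actual murmur2:

```python
import hashlib

def assign_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition deterministically.

    Illustrative only: Kafka's default partitioner uses murmur2, not MD5,
    but the same-key -> same-partition property is identical.
    """
    h = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return h % num_partitions

# All events for user 123 land on one partition, preserving their order.
p = assign_partition(b"123", 6)
assert p == assign_partition(b"123", 6)
assert 0 <= p < 6
```

This is why `producer.send('user-events', value, key=b'123')` guarantees per-user ordering, while keyless sends are spread round-robin across partitions.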
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"Received: {message.value}")
    # Process the message
    process_event(message.value)
    # Commit the offset manually only after successful processing
    consumer.commit()
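Consumer lag — how far committed offsets trail the broker's end offsets — tells you whether this loop is keeping up. In kafka-python the two offset maps come from `consumer.end_offsets(...)` and `consumer.committed(...)`; the lag itself is just a per-partition subtraction, sketched here without a broker:

```python
def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Per-partition lag: broker end offset minus last committed offset.

    Both arguments map partition -> offset; a partition with no committed
    offset yet is treated as fully behind (committed = 0).
    """
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}

lag = consumer_lag({0: 100, 1: 250}, {0: 90, 1: 250})
# partition 0 is 10 messages behind; partition 1 is caught up
```

A lag that grows steadily under normal load means the group needs more consumers (up to the partition count) or faster per-message processing.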
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

// Transform and filter
KStream<String, String> transformed = source
    .filter((key, value) -> value.length() > 10)
    .mapValues(value -> value.toUpperCase());
transformed.to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Common anti-patterns to avoid:
❌ Single-partition topics
❌ No error handling
❌ Ignoring consumer lag
❌ Producing to the wrong partitions
❌ Not using consumer groups
❌ Synchronous processing
❌ No monitoring
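One way to avoid the "no error handling" trap in the consumer loop above is to retry transient failures a bounded number of times, then route poison messages to a dead-letter handler instead of crashing the loop. A minimal broker-free sketch — the `dead_letter` callback and the `.dlq` topic name are illustrative choices, not a Kafka API:

```python
def process_with_retries(event, handler, dead_letter, max_attempts=3):
    """Call handler(event); retry on failure, then dead-letter the event.

    Returns True if the event was processed, False if it was handed to
    the dead-letter callback after max_attempts failures.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            handler(event)
            return True
        except Exception as exc:
            if attempt == max_attempts:
                # e.g. produce the raw event to a 'user-events.dlq' topic
                dead_letter(event, exc)
                return False
```

In the consumer loop, you would call `consumer.commit()` only after this returns, so a poison message is quarantined exactly once rather than redelivered forever.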
Weekly Installs: 58
GitHub Stars: 11
First Seen: Jan 23, 2026
Security Audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Pass)
Installed on: opencode (49), codex (47), gemini-cli (44), cursor (43), github-copilot (42), kimi-cli (36)
React Composition Patterns Guide: Vercel component architecture best practices for more maintainable code
125,600 weekly installs
news-shopping: Google News and Shopping search via the Serper API, for market monitoring and product research
54 weekly installs
WebPerf Performance Debugging Toolkit: 47 Chrome DevTools snippets for quickly diagnosing page load, interactivity, and Core Web Vitals
54 weekly installs
ReasoningBank Intelligence: an adaptive AI learning system for agent metacognition and strategy optimization
WorkOS Widgets Integration Guide: rapid deployment of user management, SSO configuration, and authentication components
54 weekly installs
AI Pair Programming Tool: multi-mode collaboration, real-time code review, and TDD development
AgentDB Performance Optimization Guide: 150x to 12,500x vector database speedups with 4-32x less memory