kafka-stream-processing by manutej/luxor-claude-marketplace

npx skills add https://github.com/manutej/luxor-claude-marketplace --skill kafka-stream-processing

A comprehensive skill for building event-driven applications with Apache Kafka. Master producers, consumers, Kafka Streams, connectors, schema registry, and production deployment patterns for real-time data processing at scale.
Use this skill when:
Kafka is a distributed streaming platform designed for high-throughput, fault-tolerant, real-time data processing.
Key Components:
Design Principles:
Kafka Design Philosophy:
- High Throughput: Millions of messages per second
- Low Latency: Single-digit millisecond latency
- Durability: Replicated, persistent storage
- Scalability: Horizontal scaling via partitions
- Fault Tolerance: Automatic failover and recovery
- Message Delivery Semantics: At-least-once, exactly-once support
Topics are logical channels for data streams. Each topic is divided into partitions for parallelism and scalability.
# Create a topic with 20 partitions and replication factor 3
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \
--partitions 20 --replication-factor 3 --config x=y
Partition Characteristics:
Adding Partitions:
# Increase partition count (cannot decrease)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_topic_name \
--partitions 40
Note: Adding partitions doesn't redistribute existing data and may affect consumers using custom partitioning.
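The note above can be made concrete with a simplified sketch. If a producer routes records by hashing the key modulo the partition count (Kafka's default partitioner actually uses murmur2; the hash below is an illustrative stand-in), increasing the partition count can move an existing key to a different partition and break key-to-partition assumptions:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionRouting {
    // Simplified stand-in for Kafka's default partitioner
    // (real Kafka hashes the key bytes with murmur2).
    public static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "order-1042";
        // The same key may land on a different partition after the count grows.
        System.out.printf("partitions=20 -> p%d, partitions=40 -> p%d%n",
                partitionFor(key, 20), partitionFor(key, 40));
    }
}
```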
<h3>Stream Partitions and Tasks</h3>
<p> The messaging layer of Kafka partitions data for storing and transporting it. Kafka Streams partitions data for processing it. In both cases, this partitioning is what enables data locality, elasticity, scalability, high performance, and fault tolerance. Kafka Streams uses the concepts of <b>partitions</b> and <b>tasks</b> as logical units of its parallelism model based on Kafka topic partitions. There are close links between Kafka Streams and Kafka in the context of parallelism: </p>
<ul>
<li>Each <b>stream partition</b> is a totally ordered sequence of data records and maps to a Kafka <b>topic partition</b>.</li>
<li>A <b>data record</b> in the stream maps to a Kafka <b>message</b> from that topic.</li>
<li>The <b>keys</b> of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.</li>
</ul>
<p> An application's processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes, so each task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of their assigned partitions and process messages one at a time from these record buffers. As a result, stream tasks can be processed independently and in parallel without manual intervention. </p>
<p> Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by the maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5 application instances. These instances will collaboratively process the topic's data. If you run more app instances than there are partitions of the input topic, the "excess" app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former's work. </p>
<p> It is important to understand that Kafka Streams is not a resource manager, but a library that "runs" anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine or spread across multiple machines, and tasks can be distributed automatically by the library to those running application instances. The assignment of partitions to tasks never changes; if an application instance fails, all its assigned tasks will be automatically restarted on other instances and continue to consume from the same stream partitions. </p>
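The relationship between partitions, tasks, and instances described above reduces to simple arithmetic; the helper below (hypothetical names, not a Kafka Streams API) sketches it:

```java
public class StreamsParallelism {
    // Number of tasks = maximum partition count across the input topics.
    public static int numTasks(int... inputTopicPartitions) {
        int max = 0;
        for (int p : inputTopicPartitions) max = Math.max(max, p);
        return max;
    }

    // Instances beyond the task count start but stay idle.
    public static int activeInstances(int runningInstances, int tasks) {
        return Math.min(runningInstances, tasks);
    }

    public static int idleInstances(int runningInstances, int tasks) {
        return Math.max(0, runningInstances - tasks);
    }
}
```

For example, with one input topic of 5 partitions and 7 running instances, 5 instances do the work and 2 stand by as hot spares.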
Design:
- The Producer: Design considerations.
- The Consumer: Design considerations.
- Message Delivery Semantics: At-least-once, at-most-once, exactly-once.
- Using Transactions for atomic operations.
At-Least-Once Delivery:
At-Most-Once Delivery:
Exactly-Once Semantics (EOS):
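As a sketch of how these semantics are typically configured (the property names are standard Kafka client settings; the transactional id value is an example), exactly-once producers combine idempotence with transactions, while consumers read only committed data:

```properties
# Producer: idempotence plus a transactional id enable exactly-once writes
# (the id below is an example and must be unique per producer instance)
enable.idempotence=true
transactional.id=my-app-tx-1
acks=all

# Consumer: only read records from committed transactions
isolation.level=read_committed
```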
ProducerClient:
publish(topic: str, message: bytes, partition_key: Optional[str] = None)
topic: The topic to publish the message to.
message: The message payload to send.
partition_key: Optional key to determine the partition. If None, random partitioning is used.
get_metadata(topic: str) -> dict
topic: The topic to get metadata for.
Returns: A dictionary containing broker information and partition leader details.
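The `ProducerClient` above is pseudo-code; as an illustration of the contract only (not a real Kafka client, which would wrap a `KafkaProducer`), a minimal in-memory stand-in could look like:

```java
import java.util.*;

// Illustrative in-memory stand-in for the pseudo-code ProducerClient above.
public class InMemoryProducerClient {
    private final Map<String, List<byte[]>> topics = new HashMap<>();

    // partitionKey only illustrates the signature here; a real client would
    // use it to pick a partition, or choose one at random when it is null.
    public void publish(String topic, byte[] message, String partitionKey) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    // A real client would return broker and partition-leader details;
    // this stub reports only what it can know.
    public Map<String, Object> getMetadata(String topic) {
        Map<String, Object> meta = new HashMap<>();
        meta.put("topic", topic);
        meta.put("messageCount",
                topics.getOrDefault(topic, Collections.emptyList()).size());
        return meta;
    }
}
```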
The producer directs data to the partition leader broker without a routing tier. Kafka nodes provide producers with metadata for directing requests to the correct partition leaders. Producers can implement custom partitioning logic or use random distribution.
Kafka producers publish records to topics with configurable reliability and performance characteristics.
Producer API:
- send(record): Sends a record to a Kafka topic.
- Parameters:
- record: The record to send, including topic, key, and value.
- Returns: A Future representing the result of the send operation.
- flush(): Forces any buffered records to be sent.
- close(): Closes the producer, releasing any resources.
- metrics(): Returns metrics about the producer.
Configuration:
- bootstrap.servers: A list of host/port pairs used to establish the initial connection to the Kafka cluster.
- key.serializer: The serializer class for keys, implementing the org.apache.kafka.common.serialization.Serializer interface.
- value.serializer: The serializer class for values, implementing the org.apache.kafka.common.serialization.Serializer interface.
- acks: The number of acknowledgments the producer requires the leader to have received before considering a request complete.
- linger.ms: The producer groups any records that arrive between request transmissions into a single batched request.
- batch.size: The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
Essential Settings:
- bootstrap.servers: Kafka cluster connection string (e.g. host1:9092,host2:9092,host3:9092)
- key.serializer / value.serializer: data serialization (e.g. org.apache.kafka.common.serialization.StringSerializer, org.apache.kafka.common.serialization.ByteArraySerializer)
- acks: acknowledgment level. 0: no acknowledgment (fire and forget); 1: leader acknowledgment only; all / -1: all in-sync replicas acknowledge (strongest durability)
- enable.idempotence: true enables the idempotent producer (prevents duplicates); false is the default behavior
Performance Tuning:
- linger.ms: batching delay
- batch.size: batch size in bytes
- compression.type: message compression (none, gzip, snappy, lz4, zstd)
- buffer.memory: total producer buffer memory
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key1", "Hello Kafka!");
// Async send with callback
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error producing: " + exception);
} else {
System.out.printf("Sent to partition %d, offset %d%n",
metadata.partition(), metadata.offset());
}
});
}
}
}
Kafka consumers read records from topics, supporting both individual and group-based consumption.
Consumer groups enable parallel processing with automatic load balancing and fault tolerance.
Key Concepts:
Consumer Group Monitoring:
# List consumer groups
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Describe consumer group members with partition assignments
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
Essential Settings:
- enable.auto.commit: true commits offsets automatically and periodically; false means manual offset management
- auto.offset.reset: earliest starts from the beginning; latest starts from the end; none throws an exception
Consumer-Specific Kafka Streams Defaults:
Parameter Name: max.poll.records
Corresponding Client: Consumer
Streams Default: 100
Parameter Name: client.id
Corresponding Client: -
Streams Default: <application.id>-<random-UUID>
Parameter Name: enable.auto.commit
Description: Controls whether the consumer automatically commits offsets. When true, the consumer will automatically commit offsets periodically based on the poll interval.
Default Value: true
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
// Process record
processRecord(record);
}
// Manual commit after processing batch
consumer.commitSync();
}
}
}
private static void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
Offset Commit Strategies:
- Auto-commit (default)
- Manual synchronous commit
- Manual asynchronous commit
- Hybrid approach
// Async commit with callback
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception);
}
});
// Sync commit for reliability
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.err.println("Commit failed: " + e);
}
Kafka Streams is a client library for building real-time stream processing applications with stateful processing.
Processor Topology:
There are two special processors in the topology:
<ul>
<li><b>Source Processor</b>: A special type of stream processor that does not have any upstream processors. It produces an input stream for its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its downstream processors.</li>
<li><b>Sink Processor</b>: A special type of stream processor that does not have downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.</li>
</ul>
Note that in normal processor nodes, other remote systems can also be accessed while processing the current record. The processed results can therefore either be streamed back into Kafka or written to an external system.
Sub-topologies:
Applications are decomposed into sub-topologies connected by repartition topics. Each sub-topology can scale independently.
KStream : Immutable stream of records
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
        Serdes.String(), /* key serde */
        Serdes.Long() /* value serde */
    )
);
KTable : Changelog stream (latest value per key)
import org.apache.kafka.streams.StreamsBuilder;
StreamsBuilder builder = new StreamsBuilder();
builder.table("input-topic");
GlobalKTable : Fully replicated table available to all instances
KTable: Each application instance gets data from only one partition.
GlobalKTable: Each application instance gets data from all partitions.
KStream<String, Long> stream = ...;
// Write the stream to the output topic, using the configured default key
// and value serdes.
stream.to("my-stream-output-topic");
// Write the stream to the output topic, using explicit key and value serdes
// (thus overriding the defaults in the config properties).
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Any stream or table may be (continuously) written back to a Kafka topic. Depending on the situation, the output data may be re-partitioned.
Manual repartitioning with a specified partition count:
KStream<byte[], String> stream = ... ;
KStream<byte[], String> repartitionedStream = stream.repartition(Repartitioned.numberOfPartitions(10));
Kafka Streams manages the generated topic as an internal topic, ensuring data purging and allowing downstream sub-topologies to scale. This operation is useful when a key-changing operation was performed earlier and automatic repartitioning was not triggered.
Join Co-partitioning Requirements:
For equi-joins in Kafka Streams, input data must be co-partitioned. This ensures that records with the same key on both sides of the join are delivered to the same stream task.
Requirements for data co-partitioning:
1. Input topics (left and right sides) must have the same number of partitions.
2. All applications writing to the input topics must use the same partitioning strategy, so that records with the same key are delivered to the same partition number.
- This applies to producer settings like `partitioner.class` (e.g., `ProducerConfig.PARTITIONER_CLASS_CONFIG`) and the Kafka Streams `StreamPartitioner` for operations like `KStream#to()`.
- Using default partitioner settings across all applications generally satisfies this requirement.
Why co-partitioning is required:
- KStream-KStream, KTable-KTable, and KStream-KTable joins are performed based on record keys (e.g., `leftRecord.key == rightRecord.key`). Co-partitioning by key ensures these records meet.
Exceptions where co-partitioning is NOT required:
1. KStream-GlobalKTable joins:
- All partitions of the GlobalKTable's underlying changelog stream are available to each KafkaStreams instance.
- A `KeyValueMapper` allows non-key-based joins from the KStream to the GlobalKTable.
2. KTable-KTable foreign-key joins:
- Kafka Streams internally ensures co-partitioning for these joins.
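The requirements can be checked mechanically; the sketch below (hypothetical helpers, not part of Kafka Streams, which performs its own verification at runtime) illustrates why equal partition counts plus one shared partitioner make equal keys meet:

```java
public class CoPartitioningCheck {
    // Requirement 1: both join inputs must have the same partition count.
    public static boolean samePartitionCount(int leftPartitions, int rightPartitions) {
        return leftPartitions == rightPartitions;
    }

    // Requirement 2 (illustrative hash, not Kafka's murmur2): with the same
    // partitioner and the same partition count, equal keys produce equal
    // partition numbers, so matching records land in the same stream task.
    public static int route(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```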
Kafka Streams supports stateful operations like aggregations, windowing, and joins using state stores.
State Store Types:
State Store Configuration:
Internal Topic Configuration:
- message.timestamp.type: 'CreateTime' for all internal topics.
- Internal repartition topics:
  - compaction.policy: 'delete'
  - retention.time: -1 (infinite)
- Internal changelog topics for key-value stores:
  - compaction.policy: 'compact'
- Internal changelog topics for windowed key-value stores:
  - compaction.policy: 'delete,compact'
  - retention.time: 24 hours + the windowed store setting
- Internal changelog topics for versioned state stores:
  - cleanup.policy: 'compact'
  - min.compaction.lag.ms: 24 hours + the store's historyRetentionMs
The parallelism of a Kafka Streams application is primarily determined by how many partitions the input topics have. For example, if your application reads from a single topic that has ten partitions, then you can run up to ten instances of your application. You can run further instances, but these will be idle.
The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and for the number of running instances of your application.
To achieve balanced workload processing across application instances and to prevent processing hotspots, you should distribute data and processing workloads:
Data should be evenly distributed across topic partitions. For example, two topic partitions with one million messages each is better than one partition with two million messages and another with none.
Processing workload should be evenly distributed across topic partitions. For example, if the time to process messages varies widely, it is better to spread the processing-intensive messages across partitions rather than concentrate them in the same partition.
Client Prefixes:
Properties streamsSettings = new Properties();
// same value for consumer, producer, and admin client
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer, producer, and admin client
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");
Consumer-Specific Types:
Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// Set a different restore consumer config. The restore consumer will use
// restore-consumer-value, while the main and global consumers continue to
// use general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");
Topic Configuration:
Properties streamsSettings = new Properties();
// Override default values for changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
Producer Client ID Naming Patterns:
- At-least-once (default):
`[client.Id]-StreamThread-[sequence-number]`
- Exactly-once (EOS version 1):
`[client.Id]-StreamThread-[sequence-number]-[taskId]`
- Exactly-once-beta (EOS version 2):
`[client.Id]-StreamThread-[sequence-number]`
where `[client.Id]` is set via the Streams configuration parameter `client.id`, or defaults to `[application.id]-[processId]` (`[processId]` is a random UUID).
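The patterns above can be rendered with a small helper (illustrative code, not part of the Streams API):

```java
import java.util.UUID;

public class StreamsClientIds {
    // At-least-once (default) and EOS v2: [client.Id]-StreamThread-[sequence-number]
    public static String threadProducerId(String clientId, int threadSequence) {
        return clientId + "-StreamThread-" + threadSequence;
    }

    // EOS v1: [client.Id]-StreamThread-[sequence-number]-[taskId]
    public static String taskProducerId(String clientId, int threadSequence, String taskId) {
        return threadProducerId(clientId, threadSequence) + "-" + taskId;
    }

    // Default client.id when none is configured: [application.id]-[processId]
    public static String defaultClientId(String applicationId, UUID processId) {
        return applicationId + "-" + processId;
    }
}
```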
EOS Configuration:
Parameter Name: isolation.level
Corresponding Client: Consumer
Streams Default: READ_COMMITTED
Parameter Name: enable.idempotence
Corresponding Client: Producer
Streams Default: true
Parameter Name: transaction.timeout.ms
Corresponding Client: Producer
Streams Default: 10000
Parameter Name: delivery.timeout.ms
Corresponding Client: Producer
Streams Default: Integer.MAX_VALUE
Default topology (auto-generated names):
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: []) --> KSTREAM-MAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000002 (stores: []) --> KSTREAM-SINK-0000000003
<-- KSTREAM-FILTER-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: output)
<-- KSTREAM-MAPVALUES-0000000002
Explicit naming for stability:
Kafka Streams topology naming:

Operation                                               Naming class
--------------------------------------------------------------------
Aggregation repartition topics                          Grouped
KStream-KStream join repartition topics                 StreamJoined
KStream-KTable join repartition topics                  Joined
KStream-KStream join state stores                       StreamJoined
KStream-KTable join state stores                        Joined
State stores (for aggregations and KTable-KTable joins) Materialized
Stream/table stateless operations                       Named
Enforcing explicit naming:
Properties props = new Properties();
props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
This prevents the application from starting with auto-generated names, guaranteeing stability across topology updates.
"topology.optimization":"all"
"topology.optimization":"none"
Topology optimization allows a source topic to be reused as a changelog topic, which is essential when migrating from KStreamBuilder to StreamsBuilder.
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000004(topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000006(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-AGGREGATE-0000000003(stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores: none
This topology shows two disconnected sub-topologies, their sources, processors, and sinks, and the repartition topic (counts-store-repartition) used to shuffle data by the aggregation key.
The schema registry enforces data contracts between producers and consumers, ensuring data integrity and preventing malformed events.
Data contracts with a schema registry:
- Purpose: ensure that events written to Kafka can be read back correctly, and prevent malformed events.
- Implementation: deploy a schema registry alongside the Kafka cluster.
- Function: manages event schemas and maps them to topics, guiding producers to use the correct event format.
- Note: Kafka does not ship with a schema registry; third-party implementations are available.
Supported formats:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
String userSchema = "{"
+ "\"type\":\"record\","
+ "\"name\":\"User\","
+ "\"fields\":["
+ " {\"name\":\"name\",\"type\":\"string\"},"
+ " {\"name\":\"age\",\"type\":\"int\"}"
+ "]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord user = new GenericData.Record(schema);
user.put("name", "John Doe");
user.put("age", 30);
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
ProducerRecord<String, GenericRecord> record =
    new ProducerRecord<>("users", "user1", user);
producer.send(record);
producer.close();
Kafka Connect is a framework for streaming data between Kafka and external systems.
Kafka Connect sink connector input topics
Configuration options for specifying a sink connector's input topics, using a comma-separated list or a regular expression:
topics
topics.regex
Source connectors : import data into Kafka
Sink connectors : export data from Kafka
Source connector example (JDBC):
{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "postgres",
"connection.password": "password",
"table.whitelist": "users,orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "postgres-"
}
}
Sink connector example (Elasticsearch):
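The Elasticsearch example itself is missing from this page; a typical sink configuration for Confluent's `ElasticsearchSinkConnector` (all values illustrative) looks like:

```json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "postgres-users,postgres-orders",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
```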
A comprehensive skill for building event-driven applications with Apache Kafka. Master producers, consumers, Kafka Streams, connectors, schema registry, and production deployment patterns for real-time data processing at scale.
Use this skill when:
Kafka is a distributed streaming platform designed for high-throughput, fault-tolerant, real-time data processing.
Key Components:
Design Principles:
Kafka Design Philosophy:
- High Throughput: Millions of messages per second
- Low Latency: Single-digit millisecond latency
- Durability: Replicated, persistent storage
- Scalability: Horizontal scaling via partitions
- Fault Tolerance: Automatic failover and recovery
- Message Delivery Semantics: At-least-once, exactly-once support
Topics are logical channels for data streams. Each topic is divided into partitions for parallelism and scalability.
# Create a topic with 20 partitions and replication factor 3
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \
--partitions 20 --replication-factor 3 --config x=y
Partition Characteristics:
Adding Partitions:
# Increase partition count (cannot decrease)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_topic_name \
--partitions 40
Note : Adding partitions doesn't redistribute existing data and may affect consumers using custom partitioning.
<h3>Stream Partitions and Tasks</h3>
<p> The messaging layer of Kafka partitions data for storing and transporting it. Kafka Streams partitions data for processing it. In both cases, this partitioning is what enables data locality, elasticity, scalability, high performance, and fault tolerance. Kafka Streams uses the concepts of <b>partitions</b> and <b>tasks</b> as logical units of its parallelism model based on Kafka topic partitions. There are close links between Kafka Streams and Kafka in the context of parallelism: </p>
<ul>
<li>Each <b>stream partition</b> is a totally ordered sequence of data records and maps to a Kafka <b>topic partition</b>.</li>
<li>A <b>data record</b> in the stream maps to a Kafka <b>message</b> from that topic.</li>
<li>The <b>keys</b> of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.</li>
</ul>
<p> An application's processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes so that each task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of its assigned partitions and process messages one-at-a-time from these record buffers. As a result stream tasks can be processed independently and in parallel without manual intervention. </p>
<p> Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5 applications instances. These instances will collaboratively process the topic's data. If you run a larger number of app instances than partitions of the input topic, the "excess" app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former's work. </p>
<p> It is important to understand that Kafka Streams is not a resource manager, but a library that "runs" anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine, or spread across multiple machines and tasks can be distributed automatically by the library to those running application instances. The assignment of partitions to tasks never changes; if an application instance fails, all its assigned tasks will be automatically restarted on other instances and continue to consume from the same stream partitions. </p>
Design:
- The Producer: Design considerations.
- The Consumer: Design considerations.
- Message Delivery Semantics: At-least-once, at-most-once, exactly-once.
- Using Transactions for atomic operations.
At-Least-Once Delivery:
At-Most-Once Delivery:
Exactly-Once Semantics (EOS):
ProducerClient:
publish(topic: str, message: bytes, partition_key: Optional[str] = None)
topic: The topic to publish the message to.
message: The message payload to send.
partition_key: Optional key to determine the partition. If None, random partitioning is used.
get_metadata(topic: str) -> dict
topic: The topic to get metadata for.
Returns: A dictionary containing broker information and partition leader details.
The producer directs data to the partition leader broker without a routing tier. Kafka nodes provide metadata to producers for directing requests to the correct partition leaders. Producers can implement custom partitioning logic or use random distribution.
Kafka producers publish records to topics with configurable reliability and performance characteristics.
Producer API:
- send(record): Sends a record to a Kafka topic.
- Parameters:
- record: The record to send, including topic, key, and value.
- Returns: A Future representing the result of the send operation.
- flush(): Forces any buffered records to be sent.
- close(): Closes the producer, releasing any resources.
- metrics(): Returns metrics about the producer.
Configuration:
- bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
- key.serializer: The serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
- value.serializer: The serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
- acks: The number of acknowledgments the producer requires the leader to have received before considering a request complete.
- linger.ms: The producer groups together any records that arrive in between request transmissions into a single batched request.
- batch.size: The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
Essential Settings:
bootstrap.servers : Kafka cluster connection string
host1:9092,host2:9092,host3:9092key.serializer / value.serializer : Data serialization
org.apache.kafka.common.serialization.StringSerializerorg.apache.kafka.common.serialization.ByteArraySerializeracks : Acknowledgment level
0: No acknowledgment (fire and forget)1: Leader acknowledgment onlyall / -1: All in-sync replicas acknowledge (strongest durability)Performance Tuning:
linger.ms : Batching delay
batch.size : Batch size in bytes
compression.type : Message compression
none, gzip, snappy, lz4, zstdbuffer.memory : Total producer buffer memory
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key1", "Hello Kafka!");
// Async send with callback
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error producing: " + exception);
} else {
System.out.printf("Sent to partition %d, offset %d%n",
metadata.partition(), metadata.offset());
}
});
}
}
}
Kafka consumers read records from topics, supporting both individual and group-based consumption.
Consumer groups enable parallel processing with automatic load balancing and fault tolerance.
Key Concepts:
Consumer Group Monitoring:
# List consumer groups
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Describe consumer group members with partition assignments
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
Essential Settings:
true: Auto-commit offsets periodicallyfalse: Manual offset managementearliest: Start from beginninglatest: Start from endnone: Throw exceptionConsumer-Specific Kafka Streams Defaults:
Parameter Name: max.poll.records
Corresponding Client: Consumer
Streams Default: 100
Parameter Name: client.id
Corresponding Client: -
Streams Default: <application.id>-<random-UUID>
Parameter Name: enable.auto.commit
Description: Controls whether the consumer automatically commits offsets. When true, the consumer will automatically commit offsets periodically based on the poll interval.
Default Value: true
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
// Process record
processRecord(record);
}
// Manual commit after processing batch
consumer.commitSync();
}
}
}
private static void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
Offset Commit Strategies:
Auto-commit (default):
Manual Synchronous Commit:
Manual Asynchronous Commit:
Hybrid Approach:
// Async commit with callback
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception);
}
});
// Sync commit for reliability
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.err.println("Commit failed: " + e);
}
Kafka Streams is a client library for building real-time streaming applications with stateful processing.
Processor Topology:
There are two special processors in the topology:
<ul>
<li><b>Source Processor</b>: A special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.</li>
<li><b>Sink Processor</b>: A special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.</li>
</ul>
Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.
Sub-topologies:
Applications are decomposed into sub-topologies connected by repartition topics. Each sub-topology can scale independently.
KStream : Immutable stream of records
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> wordCounts = builder.stream(
"word-counts-input-topic", /* input topic */
Consumed.with(
Serdes.String(), /* key serde */
Serdes.Long() /* value serde */
)
);
KTable : Changelog stream (latest value per key)
import org.apache.kafka.streams.StreamsBuilder;
StreamsBuilder builder = new StreamsBuilder();
builder.table("input-topic");
GlobalKTable : Fully replicated table available to all instances
KTable: Each application instance gets data from only 1 partition.
GlobalKTable: Each application instance gets data from all partitions.
KStream<String, Long> stream = ...;
// Write the stream to the output topic, using the configured default key
// and value serdes.
stream.to("my-stream-output-topic");
// Write the stream to the output topic, using explicit key and value serdes,
// (thus overriding the defaults in the config properties).
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Any streams and tables may be (continuously) written back to a Kafka topic. The output data might be re-partitioned depending on the situation.
Manual repartitioning with specified partition count:
KStream<byte[], String> stream = ... ;
KStream<byte[], String> repartitionedStream = stream.repartition(Repartitioned.numberOfPartitions(10));
Kafka Streams manages the generated topic as an internal topic, ensuring data purging and allowing for scaling downstream sub-topologies. This operation is useful when key-changing operations are performed beforehand and auto-repartitioning is not triggered.
Join Co-partitioning Requirements:
For equi-joins in Kafka Streams, input data must be co-partitioned. This ensures that records with the same key from both sides of the join are delivered to the same stream task.
Requirements for data co-partitioning:
1. Input topics (left and right sides) must have the same number of partitions.
2. All applications writing to the input topics must use the same partitioning strategy to ensure records with the same key are delivered to the same partition number.
- This applies to producer settings like `partitioner.class` (e.g., `ProducerConfig.PARTITIONER_CLASS_CONFIG`) and Kafka Streams `StreamPartitioner` for operations like `KStream#to()`.
- Using default partitioner settings across all applications generally satisfies this requirement.
Why co-partitioning is required:
- KStream-KStream, KTable-KTable, and KStream-KTable joins are performed based on record keys (e.g., `leftRecord.key == rightRecord.key`). Co-partitioning by key ensures these records meet.
Exceptions where co-partitioning is NOT required:
1. KStream-GlobalKTable joins:
- All partitions of the GlobalKTable's underlying changelog stream are available to each KafkaStreams instance.
- A `KeyValueMapper` allows non-key based joins from KStream to GlobalKTable.
2. KTable-KTable Foreign-Key joins:
- Kafka Streams internally ensures co-partitioning for these joins.
Kafka Streams supports stateful operations like aggregations, windowing, and joins using state stores.
State Store Types:
State Store Configuration:
Internal Topic Configuration:
- message.timestamp.type: 'CreateTime' for all internal topics.
- Internal Repartition Topics:
- cleanup.policy: 'delete'
- retention.ms: -1 (infinite; consumed records are purged explicitly by the application)
- Internal Changelog Topics for Key-Value Stores:
- cleanup.policy: 'compact'
- Internal Changelog Topics for Windowed Key-Value Stores:
- cleanup.policy: 'compact,delete'
- retention.ms: 24 hours plus the windowed store's retention setting
- Internal Changelog Topics for Versioned State Stores:
- cleanup.policy: 'compact'
- min.compaction.lag.ms: 24 hours plus the store's historyRetentionMs
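These defaults can be overridden per store when materializing it, via `Materialized#withLoggingEnabled`. A sketch, assuming an illustrative store name `counts-store`:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Extra settings merged into the store's internal changelog topic config.
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("min.compaction.lag.ms", "3600000"); // keep 1h of history uncompacted

Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                .withLoggingEnabled(changelogConfig);
```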
The parallelism of a Kafka Streams application is primarily determined by how many partitions the input topics have. For example, if your application reads from a single topic that has ten partitions, then you can run up to ten instances of your application. You can run further instances, but these will be idle.
The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and for the number of running instances of your application.
To achieve balanced workload processing across application instances and to prevent processing hotspots, you should distribute data and processing workloads evenly:
Data should be equally distributed across topic partitions. For example, two topic partitions with 1 million messages each are better than one partition with 2 million messages and another with none.
Processing workload should be equally distributed across topic partitions. For example, if processing time varies widely between messages, it is better to spread the processing-intensive messages across partitions rather than concentrating them in the same partition.
Client Prefixes:
Properties streamsSettings = new Properties();
// same value for consumer, producer, and admin client
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer and producer
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");
Specific Consumer Types:
Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");
Topic Configuration:
Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
Producer Client ID Naming Schema:
- at-least-once (default):
`[client.id]-StreamThread-[sequence-number]`
- exactly-once (EOS version 1):
`[client.id]-StreamThread-[sequence-number]-[taskId]`
- exactly-once-beta (EOS version 2):
`[client.id]-StreamThread-[sequence-number]`
Where `[client.id]` is either set via the Streams configuration parameter `client.id` or defaults to `[application.id]-[processId]` (`[processId]` is a random UUID).
EOS Configuration:
Parameter Name             Corresponding Client    Streams Default
------------------------------------------------------------------
isolation.level            Consumer                READ_COMMITTED
enable.idempotence         Producer                true
transaction.timeout.ms     Producer                10000
delivery.timeout.ms        Producer                Integer.MAX_VALUE
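In a Kafka Streams application these client parameters do not need to be set individually; setting the processing guarantee is enough, and Streams applies the defaults above. A minimal sketch:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// "exactly_once_v2" requires brokers 2.5+; it replaces the deprecated
// "exactly_once" (v1) and "exactly_once_beta" settings.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
```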
Default Topology (Auto-generated names):
Topologies: Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: []) --> KSTREAM-MAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000002 (stores: []) --> KSTREAM-SINK-0000000003
<-- KSTREAM-FILTER-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: output)
<-- KSTREAM-MAPVALUES-0000000002
Explicit Naming for Stability:
Kafka Streams Topology Naming:
- Aggregation repartition topics: Grouped
- KStream-KTable Join repartition topic: Joined
- KStream-KStream Join repartition topics: StreamJoined
- KStream-KTable Join state stores: Joined
- KStream-KStream Join state stores: StreamJoined
- State Stores (for aggregations and KTable-KTable joins): Materialized
- Stream/Table non-stateful operations: Named
Enforce Explicit Naming:
Properties props = new Properties();
props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
This prevents the application from starting with auto-generated names, guaranteeing stability across topology updates.
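A sketch of a small topology with its repartition topic and state store explicitly named, using the naming classes listed above (topic and store names are illustrative):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input")
       .filter((key, value) -> value != null, Named.as("filter-nulls"))
       .groupByKey(Grouped.as("group-by-key"))     // names the repartition topic
       .count(Materialized.as("event-counts"))     // names the state store and changelog
       .toStream(Named.as("counts-to-stream"))
       .to("output");
```

With explicit names, inserting or removing operators later does not shift the auto-generated sequence numbers that internal topics and stores would otherwise be named after.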
"topology.optimization":"all"
"topology.optimization":"none"
Topology optimization allows reuse of source topics as changelog topics, crucial when migrating from KStreamBuilder to StreamsBuilder.
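To take effect, the setting must be passed both to the `KafkaStreams` constructor and to `StreamsBuilder#build`, since the rewrite happens at build time. A minimal sketch:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); // "all"

StreamsBuilder builder = new StreamsBuilder();
// ... define the processing logic ...

// Passing the properties to build() lets the optimizer rewrite the topology.
Topology topology = builder.build(props);
```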
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000004(topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000006(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-AGGREGATE-0000000003(stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores: none
This topology shows two disconnected sub-topologies, their sources, processors, sinks, and the repartition topic (counts-store-repartition) used for shuffling data by aggregation key.
Schema registries enforce data contracts between producers and consumers, ensuring data integrity and preventing malformed events.
Data Contracts with Schema Registry:
- Purpose: Ensure events written to Kafka can be read properly and prevent malformed events.
- Implementation: Deploy a schema registry alongside the Kafka cluster.
- Functionality: Manages event schemas and maps them to topics, guiding producers on correct event formats.
- Note: Kafka does not include a schema registry; third-party implementations are available.
Supported Formats:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
String userSchema = "{"
+ "\"type\":\"record\","
+ "\"name\":\"User\","
+ "\"fields\":["
+ "  {\"name\":\"name\",\"type\":\"string\"},"
+ "  {\"name\":\"age\",\"type\":\"int\"}"
+ "]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord user = new GenericData.Record(schema);
user.put("name", "John Doe");
user.put("age", 30);
ProducerRecord<String, GenericRecord> record =
new ProducerRecord<>("users", "user1", user);
producer.send(record);
producer.close();
Kafka Connect is a framework for streaming data between Kafka and external systems.
Kafka Connect Sink Connector Input Topics:
Sink connectors specify their input topics either as a comma-separated list (`topics`) or as a regular expression (`topics.regex`); exactly one of the two must be set.
Source Connectors : Import data into Kafka
Sink Connectors : Export data from Kafka
Source Connector Example (JDBC):
{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "postgres",
"connection.password": "password",
"table.whitelist": "users,orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "postgres-"
}
}
Sink Connector Example (Elasticsearch):
{
"name": "elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "user-events,order-events",
"connection.url": "http://localhost:9200",
"type.name": "_doc",
"key.ignore": "false"
}
}
CDC captures database changes and streams them to Kafka in real-time.
Benefits:
Popular CDC Connectors:
Kafka Streams Topic Management:
User Topics:
- Input Topics: Specified via source processors (e.g., StreamsBuilder#stream(), StreamsBuilder#table(), Topology#addSource()).
- Output Topics: Specified via sink processors (e.g., KStream#to(), KTable#toStream().to(), Topology#addSink()).
- Management: Must be created and managed manually ahead of time (e.g., via topic tools).
- Sharing: If shared, users must coordinate topic management.
- Auto-creation: Discouraged due to potential cluster configuration and default topic settings (e.g., replication factor).
Internal Topics:
- Purpose: Used internally by the application for state stores (e.g., changelog topics).
- Creation: Created by the application itself.
- Usage: Only used by the specific stream application.
- Permissions: Requires underlying clients to have admin permissions on Kafka brokers if security is enabled.
- Naming Convention: Typically follows '<application.id>-<operatorName>-<suffix>', but not guaranteed for future releases.
DESCRIBE_PRODUCERS:
- Action: Read
- Resource: Topic
DESCRIBE_TOPIC_PARTITIONS:
- Action: Describe
- Resource: Topic
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
>join kafka summit
Common node metrics (producer, consumer, and Connect clients). All share the MBean name pattern:
kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Metric Name          Description
------------------------------------------------------------------
outgoing-byte-rate   The average number of outgoing bytes sent per second for a node.
outgoing-byte-total  The total number of outgoing bytes sent for a node.
request-rate         The average number of requests sent per second for a node.
request-total        The total number of requests sent for a node.
request-size-avg     The average size of all requests in the window for a node.
request-size-max     The maximum size of any request sent in the window for a node.
incoming-byte-rate   The average number of incoming bytes received per second for a node.
Producer Metrics:
Consumer Metrics:
Broker Metrics:
Streams Metrics:
Multi-Broker Setup:
Broker Configuration:
# Broker ID
broker.id=1
# Listeners
listeners=PLAINTEXT://broker1:9092,SSL://broker1:9093
# Log directories (use multiple disks)
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2
# Replication
default.replication.factor=3
min.insync.replicas=2
# Leader election
unclean.leader.election.enable=false
auto.leader.rebalance.enable=true
# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
API: DescribeTopicPartitions
Purpose: Fetches detailed information about topic partitions, including Eligible Leader Replicas (ELR).
Usage:
- Via Admin Client: The admin client can fetch ELR info by describing topics.
- Direct API Call: Use the DescribeTopicPartitions API endpoint.
ELR Selection Logic:
- If ELR is not empty, select a replica that is not fenced.
- Select the last known leader if it is unfenced, mimicking pre-4.0 behavior when all replicas are offline.
Dependencies/Side Effects:
- Updating `min.insync.replicas` for a topic will clean the ELR field for that topic.
- Updating the cluster default `min.insync.replicas` will clean ELR fields for all topics.
Return Values:
- ELR status and related replica information for partitions.
SSL/TLS Encryption:
# SSL configuration
listeners=SSL://broker:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.client.auth=required
SASL Authentication:
# SASL/PLAIN configuration
listeners=SASL_SSL://broker:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# JAAS configuration
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";
Broker Tuning:
# Network threads
num.network.threads=8
# I/O threads
num.io.threads=16
# Socket buffer sizes
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
# Replication
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
# Log flush (rely on OS page cache)
log.flush.interval.messages=9223372036854775807
# log.flush.interval.ms is left unset (default), delegating flushes to the OS
Producer Tuning for Throughput:
acks=1
linger.ms=100
batch.size=65536
compression.type=lz4
buffer.memory=67108864
max.in.flight.requests.per.connection=5
Consumer Tuning:
fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
max.poll.records=500
session.timeout.ms=30000
heartbeat.interval.ms=3000
Store all state changes as immutable events:
// Order events
OrderCreated -> OrderPaid -> OrderShipped -> OrderDelivered
// Event store as Kafka topic
Topic: order-events
Compaction: None (keep full history)
Retention: Infinite or very long
Separate read and write models:
// Write side: Commands produce events
commands -> producers -> events-topic
// Read side: Consumers build projections
events-topic -> streams -> materialized-view (KTable)
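The read-side projection can be sketched as a Kafka Streams table folded from the event topic (topic and store names are illustrative):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
// Read side: fold the event stream into a queryable materialized view.
KTable<String, Long> orderTotals = builder
    .<String, Long>stream("order-events")
    .groupByKey()
    .reduce(Long::sum, Materialized.as("order-totals-view"));
```

The resulting state store can then be served to queries via interactive queries, while the write side keeps appending events to `order-events`.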
Distributed transaction coordination:
// Order saga
order-requested -> payment-requested -> payment-completed ->
inventory-reserved -> order-confirmed
// Compensating transactions on failure
payment-failed -> order-cancelled
Reliably publish database changes:
// Database transaction writes to outbox table
BEGIN TRANSACTION;
INSERT INTO orders VALUES (...);
INSERT INTO outbox VALUES (event_data);
COMMIT;
// CDC connector reads outbox and publishes to Kafka
Debezium -> outbox-topic -> downstream consumers
Broadcast events to multiple consumers:
// Single topic, multiple consumer groups
user-events topic
-> email-service (consumer group: email)
-> analytics-service (consumer group: analytics)
-> notification-service (consumer group: notifications)
Handle processing failures:
try {
processRecord(record);
} catch (RetriableException e) {
// Retry
retry(record);
} catch (NonRetriableException e) {
// Send to DLQ
sendToDLQ(record, e);
}
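`sendToDLQ` above is a placeholder; one way to sketch it is a plain producer send that forwards the original record and captures the failure context in headers (the `.DLQ` topic suffix and header names are illustrative):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

void sendToDLQ(Producer<byte[], byte[]> dlqProducer,
               ConsumerRecord<byte[], byte[]> record,
               Exception error) {
    ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
        record.topic() + ".DLQ", record.key(), record.value());
    // Attach failure context so a DLQ consumer can triage without replaying.
    dlqRecord.headers().add("dlq.error.class",
        error.getClass().getName().getBytes(StandardCharsets.UTF_8));
    dlqRecord.headers().add("dlq.source.partition",
        String.valueOf(record.partition()).getBytes(StandardCharsets.UTF_8));
    dlqProducer.send(dlqRecord);
}
```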
Time-based aggregations:
KStream<String, PageView> views = ...;
// Tumbling window: non-overlapping fixed windows
views.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
// Hopping window: overlapping windows
views.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
.count();
// Session window: activity-based windows
views.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
.count();
Issue: Consumer lag increasing
Issue: Messages not arriving
Issue: Duplicate messages
Issue: Rebalancing taking too long
Issue: Partition leader unavailable
Issue: Out of memory errors
KStreamBuilder to StreamsBuilder:
`KStream#repartition()` replaces the deprecated `KStream#through()` for repartitioning via a topic:
kstream.repartition(...);
// or, for a user-managed topic, write and re-read it explicitly:
kstream.to("user-topic");
streamsBuilder.stream("user-topic");
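`repartition()` accepts a `Repartitioned` argument to control the internal topic's name, partition count, and serdes. A minimal sketch with illustrative topic names:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> byUser = builder
    .<String, String>stream("clicks")
    .selectKey((key, value) -> value)   // key change marks the stream for repartitioning
    .repartition(Repartitioned.<String, String>as("clicks-by-user")
                              .withNumberOfPartitions(12));
```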
Topic Prefix Configuration:
Properties props = new Properties();
// topicPrefix() prepends "topic." to the property name, overriding the default
// for the application's internal changelog and repartition topics
props.put(StreamsConfig.topicPrefix("replication.factor"), "3");
KafkaStreams streams = new KafkaStreams(topology, props);
Skill Version : 1.0.0 Last Updated : October 2025 Skill Category : Stream Processing, Event-Driven Architecture, Real-Time Data Compatible With : Apache Kafka 2.x, 3.x, Confluent Platform
retries : Retry count for failed sends.
enable.idempotence : Enables the idempotent producer, which prevents duplicates on retry and supports exactly-once semantics; it has defaulted to true since Kafka 3.0.
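A producer configured for idempotent delivery can be sketched as follows; `acks=all` and a high retry count are required by (and compatible with) idempotence:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");                  // required by idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);   // safe: no duplicates, no reordering
```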