Lazy loaded image
🗒️kafka 二、源码阅读-生产者
Words 3594Read Time 9 min
2025-5-17
2025-9-13
type
status
date
slug
summary
tags
category
icon
password
原文
 

1.启动配置

1.1 server.propertis

  • process.roles=broker,controller
    • kRaft 角色,broker处理生产/消费请求,controller 管理集群元数据(如分区分配、Leader 选举)
      组合模式:单节点同时承担两种角色(适合开发环境、生产建议分离)
  • bootstrap.servers
    • 一组 Kafka broker 的地址,客户端会从这些地址中获取集群的元数据,以建立与整个集群的连接。

2.生产者

KafkaProducer 主要组件
KafkaProducer 主要组件
  • ProducerConfig 中描述了生产者的相关配置,如果有些用户没有配置,就采用默认配置。
  • 扩展:
    • 分区器 partitionPlugin 用于自定义分区策略
    • keySerializerPlugin 用于自定义消息 key 的序列化,
    • valueSerializerPlugin 用于自定义消息内容的序列化
  • RecordAccumulator 充当本地队列的作用,它内部维护了一个缓冲池 BuffPool,缓冲池的大小取决于buffer.memory 的配置大小,默认 32MB。生产者创建消息时,会将两次发送间隔的消息组成批次放入到 ProducerBatch 中。
  • sender 线程 KafkaProducer启动时会创建一个 sender线程用于发送消息到 Borker,它会从 缓冲池中获得已就绪的 ProdcerBatch,然后将其发送到 Borker。
 

2.1 生产者发送消息流程

生产者就是发送消息的客户端程序,具体的时由 kafkaPorducer 来完成。
 
  1. 消息创建与拦截处理
  1. 序列化与分区选择
  1. 消息追加到缓冲区(RecordAccumulator)
  1. Sender 线程异步发送消息到 Broker
  1. Broker 接收并持久化消息
  1. 回调处理与资源释放
 

2.1.1 消息的创建与拦截处理

 
应用发送消息时,首先会将消息处理成 ProducerRecord 对象,这里面记录了分区、topic、Header、Key 等信息。
在发送之前,会有拦截器ProducerInterceptors 对消息进行处理。拦截器可以理解是 kafka 的客户端实现AOP的核心组件,Kafka 提供了 Interceptor 机制来允许用户在消息发送和接收的过程中插入自定义逻辑。
💡
ProducerInterceptors 提供了三类事件处理
  • onSend 消息发送前触发
  • onSendError 发送过程中发生错误时触发
  • onAcknowledgement 消息被服务端确认时触发(成功或失败)

2.1.2 序列化与分区选择

2.1.3 分区选择

kafka 的消息是与分区绑定的,分区选择是消息的负载均衡策略。
分区选择策略有以下几种
  1. 如果指定了分区,则使用指定的分区
  1. 自定义分区器 如果指定了
    1. 实现Partitioner 接口,来实现自定义分区策略
  1. hash(如果指定了 key,则使用该方式)
    1. 发送消息时,指定了key,murmur2.hash(key)% topic分区数量。
       
  1. 内置分区策略
    1. 没有指定 key的情况下,会返回一个特殊值 RecordMetadata.UNKNOWN_PARTITION(-1) ,在构建消息批次队列时,使用该值。
      内置分区策略也叫粘性分区策略,该策略依赖集群元数据和每个主题分区的负载统计信息。
      具体的选择分区规则如下:
    2. 生成基础随机数
    3. 获取当前分区负载统计快照(PartitionLoadStats)
    4. 判断PartitionLoadStats 是否为空
      1. PartitionLoadStats为空 (无分区负载统计信息)
        1. 根据 topic获取可用分区列表(依赖集群元数据),如果可用列表不为空,从可用分区列表随机选择一个。
          如果为空则从全部分区选择一个
      2. PartitionLoadStats 不为空
        1. 根据加权随机算法选择一个分区,低负载的分区被选中的几率高。
          - 示例:队列大小[0,3,1] → 权重[4,1,3] → 累积[4,5,8]
          • 随机数0-3 → 分区0,4 → 分区1,5-7 → 分区2
      分区的选择根据消息的 Key来决定(自定义分区策略除外),
      • 未指定分区的情况下,但消息有 key时,对 key 进行 hash运算,得到一个 hash 值,将 hash值与主题的分区数取模(hash(key)%分区数),这个结果就是目标的分区好。这样的做的好处是同 key 的消息会被分配到同一个分区,保证同 key 消息的顺序。
      • 未指定分区且消息无 key,生产者采用轮询(Round-Robin)策略: 生产者会在可用分区中轮询发送消息,实现负载均衡。在实现中,生产者会维护一个 nextPartition,每次发送消息后递增,再与分区数取模运得到分区号。
      分区可用性感知:生产者会定期获取主题的元数据,若某个分区的 Leader 不可用(如 broker 宕机),默认分区策略会跳过该分区,将消息发送到其他可用分区(避免消息丢失或发送失败)
      粘性分区策略(Sticky Partioner):减少分区切换带来的网络开销,优先将消息发送到上一次的分区(“粘性”),直到该分区满或发生故障,再切换到其他分区,该策略适用于无 key消息的场景,可减少 TCP 连接的频繁切换,提升发送效率。

      2.1.4 消息追加到缓冲区(RecordAccumulator)

       
      消息的序列化和分区计算完成后,会追加RecordAccumulator(消息收集器)中。RecordAccumulator充当的角色是一个“队列”,间隔一段时间的消息拼凑成一个批次(ProducerBatch)再去发送可以减少网络请求的次数以提审整体的吞吐量。
      RecordAccumulator 内部维护了一个缓冲池,缓冲池的大小由配置由 buffer.memory 决定(默认为 32MB)。一条消息追加到RecordAccmulator时,会先去寻找与消息分区对应的分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),判断当前消息是否可以写入ProucerBatch中,如果不能写入则需要创建一个新的 ProducerBatch。新建 ProducerBatch时会评估这条消息的大小是否超过了配置参数 batch.size 参数的大小,如果超过了会以消息的size从 BuffPool 申请内存空间,不过这段内存区域不会被复用。不超过则以batch.size 参数大小来创建 ProducerBatch。
       
      消息处理完成后,不是立即发送的(如果配置linger.ms 为 0,则视为立即发送)。
      ProducerRecord消息这里做的消息的批次处理,构建 ProducerBatch 批次的具体的结构为KV 结构,key 为分区号,value 为批次数据。,用于缓存临时数据。为了提升性能可以将消息按分区结构,组成批次数据,交给 Sender 线程将消息组成的批次数据发送给 Broker。
      发送的消息会根据分区号进行批次构建或追加,消息能进入同一个 ProdcerBatch 依据两个配置 batch.sizelinger.ms ,batch.size 当前的批次大小linger.ms 等待时长。消息会首先尝试加入现有的批次中,如果批次已满(≥batchSize)追加不成功,则会申请新的批次加入到新的批次当中。将消息追加成功后,将唤醒 Sender 线程发送批次消息。
       
      kafka 源码中有关于这块的描述 生产者会将两次请求发送间隔内到达的所有记录合并为一个批量请求
      通常情况下,这种行为仅在系统处于高负载时发生(即记录到达速度快于发送速度)。但在某些场景下,即使处于中等负载,客户端也可能需要主动降低请求数量。该机制通过引入少量人为延迟实现——生产者不会立即发送记录,而是等待指定时长(允许其他记录加入发送队列),从而将多条记录合并发送。此机制类似于 TCP 协议中的Nagle 算法

2.1.5 Send 线程发送消息

 
send 线程唤醒以后,会获取已就绪的批次消息,然后发送到 Borker。总结流程如下
Send 线程从 RecordAccmulator 中获取缓存消息(ProducerBatch)之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转换成<Node,List<ProducerBatch>>形式,Node 表示 Kafka 集群中的broker 节点。对于网络连接来说,生产者客户端是与具体的 Broker 节点建立连接,也就是向具体的 broker 节点发送消息,而不关心消息属于哪一个分区。
转换成<Node,List<ProducerBatch>>的形式之后,Sender 线程还会再进一步的封装<Node,Request>的形式,这样就可以将 Request请求发送到各个 Node了。
请求从 Sender 线程发往 Broker 之前还会保存到 InFlightRequests 中,InFlightRequests 保存的具体形式为KV 结构Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去了但是还没有接受到响应的请求(NodeId 为 String,表示节点的 Id 编号)。
InFlightRequests 提供了许多 API,并且可以通过配置参数可以限制每个连接(客户端与 Node之间的连接)最多缓存的请求数),默认为 5,即每个连接最多缓存 5 个未响应的请求,超过该值之后不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过 Deque<Request>的 size 大小来判断对应的 Node 是否已经堆积了很多未响应的消息,如果size>0,说明这个 Node 节点的负载较大或网络连接有问题,再继续向其发送请求会增大超时的可能。
Sender 线程的核心代码链路为:
KafkaProducer 构造函数 → Sender.run() → runOnce() → sendProducerData() → sendProduceRequest() → NetworkClient.send()
发送消息流程图示
发送消息流程图示
 
上一篇
Kafka 一、基本介绍
下一篇
Kafka 三、主题与分区

Comments
Loading...