[toc]

深入Kafka

这里主要讨论以下三个话题

  • Kafka如何进行复制
  • Kafka如何处理来自生产者和消费者的请求
  • Kafka的存储细节(文件&索引等)

集群成员关系

  • Kafka使用zookeeper来维护集群成员信息
  • 每个broker都有一个唯一ID,不能冲突,broker启动的时候会在zookeeper中创建一个临时节点信息,卡夫卡组件订阅zookeeper来获取broker的状态信息
  • broker停机或者其他异常时,其启动时在zookeeper上创建的临时节点信息会消失,组件也就会被告知该broker被移除
  • 另外如果是正常关闭了broker,那么它的ID还会在别的地方进行存储,如果这个时候再用这个ID去启动一个全新的broker,那它会拥有旧broker的分区和主题

控制器

  • 控制器是第一个加入集群的broker,相比普通成员它会额外负责首领的选举
  • 第一个broker加入集群时在zookeeper中创建临时节点并把自己注册为控制器,之后的节点进来发现有控制器了就不会再注册控制器,如果控制器关闭了,那它对应的临时节点消失,其余的成员会收到通知,第一个尝试注册的broker会成为新的控制器
  • 当控制器发现一个broker离开集群的时候,会为对应的分区(以这个broker为首领的分区)选取新的首领,选取策略很简单,遍历分区副本列表的下一个副本
  • 当控制器发现一个broker加入集群的时候,通过ID判断broker是否包含现有分区的副本,如果有发送通知给这个broker让它从对应的首领那里复制消息

复制

Kafka使用主题来组织数据,主题被分为若干个分区,每个分区有多个副本,本质上是为了达成分布式性能扩展和高可用

  • 首领副本:为了保证一致性,所有生产者和消费者请求都由首领进行处理(没有读写分离策略)

  • 跟随者副本:就是备份,从首领复制消息

  • 跟随者为了同步数据发起的请求和消费者请求是一样的,跟随者发送请求消息1、2、3,在收到这三个请求响应前不会再发送请求,如果跟随者发送了请求4那么首领就知道这个跟随者已经收到了前三个请求的响应,以此首领便知道了跟随者的同步进度

  • 如果一个跟随者10s内没有发起请求或者请求的数据不是最新的,这个跟随者被标记为不同步,失去被选举为首领的资格

  • 除了当前首领,还有一个首选首领,反正就是高级备胎(还有一个配置可以让这个备胎直接转正,当然前提是没被标记为不同步)

处理请求

Kafka使用一个基于TCP的二进制协议,指定了请求消息的格式已经broker如何对请求作出响应

  • broker会运行一个acceptor线程监听端口,会创建一个连接从客户端获取请求消息,并放入请求队列

  • IO线程负责处理请求队列,通常请求有两种类型

    • 生产请求:生产者发送的写入请求

    • 获取请求:消费者发送和跟随者副本发送的读请求

  • 之前讲过,只有首领才能处理请求,如何保证向首领发起请求:

    • Kafka客户端通过一个元数据缓存(类似路由表)来确定该往哪个broker发起请求,这个缓存是会定时更新的
    • 当然也可能在更新之前首领已经发生了变更,请求被发送到一个错误的broker,这个时候会得到一个‘非分区首领’的错误响应,这个时候客户端会先刷新缓存再重试请求

生产请求

之前说过生产请求可以设定acks(0、1、all),来定义消息写入成功策略,首领收到生产请求时会做一些验证

  • 权限校验
  • acks值合法判断
  • 如果acks为all,是否有足够多的副本保证消息的安全写入(什么意思呢,我们可以通过另一个配置来指定副本数量,如果不满足broker可以拒绝处理新消息)

之后消息会被写入本地磁盘,对Linux系统来说会写到文件系统缓存,并不保证合适被刷新到磁盘(mysql也有类似机制,原因是文件系统缓存只有在操作系统挂了才会丢失)

当消息被写入首领分区后,会根据acks来决定何时响应,0、1会立即响应,all会等到别的副本复制完成再响应

获取请求

流程和生产请求很接近

  • 验证主题分区已经偏移量是否存在,已经读取权限

  • 客户端指定broker最多可以从一个分区里返回多少数据,避免大量数据耗尽客户端分配的内存

  • 同样客户端还可以指定数据返回的下限,这样通过消息累积来减少发送次数(当然还有超时策略,避免一直不响应)

  • 另外不是所有数据都可以被消费,必须被足够多的副本复制的消息才认为是安全的才可以被消费,从而避免消费端的不一致(a消费了,首领崩了新的首领上没有这个数据,b无法消费到这个数据)

其他请求

还有一些broker内部使用的请求,这里不展开

物理存储

Kafka的基本存储单元是分区,分区无法在broker上再细分,也无法在一个broker的多个磁盘上再细分

分区分配

考虑一个场景,6个broker,主题下有10个分区,复制系数为3,该如何进行分配,需要满足一下的几个条件

  • 分区在broker见平均分配,也就是每个broker分到5个副本
  • 确保每个副本分布在不同的broker上
  • 如果broker指定了机架信息,尽可能把分区的副本分配到不同的机架上

实现方式:

  • 随机选一个broker比如4,用轮询的方式给每个broker分配分区来确定首领分区位置,也就是0分区的首领为4,1分区的首领为5,2分区的首领为1
  • 轮询的方式来分配副本,0分区的第一个跟随者副本在5,第二个跟随者副本在1
  • 如果配置了机架信息,那就不是按照数字来轮询broker,而是按照交替机架的方式来选择broker
  • 磁盘空间问题,broker的分区分配不会去考虑不同broker之间磁盘可用空间的问题,后续会介绍相关策略

文件管理

保留数据上Kafka的一个基本特征,其实之前就介绍过Kafka的数据过期策略

  • 把分区分为若干个片段,默认每个片段包含1G或者一周的数据
  • 正在被写入的片段叫做活跃片段,活跃片段永远不会被删除
  • 带来的问题上消息的过期时间可能会和预期相差较大

文件格式

Kafka把消息和偏移量保存在文件里,保存的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的,使用了相同的消息格式进行磁盘存储和网络传输,所以Kafka可以使用零复制技术给消费者发送消息,不同加载到缓存(不知道怎么实现的),同时也避免了broker对数据进行解压缩和再压缩

索引

消费者可以从Kafka的任意可用偏移量位置开始读取消息,是因为Kafka为每个分区维护了一个索引,把偏移量映射到片段文件和在文件里的位置

索引也被分成片段,Kafka不维护索引的校验和,出现损坏是会重新生成索引,所以删除索引是安全的

清理

Kafka有一个策略,可以让key只保留最新的消息,比如生产者不断的往key里面set 1、2、3,这个key只会保留最后一个值(不是很理解,其实大多数场景下,过程数据也有意义)

被删除的事件

这又是一个特定场景:当一个生产者将离开系统时,我们希望清理Kafka和数据库中的对应数据,这个时候可以给Kafka对应的key发送一个值为null的消息,之前的值会被清理,这个消息被称为墓碑消息,会保留一段时间(可以配置),这个时候消费者发现这个null就可以去做数据库清理(需要自行确保消费者不会离线导致错过墓碑消息,墓碑消息最终会被Kafka清理)