消费者

概念

消费者和群组

  • 可以单独的创建一个消费来对某个主题消息进行消费,但是可能一个消费者速度跟不上

  • 扩展单个消费者为消费者群组,一个群组中的消费者订阅的是同一个主题,Kafka会去控制消费者和分区之间的关联,群组中每个消费者接收主题一部分分区的消息

  • 通过扩展消费者数量来提高消费速度,需要注意不要让消费者数量超过分区,多出来的消费者会被闲置

  • 如上图,两个群组可以互不干扰的消费同一个主题

消费者和分区再均衡

考虑一些操作或者异常情况:

  • 添加新分区
  • 关闭群组中的某个消费者或群组中某个消费者异常消亡

这些情景下会发生分区重分配,分区的所有权从一个消费者转移到另一个消费者,叫做再均衡

  • 消费者通过向群组协调器borker(从broker集群中指派出来的,且不同的群组可以有不同的协调器)发送心跳来维持它和群组的从属关系以及对分区的所有权关系
  • 消费者会在轮询消息或提交偏移量时发送心跳,如果消费者停止发送心跳时间足够长,回话就会过期,协调器会认为它已经死亡,触发一次再均衡
  • 如果一个消费者崩溃,协调器会等待几秒钟确认它死亡了才触发再均衡
  • 如果一个消费者被清理(正常的关闭),它会主动通知协调器,协调器会立即触发一次再均衡
  • 后续的章节会讨论心跳发送频率和回话过期时间该如何结合情景进行配置
  • 分区分配流程:第一个加入群组的消费者会成为群主,群主去获取群成员列表并负责给每个成员分配分区,只有群主能看到完整信息,普通成员只能看到自己的配置

创建Kafka消费者

1
2
3
4
5
6
consumer = KafkaConsumer(
bootstrap_servers='{}:{}'.format(KAFKA_HOST, KAFKA_PORT),
group_id='test',
key_deserializer=lambda k: json.loads(k).encode(),
value_deserializer=lambda v: json.loads(v).encode()
)
  • 只有bootstrap_servers是必要的
  • key_deserializer和value_deserializer可以不设置,后续自己手动处理,不过一般认为是必要参数,把序列化过程标准化

订阅主题

consumer.subscribe(topics=[])

一个消费者可以订阅多个主题

轮询

1
2
3
4
consumer = KafkaConsumer(...)
for message in consumer:
key = message.key
value = message.value

这里再贴一下java是如何轮询的:

Java里面还有一个while(true),但是python挺怪的直接遍历consumer对象即可,后面可以研究一下,从Java可以看出本质是IO多路复用方式进行实现的

关于线程安全:最好是一个消费者一个线程,无法让一个线程运行多个消费者,也不能保证多个线程安全共享一个消费者

消费者的配置

这里我没有处理过真实的消费场景,暂时跳过

提交和偏移量

再均衡监听器

从特定偏移量出开始消费

如何退出

反序列化器

独立消费者

  • 如果一个主题只需要一个消费者就能完成消息的处理,那就没必要加入群组,省去了再均衡之类的操作
  • 消费者可以订阅主题后获得主题下的所有分区,然后把所有分区都订阅一遍
  • 需要注意的是,这样的情况下如果主题添加了新的分区这个消费者是不能自动订阅的
    • 定期的迭代订阅主题的分区
    • 主题添加分区的时候重启消费者服务