[toc]

生产者

就是往Kafka中写入数据的应用程序,Kafka自己提供了生产者,也有很多第三方实现的客户端(我们当然以python为例)

概览

创建生产者

1
2
3
4
5
6
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
KafkaProducer(
bootstrap_servers=['{}:{}'.format(KAFKA_HOST, KAFKA_PORT)],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode()
)
  • bootstrap_servers
  • key_serializer
  • value_serializer

发送消息

发送的流程:消息先被放入缓冲区,然后使用单独的线程发送到服务器端,send()会返回一个包含RecordMetadata的Future对象

不用考虑发送消息时可能发生的错误或者服务器端可能发生的错误,需要考虑序列化异常、缓冲区溢出或者发送线程被中断的异常

同步发送

producer.send(topic, message, partition=0).get()

  • 可重试的异常,比如连接错误(再次建立连接)、“no leader”通过重新为分区选举首领解决,Kafka producer可以配置成自动重试,多次重试后仍无法解决问题程序会收到一个重试异常
  • 无法通过重试解决的问题,比如消息太大,这样producer会直接抛出异常

异步消息发送

1
2
3
4
5
6
7
producer.send(topic, data_info, partition=partition).add_errback(on_send_error).add_callback(on_send_success)

def on_send_error(excp):
pass

def on_send_success(record_metadata):
pass
  • 有一个配置可以指定,生产者在没有收到成功回调之前最多能发送多少个消息

生产者的配置

已经介绍了一些重要的需要定制化的配置,还有一些作为调优,但通常使用默认值即可的配置也介绍一下

  • acks:指定必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的
    • =0 生产者完全不对结果进行确认,会出现消息丢失,但是效率最高
    • =1 但集群首领收到消息,生产者会收到来自服务器的成功响应,如果首领没有收到消息(首领节点崩溃新首领还没有选举出来),生产者会收到一个错误响应,为避免消息丢失,生产者会重发消息,这个时候客户端同步调用的话就会产生延迟,如果是回调方式可以缓解延迟问题
    • =all 所有参与复制的节点全部收到消息才会返回成功,效率最低
  • buffer.memory:生产者内存缓冲区的大小,如果应用程序发送消息的速度超过了发送到服务器的速度,会导致这个缓冲占满,这个时候会根据block.on.buffer.full决定是立即抛出异常还是把send()进行阻塞(一段时间) 0.9.0.0版本里变成max.block.ms表示抛出异常前阻塞的时间
  • compression.type:指定消息压缩的方法,snappy、gzip,snappy性能和效率比较平衡,gzip比较占用CPU压缩效率也更高
  • retries:生产者收到可重试的错误时,重试的次数(默认情况下,重试的时间间隔为100ms)
  • batch.size:多条消息需要呗发送到同一个分区的时候会被放到同一个批次里面,此参数配置一个批次的内存大小(字节,不是消息数量)
  • linger.ms:一个批次最长的等待时间,到了时间即使空间没有满也会发送
  • client.id:任意字符串,服务器用它来识别消息的来源
  • max.in.flight.requests.per.connection:指定生产者在收到服务器响应之前可以发送多少个消息,如果设置为1,==那么回调方式也变为了同步发送==,但可以保证消息时按照发送的顺序写入服务器的
  • timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms:生产者发送数据时等待服务器响应的时间
  • max.block.ms:上面说过,缓冲区满了之后send()阻塞的时间
  • max.request.size:控制生产者发送请求大小
  • receive.buffer.bytes 和 send.buffer.bytes:tcp socket接收和发送数据包的缓冲区,-1的时候使用操作系统的默认值,如果客户端和服务器之间处于不同的数据中心,可以适当增大这个值

补充说明:Kafka可以保证统一额分区里的消息时有序的,也就是说如果生产者按照一定的顺序发送消息,broker就会按序写入分区,如果retries不为0,同时max.in.flight.requests.per.connection也大于1的话,第一个批次消息写入失败,第二个批次写入成功,第一个批次重试成功,那么顺序就反了,如果严格要求有序,那么建议retries还是不要为0,把max.in.flight.requests.per.connection设为1,不过这样会严重影响生产者的吞吐量

序列化器

这里讲了自定义和Avro序列化器的大致逻辑,但是没用过没看出什么东西,后面用到了再看吧

分区

1
2
3
4
5
6
# 方式一,显式的制定分区
producer.send(topic, value, partition=random.randint(0, 4)).add_errback(on_send_error)
# 方式二,通过key取hash进行分区映射
producer.send(topic=topic, key='device_name', value=value).add_errback(on_send_error)
# 方式三,随机分配
producer.send(topic=topic, value=value).add_errback(on_send_error)
  • Kafka如果指定了key且使用了默认的区分器的话,就会根据key进行hash映射到分区中,也就是同一个key会一直被放到同一个分区,且这里的散列算法是Kafka的,和调用的语言无关,当然如果分区数量发生了改变散列相应改变(telemetry项目中我手动去实现了这个功能,其实毫无必要)
  • 如果没有指定key,分区器会使用轮询算法将消息均衡的分布到各个分区
  • 也可以去实现自定义分区器,telemetry那块也算吧,不过没有调用官方的接口是通过业务侧完成的