生产环境下,为了尽可能提升Kafka的整体吞吐量,可以对Kafka的相关配置参数进行调整,以达到提升整体性能的目的。本文主要从Kafka的不同组件出发,讲解各组件涉及的配置参数和参数含义。
一、生产者
1、acks:Producer需要Leader确认的Producer请求的应答数。
(1)acks = 0: 表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
(2)acks = -1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
(3)acks = 1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。
2、buffer.memory:该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432即32MB。
3、compression.type:压缩器,目前支持none(不压缩),gzip,snAppy和lz4。
4、retries:Producer发送消息失败重试的次数。重试时Producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries > 0,那么这些情况下Producer会尝试重新发送。
5、batch.size:默认值为16KB,Producer按照batch进行发送,当batch满了后,Producer会把消息发送出去。
6、linger.ms:Producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不做停留。为了减少了网络IO,提升整体的性能。建议设置5-100ms。
二、Broker
1、replica.lag.time.max.ms:ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。
2、auto.leader.rebalance.enable:默认是true。自动Leader Partition 平衡。
3、leader.imbalance.per.broker.percentage:默认是10%。每个Broker允许的不平衡的Leader的比率。如果每个Broker超过了这个值,控制器会触发Leader的平衡。
4、leader.imbalance.check.interval.seconds:默认值300秒。检查Leader负载是否平衡的间隔时间。
5、log.segment.bytes:Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。
6、log.index.interval.bytes:默认4KB,Kafka里面每当写入了4KB大小的日志(.log),然后就往index文件里面记录一个索引。
7、log.retention.hours:Kafka中数据保存的时间,默认7天。
8、log.retention.minutes:Kafka中数据保存的时间,分钟级别,默认关闭。
9、log.retention.ms:Kafka中数据保存的时间,毫秒级别,默认关闭。
10、log.retention.check.interval.ms:检查数据是否保存超时的间隔,默认是5分钟。
11、log.retention.bytes:默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。
12、log.cleanup.policy:默认是delete,表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。
13、num.io.threads:默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。
14、num.replica.fetchers:副本拉取线程数,这个参数占总核数的50%的1/3。
15、num.NETwork.threads:默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。
16、log.flush.interval.messages:强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
17、log.flush.interval.ms:每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。
三、消费者
1、bootstrap.servers:向Kafka集群建立初始连接用到的host/port列表。
2、key.deserializer和value.deserializer:指定接收消息的key和value的反序列化类型。一定要写全类名。
3、group.id:标记消费者所属的消费者组。
4、enable.auto.commit:默认值为true,消费者会自动周期性地向服务器提交偏移量。
5、auto.commit.interval.ms:如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
6、auto.offset.reset:当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?
(1)earliest:自动重置偏移量到最早的偏移量。
(2)latest:默认,自动重置偏移量为最新的偏移量。
(3)none:如果消费组原来的偏移量不存在,则向消费者抛异常。
7、offsets.topic.num.partitions:__consumer_offsets的分区数,默认是50个分区。
8、heartbeat.interval.ms:Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。
9、session.timeout.ms:Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
10、max.poll.interval.ms:消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
11、fetch.min.bytes:默认1个字节。消费者获取服务器端一批消息最小的字节数。
12、fetch.max.wAIt.ms:默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
13、fetch.max.bytes:默认值: 52428800字节,即50MB。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
14、max.poll.records:一次poll拉取数据返回消息的最大条数,默认500条。
四、总结
本文总结了Kafka参数,包含了Producer、Broker和Consumer的参数,并且给出了调优Kafka的关键参数配置,可以直接用于生产环境。