High-Throughput Kafka Client Configuration

Posted on Jun 20, 2022

Foreword

One of the core metrics in our team's load testing of Kafka-based services is events per second; other factors aside, it is positively correlated with throughput.

What Is Throughput

Throughput is the rate at which messages are successfully delivered over a communication channel. From the Kafka producer's point of view, throughput can be described as the number or size of records successfully delivered per unit time; from the Kafka consumer's point of view, as the number or size of records read per unit time.

producer-broker

Preparing records on the producer side involves business logic; here we only care about the number or size of records acknowledged (ack) per unit time. By default, the producer sends asynchronously: it accumulates records in memory and sends them in batched requests.

for (long i = 0; i < numRecords; i++) {
    producer.send(record, callback);  // asynchronous: returns immediately
}

A call to KafkaProducer.send(ProducerRecord, Callback) does not block: it hands over a reference to the record and a reference to the callback, then returns immediately.

class MyCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // handling
    }
}

When the producer receives the acknowledgement for a sent record, a thread invokes the onCompletion method, so we can accumulate the record count or size inside the callback body.
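As a minimal sketch of that accumulation (plain Java, independent of the Kafka API; ThroughputCounter and onAck are hypothetical names), we can keep atomic counters of acknowledged records and bytes and derive a rate from them:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical accumulator: call onAck from Callback.onCompletion
// when the exception argument is null (i.e. the record was acknowledged).
class ThroughputCounter {
    private final AtomicLong records = new AtomicLong();
    private final AtomicLong bytes = new AtomicLong();

    void onAck(long recordSizeBytes) {
        records.incrementAndGet();
        bytes.addAndGet(recordSizeBytes);
    }

    double recordsPerSec(long elapsedMillis) {
        return records.get() * 1000.0 / elapsedMillis;
    }

    double mbPerSec(long elapsedMillis) {
        return bytes.get() * 1000.0 / elapsedMillis / (1024.0 * 1024.0);
    }
}
```

AtomicLong keeps the counters safe to update from the producer's I/O thread while another thread reads the rates.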

consumer-broker

Note: committing offsets does not necessarily happen after the records have been processed.

while (conditions) {
    ConsumerRecords records = consumer.poll(timeout);  // blocks until records arrive or timeout elapses
    processRetrievedRecords(records);
}

Processing records on the consumer side involves business logic; here we only care about the number or size of records received per unit time. By default, the consumer pulls batches of records by polling: each call to KafkaConsumer.poll(Duration) blocks until records are available or the timeout elapses, and a thread periodically and transparently commits the offset of the last record of the previous batch (auto commit).
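The auto-commit behavior described above is governed by two consumer properties; the values shown are the Kafka defaults:

```properties
# Kafka consumer defaults governing auto commit
enable.auto.commit=true
auto.commit.interval.ms=5000
```

Set enable.auto.commit=false if you need to commit offsets manually after processing records.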

Measuring Throughput

Producer tool

The Kafka installation directory ships with a command-line tool for testing producer performance: kafka-producer-perf-test.sh.

% ./bin/kafka-producer-perf-test.sh --help
usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER] --throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]] [--producer.config CONFIG-FILE]
                            [--print-metrics] [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION] (--record-size RECORD-SIZE | --payload-file PAYLOAD-FILE)

This tool is used to verify the producer performance.

optional arguments:
  -h, --help             show this help message and exit
  --topic TOPIC          produce messages to this topic
  --num-records NUM-RECORDS
                         number of messages to produce
  --payload-delimiter PAYLOAD-DELIMITER
                         provides delimiter to be used when --payload-file is provided. Defaults to new line. Note that this parameter will be ignored if --payload-file is not provided. (default: \n)
  --throughput THROUGHPUT
                         throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.
  --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
                         kafka producer related configuration properties like bootstrap.servers,client.id etc. These configs take precedence over those passed via --producer.config.
  --producer.config CONFIG-FILE
                         producer config properties file.
  --print-metrics        print out metrics at the end of the test. (default: false)
  --transactional-id TRANSACTIONAL-ID
                         The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-id)
  --transaction-duration-ms TRANSACTION-DURATION
                         The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive. (default: 0)

  either --record-size or --payload-file must be specified but not both.

  --record-size RECORD-SIZE
                         message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.
  --payload-file PAYLOAD-FILE
                         file to read the message payloads from. This works only for UTF-8 encoded text files. Payloads will be  read  from  this file and a payload will be randomly selected when sending messages. Note that you must provide
                         exactly one of --record-size or --payload-file.

A look inside kafka-producer-perf-test.sh shows that it actually runs the main method of ProducerPerformance.

% ./bin/kafka-producer-perf-test.sh \
--topic perf_test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=localhost:9092

This sends 1000000 records of 1024 bytes each to the topic perf_test on the Kafka broker at localhost:9092; the records/sec and MB/sec figures in the output are the throughput we care about.

498505 records sent, 99701.0 records/sec (97.36 MB/sec), 1.4 ms avg latency, 176.0 ms max latency.
1000000 records sent, 102722.136620 records/sec (100.31 MB/sec), 0.92 ms avg latency, 176.00 ms max latency, 0 ms 50th, 1 ms 95th, 23 ms 99th, 56 ms 99.9th.
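The two figures in that output are consistent with each other: MB/sec is simply records/sec multiplied by the --record-size. A quick arithmetic check:

```java
// Derive MB/sec from records/sec and the record size in bytes.
class ThroughputCheck {
    static double mbPerSec(double recordsPerSec, int recordSizeBytes) {
        return recordsPerSec * recordSizeBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // 102722.136620 records/sec x 1024 B matches the ~100.31 MB/sec reported above.
        System.out.printf("%.2f MB/sec%n", mbPerSec(102722.136620, 1024));
    }
}
```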

Consumer tool

On the other side, the command-line tool kafka-consumer-perf-test.sh helps with consumer performance testing.

% ./bin/kafka-consumer-perf-test.sh --help
This tool helps in performance test for the full zookeeper consumer
Option                                   Description                            
------                                   -----------                            
--bootstrap-server <String: server to    REQUIRED unless --broker-list          
  connect to>                              (deprecated) is specified. The server
                                           (s) to connect to.                   
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server     
                                           instead; ignored if --bootstrap-     
                                           server is specified.  The broker     
                                           list string in the form HOST1:PORT1, 
                                           HOST2:PORT2.                         
--consumer.config <String: config file>  Consumer config properties file.       
--date-format <String: date format>      The date format to use for formatting  
                                           the time field. See java.text.       
                                           SimpleDateFormat for options.        
                                           (default: yyyy-MM-dd HH:mm:ss:SSS)   
--fetch-size <Integer: size>             The amount of data to fetch in a       
                                           single request. (default: 1048576)   
--from-latest                            If the consumer does not already have  
                                           an established offset to consume     
                                           from, start with the latest message  
                                           present in the log rather than the   
                                           earliest message.                    
--group <String: gid>                    The group id to consume on. (default:  
                                           perf-consumer-20714)                 
--help                                   Print usage information.               
--hide-header                            If set, skips printing the header for  
                                           the stats                            
--messages <Long: count>                 REQUIRED: The number of messages to    
                                           send or consume                      
--num-fetch-threads <Integer: count>     DEPRECATED AND IGNORED: Number of      
                                           fetcher threads. (default: 1)        
--print-metrics                          Print out the metrics.                 
--reporting-interval <Integer:           Interval in milliseconds at which to   
  interval_ms>                             print progress info. (default: 5000) 
--show-detailed-stats                    If set, stats are reported for each    
                                           reporting interval as configured by  
                                           reporting-interval                   
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.         
                                           (default: 2097152)                   
--threads <Integer: count>               DEPRECATED AND IGNORED: Number of      
                                           processing threads. (default: 10)    
--timeout [Long: milliseconds]           The maximum allowed time in            
                                           milliseconds between returned        
                                           records. (default: 10000)            
--topic <String: topic>                  REQUIRED: The topic to consume from.   
--version                                Display Kafka version.  

A look inside kafka-consumer-perf-test.sh shows that it actually runs the main method of ConsumerPerformance.

./bin/kafka-consumer-perf-test.sh \
--broker-list localhost:9092 \
--topic perf_test \
--messages 1000000

This fetches 1000000 records from the topic perf_test on the Kafka broker at localhost:9092; the nMsg.sec column in the output is the throughput we care about.

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-07-02 11:39:47:948, 2022-07-02 11:39:49:374, 976.9287, 685.0832, 1000375, 701525.2454, 187, 1239, 788.4816, 807405.1655

Configuration in Theory

The following is entirely inspired by Summary of Configurations for Optimizing Throughput.

Producer configuration

  • batch.size. Batch size. When batching (multiple records headed for the same partition), the producer accumulates up to this many bytes per batch; a full batch is sent immediately. Suggested: increase to 100000–200000. Default: 16384.
  • linger.ms. Linger time. When batching, the producer waits up to this many milliseconds for more records to accumulate; a batch is sent as soon as either batch.size is reached or linger.ms elapses, whichever comes first. Default: 0.
  • compression.type=lz4. Compression type. The default is none, meaning no compression.
  • acks=1. The number of acknowledgements the producer requires the leader to have received before considering a request complete. The default is all since Kafka 3.0; before 3.0 the default was 1.
  • buffer.memory. Total bytes of memory the producer can use to buffer records waiting to be sent. Increase it when producing to many partitions. Default: 33554432.
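Putting the list above together, a throughput-oriented producer configuration file might look like the sketch below (the values are the suggestions from the list; tune them for your own workload):

```properties
# producer.properties - throughput-oriented sketch
batch.size=100000
linger.ms=10
compression.type=lz4
acks=1
buffer.memory=33554432
```

Such a file can be passed to kafka-producer-perf-test.sh via --producer.config, or its entries given inline via --producer-props as in the commands below.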

Consumer configuration

  • fetch.min.bytes. The minimum amount of data the server should return for a fetch request. Suggested: increase to 100000. Default: 1.
  • fetch.max.wait.ms=500. The maximum time the server will block a fetch request when it has not yet accumulated fetch.min.bytes of data.
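Likewise for the consumer, a sketch of a configuration file using the values above:

```properties
# consumer.properties - throughput-oriented sketch
fetch.min.bytes=100000
fetch.max.wait.ms=500
```

It can be passed to kafka-consumer-perf-test.sh via --consumer.config.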

Verifying the Configuration

% ./bin/kafka-producer-perf-test.sh \
--topic perf_test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=localhost:9092 \
batch.size=100000 \
linger.ms=10 \
compression.type=lz4 \
acks=1 \
buffer.memory=33554432
489179 records sent, 97835.8 records/sec (95.54 MB/sec), 3.0 ms avg latency, 421.0 ms max latency.
1000000 records sent, 102637.791235 records/sec (100.23 MB/sec), 2.22 ms avg latency, 421.00 ms max latency, 1 ms 50th, 5 ms 95th, 39 ms 99th, 90 ms 99.9th.

The configuration above uses the lz4 compression algorithm, while the one below, with compression disabled, performs better on some metrics.

% ./bin/kafka-producer-perf-test.sh \
--topic perf_test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=localhost:9092 \
batch.size=100000 \
linger.ms=10 \
acks=1 \
buffer.memory=33554432
558625 records sent, 111680.3 records/sec (109.06 MB/sec), 1.8 ms avg latency, 197.0 ms max latency.
1000000 records sent, 114207.400640 records/sec (111.53 MB/sec), 1.87 ms avg latency, 197.00 ms max latency, 1 ms 50th, 5 ms 95th, 20 ms 99th, 48 ms 99.9th.

Walk the tune-and-test loop yourself to iterate toward the best practice for your own workload.

This article was first published at https://h2cone.github.io/
