High-Throughput Kafka Client Configuration
Preface
One of the core metrics in our team's stress tests of Kafka-based services is events per second; other factors aside, it is positively correlated with throughput.
What Is Throughput
Throughput is the rate at which messages are successfully delivered over a communication channel. From the Kafka producer's perspective, it can be described as the number or size of records successfully delivered per unit of time; from the Kafka consumer's perspective, as the number or size of records read per unit of time.
Preparing records on the producer side involves business logic; here we only care about the number or size of records acknowledged (ack) per unit of time. By default, the producer sends asynchronously: it accumulates records in memory and sends them as a batch in a single request.
for (long i = 0; i < numRecords; i++) {
    producer.send(record, callback);
}
A call to KafkaProducer.send(ProducerRecord, Callback) does not block in that method; it hands over both a reference to the record and a reference to the callback.
class MyCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // handling
    }
}
When the producer receives the acknowledgment for a sent record, a thread executes the code in onCompletion, so the callback body is where we can accumulate the count or size of acknowledged records.
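A minimal sketch of such a counting callback (the class and field names below, such as CountingCallback and ackedRecords, are hypothetical and not from the original code):

import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical callback that accumulates acknowledged records and bytes;
// dividing the totals by the elapsed time yields records/sec or bytes/sec.
class CountingCallback implements Callback {
    static final AtomicLong ackedRecords = new AtomicLong();
    static final AtomicLong ackedBytes = new AtomicLong();

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            ackedRecords.incrementAndGet();
            // serialized sizes are -1 for null keys/values, hence the max(…, 0)
            ackedBytes.addAndGet(Math.max(metadata.serializedKeySize(), 0)
                    + Math.max(metadata.serializedValueSize(), 0));
        }
    }
}

Calling producer.flush() (or close()) before reading the counters ensures that previously sent records have completed.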
Note: committing offsets does not necessarily happen after the records have been processed.
while (conditions) {
    ConsumerRecords records = consumer.poll(timeout);
    processRetrievedRecords(records);
}
Processing records on the consumer side involves business logic; here we only care about the number or size of records received per unit of time. By default, the consumer pulls records in batches by polling: each call to KafkaConsumer.poll(Duration) blocks until records are available or the timeout elapses, and the offset of the last record of the previous batch is committed periodically and transparently (auto commit).
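A minimal sketch of measuring consumer-side throughput (the group id, the 500 ms poll timeout, and the one-second reporting window are illustrative assumptions; the topic matches the perf_test topic used later):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerThroughput {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "perf-check");                // illustrative group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("perf_test"));
            long count = 0;
            long windowStart = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                count += records.count();
                long now = System.currentTimeMillis();
                if (now - windowStart >= 1000) {
                    // records received in the last window, normalized to one second
                    System.out.printf("%d records/sec%n", count * 1000 / (now - windowStart));
                    count = 0;
                    windowStart = now;
                }
            }
        }
    }
}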
Measuring Throughput
Producer tool
The Kafka installation directory ships a command-line tool for testing producer performance: kafka-producer-perf-test.sh.
% ./bin/kafka-producer-perf-test.sh --help
usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER] --throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]] [--producer.config CONFIG-FILE]
[--print-metrics] [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION] (--record-size RECORD-SIZE | --payload-file PAYLOAD-FILE)
This tool is used to verify the producer performance.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC produce messages to this topic
--num-records NUM-RECORDS
number of messages to produce
--payload-delimiter PAYLOAD-DELIMITER
provides delimiter to be used when --payload-file is provided. Defaults to new line. Note that this parameter will be ignored if --payload-file is not provided. (default: \n)
--throughput THROUGHPUT
throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
kafka producer related configuration properties like bootstrap.servers,client.id etc. These configs take precedence over those passed via --producer.config.
--producer.config CONFIG-FILE
producer config properties file.
--print-metrics print out metrics at the end of the test. (default: false)
--transactional-id TRANSACTIONAL-ID
The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-id)
--transaction-duration-ms TRANSACTION-DURATION
The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive. (default: 0)
either --record-size or --payload-file must be specified but not both.
--record-size RECORD-SIZE
message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.
--payload-file PAYLOAD-FILE
file to read the message payloads from. This works only for UTF-8 encoded text files. Payloads will be read from this file and a payload will be randomly selected when sending messages. Note that you must provide
exactly one of --record-size or --payload-file.
A look inside kafka-producer-perf-test.sh shows that it simply runs the main method of ProducerPerformance.
% ./bin/kafka-producer-perf-test.sh \
--topic perf_test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=localhost:9092
This sends 1,000,000 records of 1024 bytes each to the topic perf_test on the Kafka broker at localhost:9092. The records/sec and MB/sec figures in the output are the throughput we care about.
498505 records sent, 99701.0 records/sec (97.36 MB/sec), 1.4 ms avg latency, 176.0 ms max latency.
1000000 records sent, 102722.136620 records/sec (100.31 MB/sec), 0.92 ms avg latency, 176.00 ms max latency, 0 ms 50th, 1 ms 95th, 23 ms 99th, 56 ms 99.9th.
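As a quick consistency check on the figures above: 102722.14 records/sec × 1024 bytes per record ≈ 105,187,468 bytes/sec, and dividing by 1,048,576 bytes per MB gives ≈ 100.31 MB/sec, matching the reported MB/sec.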
Consumer tool
On the other side, the command-line tool kafka-consumer-perf-test.sh helps with consumer performance testing.
% ./bin/kafka-consumer-perf-test.sh --help
This tool helps in performance test for the full zookeeper consumer
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
instead; ignored if --bootstrap-
server is specified. The broker
list string in the form HOST1:PORT1,
HOST2:PORT2.
--consumer.config <String: config file> Consumer config properties file.
--date-format <String: date format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
(default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size> The amount of data to fetch in a
single request. (default: 1048576)
--from-latest If the consumer does not already have
an established offset to consume
from, start with the latest message
present in the log rather than the
earliest message.
--group <String: gid> The group id to consume on. (default:
perf-consumer-20714)
--help Print usage information.
--hide-header If set, skips printing the header for
the stats
--messages <Long: count> REQUIRED: The number of messages to
send or consume
--num-fetch-threads <Integer: count> DEPRECATED AND IGNORED: Number of
fetcher threads. (default: 1)
--print-metrics Print out the metrics.
--reporting-interval <Integer: Interval in milliseconds at which to
interval_ms> print progress info. (default: 5000)
--show-detailed-stats If set, stats are reported for each
reporting interval as configured by
reporting-interval
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 2097152)
--threads <Integer: count> DEPRECATED AND IGNORED: Number of
processing threads. (default: 10)
--timeout [Long: milliseconds] The maximum allowed time in
milliseconds between returned
records. (default: 10000)
--topic <String: topic> REQUIRED: The topic to consume from.
--version Display Kafka version.
A look inside kafka-consumer-perf-test.sh shows that it simply runs the main method of ConsumerPerformance.
% ./bin/kafka-consumer-perf-test.sh \
--broker-list localhost:9092 \
--topic perf_test \
--messages 1000000
This fetches 1,000,000 records from the topic perf_test on the Kafka broker at localhost:9092 (note that the help above marks --broker-list as deprecated in favor of --bootstrap-server). The nMsg.sec column in the output is the throughput we care about.
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-07-02 11:39:47:948, 2022-07-02 11:39:49:374, 976.9287, 685.0832, 1000375, 701525.2454, 187, 1239, 788.4816, 807405.1655
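As a consistency check: the run lasted 11:39:49:374 − 11:39:47:948 = 1426 ms, and 1,000,375 records / 1.426 s ≈ 701,525 records/sec, which matches the nMsg.sec column.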
Configuration in Theory
This section is entirely inspired by the Summary of Configurations for Optimizing Throughput.
Producer configuration
- batch.size: Batch size. When batching (multiple records bound for the same partition), the producer accumulates up to this many bytes per batch and sends the batch once the limit is reached. Recommended: increase to 100000–200000. Default: 16384.
- linger.ms: Linger time. When batching, the producer waits up to this many milliseconds for more records; a batch is sent as soon as either batch.size is filled or linger.ms elapses, whichever comes first. Default: 0.
- compression.type=lz4: Compression type. Default: none, i.e., no compression.
- acks=1: The number of acknowledgments the producer requires the leader to have received before considering a request complete. Default: all (before Kafka 3.0 the default was 1).
- buffer.memory: Total bytes of memory for buffering records. Increase it when there are many partitions. Default: 33554432.
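A minimal sketch of applying the producer settings above in Java (the broker address matches the tests below; the String serializers and the class name are placeholder assumptions):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ThroughputProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");   // placeholder serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");   // placeholder serializer
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100000);       // batch.size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // linger.ms
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // compression.type
        props.put(ProducerConfig.ACKS_CONFIG, "1");                // acks
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer.memory

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send records as in the loop shown earlier
        }
    }
}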
Consumer configuration
- fetch.min.bytes: The minimum amount of data, in bytes, the server should return for a fetch request. Recommended: increase to 100000. Default: 1.
- fetch.max.wait.ms=500: The maximum time, in milliseconds, the server will block a fetch request when it has not yet accumulated fetch.min.bytes.
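Similarly, a sketch of applying the consumer settings (group id, deserializers, and the class name are placeholder assumptions):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ThroughputConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "perf-check");              // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");  // placeholder deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");  // placeholder deserializer
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000); // fetch.min.bytes
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);  // fetch.max.wait.ms

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe and poll as in the loop shown earlier
    }
}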
Validating the Configuration
% ./bin/kafka-producer-perf-test.sh \
--topic perf_test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=localhost:9092 \
batch.size=100000 \
linger.ms=10 \
compression.type=lz4 \
acks=1 \
buffer.memory=33554432
489179 records sent, 97835.8 records/sec (95.54 MB/sec), 3.0 ms avg latency, 421.0 ms max latency.
1000000 records sent, 102637.791235 records/sec (100.23 MB/sec), 2.22 ms avg latency, 421.00 ms max latency, 1 ms 50th, 5 ms 95th, 39 ms 99th, 90 ms 99.9th.
The configuration above uses the lz4 compression algorithm, while the one below, without compression, performs better on some metrics.
% ./bin/kafka-producer-perf-test.sh \
--topic perf_test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props \
bootstrap.servers=localhost:9092 \
batch.size=100000 \
linger.ms=10 \
acks=1 \
buffer.memory=33554432
558625 records sent, 111680.3 records/sec (109.06 MB/sec), 1.8 ms avg latency, 197.0 ms max latency.
1000000 records sent, 114207.400640 records/sec (111.53 MB/sec), 1.87 ms avg latency, 197.00 ms max latency, 1 ms 50th, 5 ms 95th, 20 ms 99th, 48 ms 99.9th.
Only by walking the tune-and-test loop yourself can you iterate toward the best practices that fit your own workload.
This article was first published at https://h2cone.github.io/