跳转至

librdkafka配置参数整理

全局配置属性

属性 C/P 范围 默认值 描述
builtin.features * gzip, snappy, ssl,sasl, regex, lz4 标示该librdkafka的支持的内建特性。应用程序可以查看或设置这些值来检查是否支持这些特性。 Type: CSV flags
client.id * rdkafka 客户端标示。Type: string
metadata.broker.list * 初始化的broker列表。应用程序也可以使用 rd_kafka_brokers_add() 在运行时添加 broker。 Type: string
*bootstrap.servers* * 参考 metadata.broker.list
message.max.bytes * 1000 .. 1000000000 1000000 最大发送消息大小。 Type: integer
message.copy.max.bytes * 0 .. 1000000000 65535 消息拷贝到缓存的最大大小。如果消息大于这个值,将会消耗更多的iovec而采用引用(零拷贝)方式。 Type: integer
receive.message.max.bytes * 1000 .. 1000000000 100000000 最大接收消息大小。这是一个安全预防措施,防止协议饱和时内存耗尽。这个值至少为 fetch.message.max.bytes * 消费者分区数 + 消息头大小 (e.g. 200000 bytes). Type: integer
max.in.flight.requests.per.connection * 1 .. 1000000 1000000 客户端保持的最大发送请求数。 该配置应用于每一个 broker 连接. Type: integer
metadata.request.timeout.ms * 10 .. 900000 60000 无数据请求超时时间,毫秒。 适用于 metadata 请求等。 Type: integer
topic.metadata.refresh.interval.ms * -1 .. 3600000 300000 Topic metadata 刷新间隔,毫秒。metadata 自动刷新错误和连接。设置为 -1 关闭刷新间隔。 Type: integer
metadata.max.age.ms * 参考 topic.metadata.refresh.interval.ms
topic.metadata.refresh.fast.cnt * 0 .. 1000 10 当 topic 丢失 leader, metadata 请求的发送次数,发送间隔是 topic.metadata.refresh.fast.interval.ms而不是 topic.metadata.refresh.interval.ms。 该配置用于快速修复broker leader。 Type: integer
topic.metadata.refresh.fast.interval.ms * 1 .. 60000 250 参考 topic.metadata.refresh.fast.cntType: integer
topic.metadata.refresh.sparse * true, false true 极少的 metadata 请求 (消费者的网络带宽很小) Type: boolean
topic.blacklist * Topic 黑名单,逗号分隔的正则表达式列表,匹配topic名字,匹配到的 topic 如果不存在,就在 broker metadata 信息中忽略。 Type: pattern list
debug * generic, broker, topic, metadata, queue, msg, protocol, cgrp, security, fetch, feature, all 逗号分隔的列表,控制 debug 上下文。调试生产者:broker,topic,msg。调试消费者:cgrp,topic,fetch Type: CSV flags
socket.timeout.ms * 10 .. 300000 60000 网络请求超时时间。 Type: integer
socket.blocking.max.ms * 1 .. 60000 100 broker 在 socket 操作时最大阻塞时间。值越低,响应越快,但会略微提高CPU使用率。 Type: integer
socket.send.buffer.bytes * 0 .. 100000000 0 Broker socket 发送缓冲大小。系统默认为 0。 Type: integer
socket.receive.buffer.bytes * 0 .. 100000000 0 Broker socket 接收缓冲大小。系统默认为 0。 Type: integer
socket.keepalive.enable * true, false false Broker sockets 允许 TCP 保持活力 (SO_KEEPALIVE)。 Type: boolean
socket.max.fails * 0 .. 1000000 3 Broker 关闭连接的最大错误次数(e.g., timed out requests)。0不关闭。提示:连接自动重新建立。 Type: integer
broker.address.ttl * 0 .. 86400000 1000 保存 broker 地址响应结果的时间 (毫秒)。 Type: integer
broker.address.family * any, v4, v6 any 允许的 broker IP 地址族:any, v4, v6。 Type: enum value
reconnect.backoff.jitter.ms * 0 .. 3600000 500 通过这个值调节 broker 重连尝试 +-50%。 Type: integer
statistics.interval.ms * 0 .. 86400000 0 librdkafka 统计间隔。应用程序需要通过 rd_kafka_conf_set_stats_cb()设置统计的回调函数。粒度是 1000ms. 0 关闭统计。 Type: integer
enabled_events * 0 .. 2147483647 0 参考 rd_kafka_conf_set_events()Type: integer
error_cb * 错误回调函数 (参考 rd_kafka_conf_set_error_cb()) Type: pointer
throttle_cb * 调节回调函数 (参考 rd_kafka_conf_set_throttle_cb()) Type: pointer
stats_cb * 统计回调函数 (参考 rd_kafka_conf_set_stats_cb()) Type: pointer
log_cb * 日志回调函数 (参考 rd_kafka_conf_set_log_cb()) Type: pointer
log_level * 0 .. 7 6 日志界别 (syslog(3) levels) Type: integer
log.thread.name * true, false false 在日志消息中打印内部线程名。(useful for debugging librdkafka internals) Type: boolean
log.connection.close * true, false true 记录 broker 断开连接。由于受 0.9 版本 broker 的 connection.max.idle.ms 的影响,最好关闭。 Type: boolean
socket_cb * 为Socket创建回调函数提供无缝 CLOEXEC Type: pointer
open_cb * 为文件打开回调函数提供无缝 CLOEXEC Type: pointer
opaque * 对应用程序不开放 (set with rd_kafka_conf_set_opaque()) Type: pointer
default_topic_conf * 默认 topic 配置,用于自动订阅 topics Type: pointer
internal.termination.signal * 0 .. 128 0 用于 librdkafka 调用 rd_kafka_destroy() 快速终止的信号。如果没有设置信号, 终止过程会延迟直到所有内部线程的系统调用超时返回,且 rd_kafka_wait_destroyed() 返回 true。如果设置了信号,延迟会最小化。应用程序需要屏蔽该信号,而作为内部信号句柄。 Type: integer
api.version.request * true, false false 请求 broker 支持的API版本,调整可用协议特性的功能。如果设置为false,将使用 broker.version.fallback设置的回退版本。 提示: 依赖的 broker 版本 >=0.10.0。如果 broker(老版本)不支持该请求,使用 broker.version.fallback设置的回退版本。 Type: boolean
api.version.fallback.ms * 0 .. 604800000 1200000 配置 ApiVersionRequest 失败多长时间后,使用 broker.version.fallback 回退版本。提示: ApiVersionRequest 只用新的 broker 能使用。 Type: integer
broker.version.fallback * 0.9.0 老版本的 broker(<0.10.0)不支持客户端查询支持协议特性(ApiVersionRequest, see api.version.request),所以要客户端不知道什么特性可以使用。 用户使用本属性指示 broker 版本,如果 ApiVersionRequest 失败(或不可用),客户端据此属性自动调整特性。与api.version.fallback.ms 配合使用。有效值:0.9.0, 0.8.2, 0.8.1, 0.8.0. Type: string
security.protocol * plaintext, ssl, sasl_plaintext, sasl_ssl plaintext 与 broker 通讯的协议。 Type: enum value
ssl.cipher.suites * _ 密码套件是个组合体,包括鉴权,加密,认证和秘钥交换程序,用于网络连接的安全设置交换,使用 TLS 或 SSL 网络协议。查看手册 ciphers(1) 和 SSL_CTX_set_cipher_list(3)。 Type: string
ssl.key.location * 客户端的私钥(PEM)路径,用于鉴权。 Type: string
ssl.key.password * 私钥密码。 Type: string
ssl.certificate.location * 客户端的公钥(PEM)路径,用于鉴权。 Type: string
ssl.ca.location * CA 证书文件或路径,用于校验 broker key。 Type: string
ssl.crl.location * CRL 路径,用于 broker 的证书校验。 Type: string
sasl.mechanisms * GSSAPI, PLAIN GSSAPI 使用 SASL 机制鉴权。 支持:GSSAPI, PLAIN. 提示: 只能配置一种机制名。 Type: string
sasl.kerberos.service.name * kafka Kafka 运行的 Kerberos 首要名。 Type: string
sasl.kerberos.principal * kafkaclient 客户端的 Kerberos 首要名。 Type: string
sasl.kerberos.kinit.cmd * kinit -S “%{sasl.kerberos.service.name}/%{broker.name}” -k -t “%{sasl.kerberos.keytab}” %{sasl.kerberos.principal} 完整的 kerberos kinit 命令串,%{config.prop.name} 替换为与配置对象一直的值,%{broker.name} broker 的主机名。 Type: string
sasl.kerberos.keytab * Kerberos keytab 文件的路径。如果不设置,则使用系统默认的。提示:不会自动使用,必须在 sasl.kerberos.kinit.cmd 中添加到模板,如 ... -t %{sasl.kerberos.keytab}Type: string
sasl.kerberos.min.time.before.relogin * 1 .. 86400000 60000 Key 恢复尝试的最小时间,毫秒。 Type: integer
sasl.username * 使用 PLAIN 机制时,SASL 用户名。 Type: string
sasl.password * 使用 PLAIN 机制时,SASL 密码。 Type: string
group.id * 客户端分组字符串。同组的客户端使用相同的 group.id。 Type: string
partition.assignment.strategy * range,roundrobin partition 分配策略,当选举组 leader 时,分配 partition 给组成员的策略。 Type: string
session.timeout.ms * 1 .. 3600000 30000 客户端组会话探测失败超市时间。 Type: integer
heartbeat.interval.ms * 1 .. 3600000 1000 组会话保活心跳间隔。 Type: integer
group.protocol.type * consumer 组协议类型。 Type: string
coordinator.query.interval.ms * 1 .. 3600000 600000 多久查询一次当前的客户端组协调人。如果当前的分配协调人挂了,为了更快的恢复协调人,探测时间间隔会除以 10。 Type: integer
enable.auto.commit C true, false true 在后台周期性的自动提交偏移量。 Type: boolean
auto.commit.interval.ms C 0 .. 86400000 5000 消费者偏移量提交(写入)到存储的频率,毫秒。(0 = 不可用) Type: integer
enable.auto.offset.store C true, false true 为应用程序自动保存最后消息的偏移量。 Type: boolean
queued.min.messages C 1 .. 10000000 100000 每一个 topic+partition,本地消费者队列的最小消息数。 Type: integer
queued.max.messages.kbytes C 1 .. 1000000000 1000000 每一个 topic+partition,本地消费者队列的最大大小,单位kilobytes。该值应该大于 fetch.message.max.bytes。 Type: integer
fetch.wait.max.ms C 0 .. 300000 100 为写满fetch.min.bytes,broker 的最大等待时间。 Type: integer
fetch.message.max.bytes C 1 .. 1000000000 1048576 每一个 topic+partition 初始化的最大大小(bytes)用于从 broker 读消息。如果客户端遇到消息大于这个值,会逐步扩大直到塞下这个消息。 Type: integer
max.partition.fetch.bytes C 参考 fetch.message.max.bytes
fetch.min.bytes C 1 .. 100000000 1 broker 请求的最小数据大小,单位bytes。如果达到 fetch.wait.max.ms 时间,则不管这个配置,将已收到的数据发送给客户端。 Type: integer
fetch.error.backoff.ms C 0 .. 300000 500 对于 topic+partition,如果接受错误,下一个接受请求间隔多长时间。 Type: integer
offset.store.method C none, file, broker broker 偏移量存储方式:’file’ - 本地文件存储 (offset.store.path, et.al), ‘broker’ - 在 broker 上提交存储 (要求 Apache Kafka 0.8.2 或以后版本)。 Type: enum value
consume_cb C 消息消费回调函数 (参考 rd_kafka_conf_set_consume_cb()) Type: pointer
rebalance_cb C 消费者组重新分配后调用 (参考 rd_kafka_conf_set_rebalance_cb()) Type: pointer
offset_commit_cb C 偏移量提交结果回调函数 (参考 rd_kafka_conf_set_offset_commit_cb()) Type: pointer
enable.partition.eof C true, false true 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件。 Type: boolean
queue.buffering.max.messages P 1 .. 10000000 100000 生产者队列允许的最大消息数。 Type: integer
queue.buffering.max.kbytes P 1 .. 2147483647 4000000 生产者队列允许的最大大小,单位kb。 Type: integer
queue.buffering.max.ms P 1 .. 900000 1000 生产者队列缓存数据的最大时间,毫秒。 Type: integer
message.send.max.retries P 0 .. 10000000 2 消息集发送失败重试次数。提示 重试会导致重排。 Type: integer
retries P 参考 message.send.max.retries
retry.backoff.ms P 1 .. 300000 100 重试消息发送前的补偿时间。 Type: integer
compression.codec P none, gzip, snappy, lz4 none 压缩消息集使用的压缩编解码器。这里配置的是所有 topic 的默认值,可能会被 topic 上的 compression.codec 属性覆盖。 Type: enum value
batch.num.messages P 1 .. 1000000 10000 一个消息集最大打包消息数量。整个消息集的大小仍受限于 message.max.bytesType: integer
delivery.report.only.error P true, false false 只对失败的消息提供分发报告。 Type: boolean
dr_cb P 分发报告回调函数 (参考 rd_kafka_conf_set_dr_cb()) Type: pointer
dr_msg_cb P 分发报告回调函数 (参考 rd_kafka_conf_set_dr_msg_cb()) Type: pointer

Topic 配置属性

属性 C/P 范围 默认值 描述
request.required.acks P -1 .. 1000 1 这个字段标示 leader broker 要从 ISR broker 接收多少个 ack,然后才确认发送请求:0=不发送任何 response/ack 给客户端, 1=只有 leader broker 需要 ack 消息, -1 or all=broker 阻塞直到所有的同步备份(ISRs)或in.sync.replicas设置的备份返回消息提交的确认应答。 Type: integer
acks P 参考 request.required.acks
request.timeout.ms P 1 .. 900000 5000 生产者请求等待应答的超时时间,毫秒。这个值仅在 broker 上强制执行。参考 request.required.acks,不能等于 0. Type: integer
message.timeout.ms P 0 .. 900000 300000 本地消息超时时间。这个值仅在本地强制执行,限制生产的消息等待被成功发送的等待时间,0 是不限制。 Type: integer
produce.offset.report P true, false false 报告生产消息的偏移量给应用程序。应用程序必须使用 dr_msg_cbrd_kafka_message_t.offset 中获取偏移量。 Type: boolean
partitioner_cb P 分区方法回调函数 (参考 rd_kafka_topic_conf_set_partitioner_cb()) Type: pointer
opaque * 应用程序不可见 (参考 rd_kafka_topic_conf_set_opaque()) Type: pointer
compression.codec P none, gzip, snappy, lz4, inherit inherit 压缩消息集的压缩编解码器。 Type: enum value
auto.commit.enable C true, false true 如果是 true,周期性的提交最后一个消息的偏移量。用于当程序重启时抛弃不用的消息。如果是 false,应用程序需要调用 rd_kafka_offset_store() 保存偏移量 (可选).提示 这个属性时能用于简单消费者,high-level KafkaConsumer 会被全局的 enable.auto.commit 属性替代。提示 目前没有整合 zookeeper,根据 offset.store.method 的配置 偏移量将写入 broker 或本地文件 file according to offset.store.method. Type: boolean
enable.auto.commit C 参考 auto.commit.enable
auto.commit.interval.ms C 10 .. 86400000 60000 消费者偏移量提交(写入)到存储的频率,毫秒。 Type: integer
auto.offset.reset C smallest, earliest, beginning, largest, latest, end, error largest 如果偏移量存储还没有初始化或偏移量超过范围时的处理方式:Action to take when there is no initial offset in offset store or the desired offset is out of range:‘smallest’,’earliest’ - 自动重设偏移量为最小偏移量,*’largest’,’latest’ - 自动重设偏移量为最大偏移量,*’error’ - 通过消费消息触发一个错误,请检查 message->errType: enum value
offset.store.path C . 存储偏移量的本地文件路径。如果路径是个目录,在目录下自动创建基于 topic 和 partition 的文件名。 Type: string
offset.store.sync.interval.ms C -1 .. 86400000 -1 偏移量文件 fsync() 的间隔,毫秒。-1 不同步,0 每次写入后立即同步。 Type: integer
offset.store.method C file, broker broker 偏移量存储方式:’file’ - 本地文件存储 (offset.store.path, et.al), ‘broker’ - 在 broker 上提交存储 (要求 Apache Kafka 0.8.2 或以后版本)。 Type: enum value
consume.callback.max.messages C 0 .. 1000000 0 一次 rd_kafka_consume_callback*() 调配的最大消息数 (0 = 无限制) Type: integer

C/P 含义:C = 生产者, P = 消费者, * = 二者都有