Kafka-Config
Consumer
| Name | Description | Default |
|---|---|---|
| bootstrap.servers | 服务地址 | |
| key.deserializer | 消息 Key 的反序列化方式 | |
| value.deserializer | 消息 Value 的反序列化方式 | |
| group.id | 消费组 ID | “” |
| auto.offset.reset | offset 重置方式 - earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。 - latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。 - none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 - anything else:抛出异常 |
latest |
| enable.auto.commit | offset 提交方式 - true:自动提交 - false:手动提交 |
true |
| max.poll.interval.ms | 使用消费组管理时的的拉取时间间隔 | 300000 |
| max.poll.records | 一次拉取时的最大记录数 | 500 |
| sasl.mechanism | 用于客户端连接的 SASL 机制(一般用 PLAIN) | GSSAPI |
| security.protocol | 连接 broker 的协议 - PLAINTEXT - SSL - SASL_PLAINTEXT - SASL_SSL |
PLAINTEXT |
Spring-Kafka-Listener
| Name | Description | Default |
|---|---|---|
| spring.kafka.listener.ack-mode | 提交 offset 方式 spring-kafak负责提交: - record:每处理一个消息,就提交一次 - batch:将上一次poll得到消息进行提交 - time:达到指定时间间隔,就提交 - count:达到指定次数,就提交 - count_time:达到指定次数和间间隔,就提交 手动提交: - manual:调用后先存放至本地缓存,在下一次poll之前取出批量提交 - manual_immediate:调用后立即提交 |
Authentication using SASL/PLAIN
Broker
在每一个 Kafka broker 的 config 目录中, 添加一个 JAAS 文件:
kafka_server_jaas.conf1
2
3
4
5
6
7KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};将 JAAS 配置文件的路径作为 JVM 的参数, 并传递到每一个 Kafka broker
1
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
在
server.properties中配置 SASL 端口和 SASL 机制
1 | listeners=SASL_SSL://host.name:port |
Consumer
1 | sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ |