术语释疑

  • at-least-once
    至少传递一次,如果broker在收到消息后,发送ack之前失败,且生产者设置了重试,这种情况下可能会产生重复的数据。
  • at-most-once
    最多传递一次,如果生产者在发送消息失败后没有设置重试,虽然不会产生重复数据,但有可能丢失数据。
  • exactly-once
    刚好传递一次,即使producer重试发送消息,消息也会保证最多一次地传递给最终consumer。

Producer配置

config current default value proposed default value
enable.idempotence false true
acks 1 all
max.in.flight.requests.per.connection 5 2
retries 0 MAX_INT
transactional.id 根据实际情况自定义

其他的参数比较好理解,max.in.flight.requests.per.connection参数表示限制客户端在单个连接上能够同时发送的未响应请求的个数,是kafka为了提高吞吐量的一个参数。如果要实现Exactly-once语义,需要将值由默认的5设置成2, 主要是因为如果配置成 > 2, kafka将不能保证消息的顺序,而如果设置成1则对kafka的吞吐量有一定影响。

Producer配置代码

producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}

Consumer配置

config current default value proposed default value
isolation read_uhcommitted read_committed
enable.auto.commit false

Consumer示例代码

records = consumer.poll(100);
for(ConsumerRecord record: records){
processor.process(record);
consumer.commitSync();
}

说明

  • commit语句放在process语句前面还是后面取决于业务,如果process已经实现了幂等性,可以将commit放在process后面,且可以在当前所有的records处理完成之后再一起commit;
  • 而如果处理消息没有实现幂等性,且消息多处理一次可能会影响业务的功能(考虑极端情况,在处理的过程中jvm直接被杀掉了),则建议将commit语句放在process语句前面;
  • 如果极端情况下,有一条消息没有处理可能会对业务造成影响,则需要业务实现事务来持久化已经处理成功的offset,在消费者因为异常情况被杀掉再重启之后,消费者可以通过持久化的数据知道需要从哪一条消息开始处理。
参考文献