Kafka中的消费者是如何接收消息的?

Kafka中的消费者通过Consumer API从broker接收消息。

接收消息的主要流程是:

  1. 创建消费者对象,指定broker地址和订阅主题:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "group1"); 
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
  1. 指定消费位置,从该位置开始消费:
consumer.seek(new TopicPartition("topic1", 0), 10);  
  1. 消费消息,并在回调方法中处理:
consumer.poll(Duration.ofMillis(100)) 
.records(new ConsumerRecords<String, String>() {
    @Override
    public void forEach(ConsumerRecord<String, String> record) {
        System.out.println(record.key() + " " + record.value());
    }
});
  1. 定期提交消费进度offset:
consumer.commitSync();
  1. 关闭消费者对象:
consumer.close();

消费者主要通过subscribe方法订阅主题,poll方法消费消息,commitSync方法提交offset。

消费者会拉取订阅主题未消费的消息,并在poll方法的回调中处理。处理完成后需要提交offset,通知broker已消费到哪个位置。

消费者还需要指定组ID,具有相同组ID的消费者会负载消费主题中的消息,形成一个消费组。

以上就是Kafka消费者接收消息的主要流程和API。消费者可以轻松订阅消息并处理,Kafka保证消息的有序传输和exactly-once语义。