This version is still in development and is not considered stable yet. For the latest stable version, please use Korvet 0.12.5!

Consuming Messages

This guide shows how to consume messages from Korvet using Kafka clients.

Using kafka-console-consumer

The simplest way to consume messages:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning

Java Consumer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                         record.offset(), record.key(), record.value());
    }
}

Python Consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    group_id='my-consumer-group',
    auto_offset_reset='earliest'
)

for message in consumer:
    print(f"Offset: {message.offset}, Value: {message.value}")

Offset Management

Korvet tracks consumer offsets to ensure messages are not lost or duplicated:

  • Auto-commit: Offsets are automatically committed periodically

  • Manual commit: You can control when offsets are committed

Reading from Specific Offset

You can start reading from a specific offset:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partition 0 \
  --offset 100