This version is still in development and is not considered stable yet. For the latest stable version, please use Korvet 0.12.5!

Consumer Workflow (Standalone)

This page describes the complete Kafka API interaction sequence for a standalone consumer (not part of a consumer group).

This workflow is intended for readers who need to understand broker behavior end to end. For task-oriented user guidance, prefer Consuming Messages.

Overview

A typical standalone consumer workflow involves the following API calls:

  1. ApiVersions - Negotiate protocol version with broker

  2. Metadata - Discover topics and partitions

  3. ListOffsets - Find the beginning or end offset of partitions

  4. Fetch - Read messages from topic partitions

API Sequence

1. ApiVersions Request

The client first asks the broker which API versions it supports, exactly as a producer does.

ApiVersionsRequest (API Key: 18)
ApiVersionsRequest {
  client_software_name: "apache-kafka-java"
  client_software_version: "3.9.1"
}

See Producer Workflow for response details.

2. Metadata Request

The client discovers available topics and their partitions (same as producer).

MetadataRequest (API Key: 3)
MetadataRequest {
  topics: ["orders"]
  allow_auto_topic_creation: false
}

See Producer Workflow for response details.

3. ListOffsets Request

The client resolves the offset at which to start consuming: the earliest offset, the latest offset, or the offset for a specific timestamp.

ListOffsetsRequest (API Key: 2) - Find Earliest
ListOffsetsRequest {
  replica_id: -1  // consumer
  isolation_level: 0  // READ_UNCOMMITTED
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition_index: 0
          timestamp: -2  // EARLIEST
        }
      ]
    }
  ]
}
ListOffsetsResponse
ListOffsetsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition_index: 0
          error_code: 0
          timestamp: 1234567890000
          offset: 0
        }
      ]
    }
  ]
}

Redis Commands

# For EARLIEST (-2): Get first entry
XINFO STREAM korvet:orders:0
# Returns stream metadata including first-entry: ["1234567890123-0", {...}]

# Encode entry ID to Kafka offset (stateless)
# For "1234567890123-0" with sequenceBits=10:
#   offset = (1234567890123 << 10) | 0 = 1264197519485952

# For LATEST (-1): Get last entry
XINFO STREAM korvet:orders:0
# Returns stream metadata including last-entry
# Encode to offset using same formula
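
The encoding described in the comments above can be sketched in Python. This is a minimal illustration, not Korvet's actual implementation: the function names `encode_offset` and `resolve_sentinel` are hypothetical, `sequence_bits=10` is taken from the example above, and raising an error when the sequence number does not fit in `sequence_bits` is an assumption, since this page does not say how that case is handled.

```python
EARLIEST = -2  # ListOffsets sentinel timestamp for the first offset
LATEST = -1    # ListOffsets sentinel timestamp for the last offset


def encode_offset(entry_id: str, sequence_bits: int = 10) -> int:
    """Pack a Redis Stream entry ID ("<ms>-<seq>") into a single integer offset."""
    ms_part, _, seq_part = entry_id.partition("-")
    timestamp_ms = int(ms_part)
    sequence = int(seq_part) if seq_part else 0
    # Assumption: a sequence that does not fit in sequence_bits is rejected.
    if sequence >= (1 << sequence_bits):
        raise ValueError(f"sequence {sequence} does not fit in {sequence_bits} bits")
    return (timestamp_ms << sequence_bits) | sequence


def resolve_sentinel(timestamp: int, first_entry_id: str, last_entry_id: str,
                     sequence_bits: int = 10) -> int:
    """Map a sentinel timestamp to an offset using the first/last entry IDs
    reported by XINFO STREAM."""
    if timestamp == EARLIEST:
        return encode_offset(first_entry_id, sequence_bits)
    if timestamp == LATEST:
        return encode_offset(last_entry_id, sequence_bits)
    raise ValueError("only the EARLIEST and LATEST sentinels are handled in this sketch")


print(encode_offset("1234567890123-0"))  # 1264197519485952
```

Because the mapping is a pure function of the entry ID, no lookup table between offsets and entry IDs is needed, which is what "stateless" refers to here.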

4. Fetch Request

The client reads messages from the partition starting at the desired offset.

FetchRequest (API Key: 1)
FetchRequest {
  max_wait_ms: 500
  min_bytes: 1
  max_bytes: 52428800  // 50 MB
  isolation_level: 0  // READ_UNCOMMITTED
  topics: [
    {
      topic: "orders"
      partitions: [
        {
          partition: 0
          fetch_offset: 0
          partition_max_bytes: 1048576  // 1 MB
        }
      ]
    }
  ]
}
FetchResponse
FetchResponse {
  throttle_time_ms: 0
  topics: [
    {
      topic: "orders"
      partitions: [
        {
          partition_index: 0
          error_code: 0
          high_watermark: 42
          last_stable_offset: 42
          records: [
            {
              offset: 0
              timestamp: 1234567890000
              key: "order-123"
              value: {"product": "widget", "quantity": 5}
              headers: [...]
            },
            {
              offset: 1
              timestamp: 1234567891000
              key: "order-124"
              value: {"product": "gadget", "quantity": 3}
              headers: []
            }
          ]
        }
      ]
    }
  ]
}

Redis Commands

# Decode Kafka offset to Redis Stream entry ID (stateless)
# For offset 0 with sequenceBits=10:
#   timestamp = 0 >> 10 = 0
#   sequence = 0 & 1023 = 0
#   entry_id = "0-0"

# Fetch messages from stream
XREAD COUNT 100 STREAMS korvet:stream:orders:0 0-0
# Returns: [
#   ["korvet:stream:orders:0", [
#     ["1234567890123-0", ["key", "order-123", "value", "...", ...]],
#     ["1234567890123-1", ["key", "order-124", "value", "...", ...]]
#   ]]
# ]
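
The decoding step in the comments above is the inverse of the offset encoding and can be sketched the same way. The function name `decode_offset` is hypothetical and `sequence_bits=10` follows the example; treat this as an illustration of the arithmetic rather than Korvet's code.

```python
def decode_offset(offset: int, sequence_bits: int = 10) -> str:
    """Unpack a Kafka offset into a Redis Stream entry ID ("<ms>-<seq>")."""
    if offset < 0:
        raise ValueError("offset must be non-negative")
    timestamp_ms = offset >> sequence_bits            # high bits: millisecond timestamp
    sequence = offset & ((1 << sequence_bits) - 1)    # low bits: sequence number
    return f"{timestamp_ms}-{sequence}"


print(decode_offset(0))                 # 0-0
print(decode_offset(1264197519485952))  # 1234567890123-0
```

With `sequence_bits=10` the mask is 1023, matching the `0 & 1023` step shown above.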

Complete Example Flow

Figure: Consumer Workflow Sequence

Key Differences from Group Consumer

  • No group coordination: No JoinGroup, SyncGroup, Heartbeat, or LeaveGroup calls

  • No offset commits: Consumer tracks offsets locally (or in external storage)

  • Manual partition assignment: Consumer explicitly chooses which partitions to read

  • No rebalancing: Adding/removing consumers doesn’t trigger partition reassignment
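
Since a standalone consumer commits nothing to the broker, it has to remember its own position per partition. The following is a minimal sketch of that bookkeeping, assuming records are dictionaries with an `offset` field as in the FetchResponse example; the class `LocalOffsetTracker` and its methods are hypothetical names, not part of Korvet or any Kafka client.

```python
from typing import Dict, Iterable, Mapping, Tuple

TopicPartition = Tuple[str, int]


class LocalOffsetTracker:
    """Keeps the next fetch offset for each manually assigned partition."""

    def __init__(self) -> None:
        self._next: Dict[TopicPartition, int] = {}

    def seek(self, topic: str, partition: int, offset: int) -> None:
        """Set the starting position, e.g. from a ListOffsets response."""
        self._next[(topic, partition)] = offset

    def position(self, topic: str, partition: int) -> int:
        """Return the offset to use as fetch_offset in the next FetchRequest."""
        return self._next[(topic, partition)]

    def advance(self, topic: str, partition: int,
                records: Iterable[Mapping[str, int]]) -> int:
        """Move the position past every record in a fetched batch."""
        key = (topic, partition)
        position = self._next[key]
        for record in records:
            # The next offset to fetch is one past the last consumed record.
            position = max(position, record["offset"] + 1)
        self._next[key] = position
        return position


tracker = LocalOffsetTracker()
tracker.seek("orders", 0, 0)
print(tracker.advance("orders", 0, [{"offset": 0}, {"offset": 1}]))  # 2
```

To survive restarts, the same dictionary could be persisted to a file or an external store, which corresponds to the "external storage" option mentioned in the list above.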