Consumer Group Management

This page describes how Korvet handles Kafka consumer group operations. It is implementation-oriented and focuses on how each Kafka request maps to broker-side state and Redis.

Join Consumer Group (Implicit Group Creation)

Kafka Wire Protocol

JoinGroupRequest (API Key: 11)
JoinGroupRequest {
  group_id: "my-group"
  session_timeout_ms: 45000
  rebalance_timeout_ms: 300000
  member_id: ""                    // empty for first join
  group_instance_id: null
  protocol_type: "consumer"
  protocols: [
    {
      name: "range"                // partition assignment strategy
      metadata: {
        version: 1
        topics: ["orders"]
        user_data: null
      }
    }
  ]
}
JoinGroupResponse (API Key: 11)
JoinGroupResponse {
  throttle_time_ms: 0
  error_code: 0
  generation_id: 1
  protocol_name: "range"
  leader: "consumer-1-uuid"        // first member becomes leader
  member_id: "consumer-1-uuid"     // assigned by broker
  members: [
    {
      member_id: "consumer-1-uuid"
      group_instance_id: null
      metadata: {
        version: 1
        topics: ["orders"]
        user_data: null
      }
    }
  ]
}

Sequence Diagram

JoinGroupRequest Sequence

Redis Commands Detail

JoinGroup is handled by the broker-side group coordinator and consumer-group registry. Redis stream consumer groups are not created during JoinGroup.
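
The deferred creation can be sketched roughly as follows. This is a minimal illustration, not Korvet's code: `resolve_initial_offset`, `LazyGroupInitializer`, and the `offset_to_stream_id` callable are hypothetical names, and how Korvet actually maps a Kafka offset to a Redis stream ID is not covered here. The client interface assumes a method shaped like redis-py's `xgroup_create`.

```python
from typing import Callable, Optional, Protocol, Set, Tuple


def resolve_initial_offset(committed: Optional[int], auto_offset_reset: str,
                           earliest: int, latest: int) -> int:
    """Client side: a committed offset wins, else auto.offset.reset decides.

    `earliest` and `latest` stand for values the client got from ListOffsets.
    """
    if committed is not None:
        return committed
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    # Any other setting (for example "none") is surfaced as an error here.
    raise ValueError(f"no committed offset and auto.offset.reset={auto_offset_reset!r}")


class StreamClient(Protocol):
    """Subset of a Redis client; shaped like redis-py's xgroup_create."""

    def xgroup_create(self, name: str, groupname: str, id: str = "$",
                      mkstream: bool = False) -> object: ...


class LazyGroupInitializer:
    """Hypothetical broker-side helper: XGROUP CREATE on the first Fetch only."""

    def __init__(self, client: StreamClient,
                 offset_to_stream_id: Callable[[int], str]) -> None:
        self._client = client
        self._to_id = offset_to_stream_id  # assumed mapping, supplied by caller
        # In-memory record of (stream_key, group_id) pairs already initialized.
        self._initialized: Set[Tuple[str, str]] = set()

    def on_fetch(self, stream_key: str, group_id: str, fetch_offset: int) -> bool:
        """Return True if this call issued XGROUP CREATE, False otherwise."""
        key = (stream_key, group_id)
        if key in self._initialized:
            return False
        try:
            self._client.xgroup_create(stream_key, group_id,
                                       id=self._to_id(fetch_offset))
            created = True
        except Exception as exc:
            # Redis replies BUSYGROUP when the group already exists, e.g.
            # after a restart cleared the in-memory set. Treat that as done.
            if "BUSYGROUP" not in str(exc):
                raise
            created = False
        self._initialized.add(key)
        return created
```

Keeping the "already initialized" set in memory makes the common path a single set lookup; tolerating `BUSYGROUP` is one way to stay correct when that set is lost while the Redis group survives.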

Implementation Notes

  • Implicit Creation: The broker-side (Kafka-level) consumer group is created automatically on the first JoinGroup request; no explicit create call is needed

  • Initial Position: Redis consumer groups are created lazily on the first Fetch request (not JoinGroup or SyncGroup)

    • The client determines the initial offset based on:

      • OffsetFetchRequest result (if group has committed offsets)

      • auto.offset.reset configuration (if no committed offset exists)

      • ListOffsets API (to resolve "earliest" or "latest" to actual offset)

    • The client sends this offset in the fetch_offset field of the first FetchRequest

    • The broker creates the Redis consumer group using this offset:

      • XGROUP CREATE

    • The broker tracks which groups have been initialized in memory

    • See Consumer Group Workflow and Consumer Group Initialization for complete details

  • Member ID Assignment:

    • Generate UUID for new members (empty member_id in request)

    • Reuse existing member_id for rejoining members

  • Generation ID:

    • Increments on each rebalance

    • Stored in group metadata

    • Used to detect stale member operations

  • Leader Election:

    • First member to join becomes leader

    • Leader performs partition assignment

    • Stored in group metadata

  • Protocol Selection:

    • Broker selects protocol supported by all members

    • Common protocols: range, roundrobin, sticky

  • Rebalance:

    • Triggered when members join/leave

    • All members must rejoin and get new partition assignments

  • Heartbeat: Members send periodic heartbeats to maintain membership (separate HeartbeatRequest)

  • Session Timeout: Member removed if no heartbeat within session_timeout_ms
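
The bookkeeping in these notes can be sketched as a small in-memory registry. This is an illustration under stated assumptions, not Korvet's implementation: `GroupRegistry`, `Group`, and `Member` are hypothetical names, every join is treated as a rebalance that bumps the generation, protocol selection simply takes the first protocol of the earliest member that all members support, and the real rebalance handshake (waiting for every member to rejoin) is not modeled.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional


@dataclass
class Member:
    member_id: str
    protocols: List[str]        # assignment strategies, in preference order
    session_timeout_ms: int
    last_heartbeat_ms: int


@dataclass
class Group:
    generation_id: int = 0
    leader: Optional[str] = None
    protocol_name: Optional[str] = None
    members: Dict[str, Member] = field(default_factory=dict)


def select_protocol(members: Iterable[Member]) -> str:
    """Pick a protocol every member supports (assumed tie-break: first member's order)."""
    members = list(members)
    for name in members[0].protocols:
        if all(name in m.protocols for m in members):
            return name
    raise ValueError("no assignment protocol common to all members")


class GroupRegistry:
    """Hypothetical in-memory registry of consumer groups."""

    def __init__(self) -> None:
        self.groups: Dict[str, Group] = {}

    def join(self, group_id: str, member_id: str, protocols: List[str],
             session_timeout_ms: int, now_ms: int) -> dict:
        group = self.groups.setdefault(group_id, Group())   # implicit creation
        if not member_id:                                   # empty id: new member
            member_id = str(uuid.uuid4())
        candidate = dict(group.members)
        candidate[member_id] = Member(member_id, list(protocols),
                                      session_timeout_ms, now_ms)
        protocol = select_protocol(candidate.values())      # validate before commit
        group.members = candidate
        if group.leader not in group.members:               # first member leads
            group.leader = member_id
        group.protocol_name = protocol
        group.generation_id += 1                            # simplification: join == rebalance
        return {"generation_id": group.generation_id, "leader": group.leader,
                "member_id": member_id, "protocol_name": protocol}

    def heartbeat(self, group_id: str, member_id: str, now_ms: int) -> bool:
        group = self.groups.get(group_id)
        if group is None or member_id not in group.members:
            return False                                    # unknown member
        group.members[member_id].last_heartbeat_ms = now_ms
        return True

    def expire(self, group_id: str, now_ms: int) -> List[str]:
        """Remove members whose session timed out; return their ids."""
        group = self.groups[group_id]
        dead = [m.member_id for m in group.members.values()
                if now_ms - m.last_heartbeat_ms > m.session_timeout_ms]
        for mid in dead:
            del group.members[mid]
        if dead:
            if group.leader not in group.members:
                group.leader = next(iter(group.members), None)
            group.generation_id += 1                        # membership changed
        return dead
```

A caller would compare the `generation_id` carried by later requests against the group's current value to reject operations from stale members.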

Delete Consumer Group

Kafka Wire Protocol

DeleteGroupsRequest (API Key: 42)
DeleteGroupsRequest {
  groups_names: ["my-group", "old-group"]
}
DeleteGroupsResponse (API Key: 42)
DeleteGroupsResponse {
  throttle_time_ms: 0
  results: [
    {
      group_id: "my-group"
      error_code: 0            // NONE
    },
    {
      group_id: "old-group"
      error_code: 69           // GROUP_ID_NOT_FOUND
    }
  ]
}

Sequence Diagram

DeleteGroupsRequest Sequence

Redis Commands Detail

DeleteGroups removes broker-side group state, asks the storage layer to delete group artifacts for known stream keys, and deletes explicit committed offsets for those stream keys.

Implementation Notes

  • Group Doesn’t Exist: Return GROUP_ID_NOT_FOUND (error code 69)

  • Active Members:

    • Cannot delete group with active members

    • Return NON_EMPTY_GROUP (error code 68)

    • Check broker in-memory state for active members

  • Storage Cleanup:

    • Group deletion delegates per-stream cleanup to the storage layer

    • Explicit committed offsets are also deleted from the committed-offset store

  • Cascading Effects:

    • Consumers that still reference a deleted group will get errors on their next operation

    • Consumers should handle UNKNOWN_MEMBER_ID and rejoin

  • Performance: The handler builds a stream-key list from known topics and partitions, then performs storage cleanup asynchronously
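
The per-group decision flow above can be sketched as follows. This is a simplified illustration, not Korvet's handler: `delete_groups`, the `topic:partition` stream-key format, and the `delete_group_artifacts` callback are hypothetical, and the cleanup runs synchronously here for readability even though the real handler performs it asynchronously.

```python
from typing import Callable, Dict, List, Set, Tuple

NONE = 0
NON_EMPTY_GROUP = 68
GROUP_ID_NOT_FOUND = 69


def delete_groups(
    group_names: List[str],
    active_members: Dict[str, Set[str]],              # broker in-memory state
    topics: Dict[str, int],                           # topic -> partition count
    delete_group_artifacts: Callable[[str, str], None],  # storage-layer hook
    committed_offsets: Dict[Tuple[str, str], int],    # (stream_key, group) -> offset
) -> List[Tuple[str, int]]:
    """Return one (group_id, error_code) result per requested group."""
    # Stream keys are built from known topics and partitions.
    # The "topic:partition" format is an assumption made for this sketch.
    stream_keys = [f"{topic}:{p}" for topic, count in topics.items()
                   for p in range(count)]
    results: List[Tuple[str, int]] = []
    for group_id in group_names:
        if group_id not in active_members:
            results.append((group_id, GROUP_ID_NOT_FOUND))
            continue
        if active_members[group_id]:
            # Groups with active members cannot be deleted.
            results.append((group_id, NON_EMPTY_GROUP))
            continue
        del active_members[group_id]                  # drop broker-side state
        for key in stream_keys:
            delete_group_artifacts(key, group_id)     # per-stream cleanup
            committed_offsets.pop((key, group_id), None)
        results.append((group_id, NONE))
    return results
```

Each group is evaluated independently, so one response can mix successes and errors, as in the DeleteGroupsResponse example.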