Authentication

This guide covers configuring SASL authentication to secure access to Korvet.

Overview

Korvet supports SASL (Simple Authentication and Security Layer) authentication to control access to the Kafka protocol endpoint. When enabled, clients must authenticate before producing or consuming messages.

Supported Mechanisms

  • SASL/PLAIN - Username and password authentication

Enabling Authentication

Configuration

Enable SASL authentication in your Korvet configuration:

korvet:
  sasl:
    enabled: true

Or using environment variables:

KORVET_SASL_ENABLED=true

Managing Credentials

Credential Storage

Credentials are stored in Redis. Passwords are hashed with PBKDF2 using the following parameters:

  • Algorithm: PBKDF2WithHmacSHA256

  • Iterations: 10,000

  • Salt: 128-bit random per credential

  • Hash: 256-bit output
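
The parameters above map directly onto the JDK's built-in PBKDF2 support. The following is a minimal sketch of that mapping, not Korvet's own PasswordHasher: the class name Pbkdf2Sketch and its methods are hypothetical, and whether Korvet encodes the hash and salt exactly this way (Base64 of the raw bytes) is an assumption.

```java
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

/** Illustrative only: a hypothetical helper, not part of Korvet. */
final class Pbkdf2Sketch {
    static final String ALGORITHM = "PBKDF2WithHmacSHA256";
    static final int ITERATIONS = 10_000;
    static final int SALT_BYTES = 16;  // 128-bit salt
    static final int HASH_BITS = 256;  // 256-bit derived hash

    /** Generates a fresh random salt; use a new one for every credential. */
    static byte[] newSalt() {
        byte[] salt = new byte[SALT_BYTES];
        new SecureRandom().nextBytes(salt);
        return salt;
    }

    /** Derives the PBKDF2 hash for the given password and salt. */
    static byte[] hash(char[] password, byte[] salt) {
        PBEKeySpec spec = new PBEKeySpec(password, salt, ITERATIONS, HASH_BITS);
        try {
            return SecretKeyFactory.getInstance(ALGORITHM).generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 is not available in this JVM", e);
        } finally {
            spec.clearPassword();
        }
    }

    /** Recomputes the hash and compares it with the stored one in constant time. */
    static boolean verify(char[] password, byte[] salt, byte[] expectedHash) {
        return MessageDigest.isEqual(hash(password, salt), expectedHash);
    }

    public static void main(String[] args) {
        byte[] salt = newSalt();
        byte[] hash = hash("secret-password".toCharArray(), salt);
        Base64.Encoder b64 = Base64.getEncoder();
        System.out.println("salt          = " + b64.encodeToString(salt));
        System.out.println("password_hash = " + b64.encodeToString(hash));
    }
}
```

Running main prints a Base64 salt and hash for the sample password. Because the salt is random, the output differs on every run, while verify succeeds only for the password that produced the stored hash.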

Creating User Credentials

Use the Korvet admin API or Redis CLI to create user credentials.

Using Redis CLI

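# Store a credential for user "alice" in tenant "tenant1".
# Replace the <...> placeholders with real values; they are quoted so the
# shell does not interpret < and > as redirection.
redis-cli HSET korvet:credentials:alice \
  mechanism PLAIN \
  password_hash "<base64-encoded-hash>" \
  salt "<base64-encoded-salt>" \
  iterations 10000 \
  tenant_id tenant1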

Programmatic Creation

// Create credential store
RedisCredentialStore credentialStore =
    new RedisCredentialStore(redisClient, "korvet");
PasswordHasher passwordHasher = new PasswordHasher();

// Hash the password
PasswordHasher.HashedPassword hashed =
    passwordHasher.hashPassword("secret-password");

// Store the credential
StoredCredential credential = StoredCredential.builder()
    .username("alice")
    .mechanism("PLAIN")
    .passwordHash(hashed.getHash())
    .salt(hashed.getSalt())
    .iterations(hashed.getIterations())
    .tenantId("tenant1")
    .build();

credentialStore.storeCredential(credential);

Updating Credentials

To update a user’s password, store a new credential with the same username:

// Hash new password
PasswordHasher.HashedPassword newHashed =
    passwordHasher.hashPassword("new-password");

// Update credential
StoredCredential updated = StoredCredential.builder()
    .username("alice")
    .mechanism("PLAIN")
    .passwordHash(newHashed.getHash())
    .salt(newHashed.getSalt())
    .iterations(newHashed.getIterations())
    .tenantId("tenant1")
    .build();

credentialStore.storeCredential(updated);

Deleting Credentials

credentialStore.deleteCredential("alice");

Or using Redis CLI:

redis-cli DEL korvet:credentials:alice

Client Configuration

Kafka Producer

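import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();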
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class.getName());

// SASL configuration
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"alice\" " +
    "password=\"secret-password\";");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Kafka Consumer

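import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();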
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());

// SASL configuration
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"alice\" " +
    "password=\"secret-password\";");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Command Line Tools

# kafka-console-producer
kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --producer-property security.protocol=SASL_PLAINTEXT \
  --producer-property sasl.mechanism=PLAIN \
  --producer-property 'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="secret-password";'

# kafka-console-consumer
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --from-beginning \
  --consumer-property security.protocol=SASL_PLAINTEXT \
  --consumer-property sasl.mechanism=PLAIN \
  --consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="secret-password";'

Multi-Tenancy

Each credential is associated with a tenant ID. When a client authenticates, the tenant ID is attached to the connection and can be used for:

  • Data isolation - Separate topics and consumer groups per tenant

  • Resource quotas - Limit resources per tenant

  • Access control - Restrict access to tenant-specific resources

Tenant Mapping

// User "alice" belongs to "tenant1"
StoredCredential credential = StoredCredential.builder()
    .username("alice")
    .tenantId("tenant1")
    // ... other fields
    .build();

// User "bob" belongs to "tenant2"
StoredCredential credential2 = StoredCredential.builder()
    .username("bob")
    .tenantId("tenant2")
    // ... other fields
    .build();

Security Best Practices

Password Security

  • Use strong, randomly generated passwords

  • Rotate passwords regularly

  • Never commit passwords to version control

  • Use environment variables or secret management systems
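
One way to keep passwords out of source code is to build the JAAS configuration string at startup from environment variables. The sketch below assumes two hypothetical variable names, KORVET_CLIENT_USERNAME and KORVET_CLIENT_PASSWORD; neither is defined by Korvet or Kafka. The backslash escaping of quotes is a precaution that assumes the Kafka client's JAAS parser accepts such escapes inside quoted values.

```java
import java.util.Map;

/** Illustrative only: a hypothetical helper, not part of Korvet or Kafka. */
final class JaasConfigSketch {
    // Hypothetical environment variable names chosen for this example.
    static final String USER_VAR = "KORVET_CLIENT_USERNAME";
    static final String PASS_VAR = "KORVET_CLIENT_PASSWORD";

    /** Escapes backslashes and double quotes for use inside a quoted JAAS option. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the sasl.jaas.config value for SASL/PLAIN. */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + escape(username) + "\" "
            + "password=\"" + escape(password) + "\";";
    }

    /** Reads the credentials from the given environment map and fails fast if either is missing. */
    static String fromEnvironment(Map<String, String> env) {
        String username = env.get(USER_VAR);
        String password = env.get(PASS_VAR);
        if (username == null || password == null) {
            throw new IllegalStateException(USER_VAR + " and " + PASS_VAR + " must both be set");
        }
        return plainJaasConfig(username, password);
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        if (env.containsKey(USER_VAR) && env.containsKey(PASS_VAR)) {
            String jaas = fromEnvironment(env);
            // Never log the password itself; report only that the value was built.
            System.out.println("Built sasl.jaas.config (" + jaas.length() + " characters)");
        } else {
            System.out.println("Set " + USER_VAR + " and " + PASS_VAR + " before starting the client");
        }
    }
}
```

The returned string can be passed as the value of SaslConfigs.SASL_JAAS_CONFIG in place of the hard-coded literal used in the client examples.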

Network Security

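SASL/PLAIN sends the username and password over the connection unencrypted, so with SASL_PLAINTEXT anyone who can observe the traffic can read them. For production:

  • Use TLS - Set the security protocol to SASL_SSL so credentials are encrypted in transit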

  • Network isolation - Deploy in private networks

  • Firewall rules - Restrict access to Korvet port
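
On the client side, moving to TLS mostly means changing the security protocol and pointing the client at a truststore. The sketch below uses standard Kafka client property names as plain strings so that it runs without the Kafka library. The port 9093, the truststore path, and the assumption that a TLS listener is already configured on the Korvet side are all illustrative.

```java
import java.util.Properties;

/** Illustrative only: builds client properties for SASL/PLAIN over TLS. */
final class SaslSslClientSketch {
    static Properties saslSslProperties(String bootstrapServers,
                                        String jaasConfig,
                                        String truststorePath,
                                        String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // SASL_SSL wraps the SASL exchange in TLS, unlike SASL_PLAINTEXT.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", jaasConfig);
        // Truststore holding the CA certificate that signed the server certificate.
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslSslProperties(
            "localhost:9093",                       // hypothetical TLS port
            "<jaas-config>",                        // placeholder, see Client Configuration
            "/etc/korvet/client.truststore.jks",    // hypothetical path
            "<truststore-password>");               // placeholder
        System.out.println("security.protocol = " + props.getProperty("security.protocol"));
    }
}
```

The resulting Properties object can be extended with serializer or deserializer settings and passed to a KafkaProducer or KafkaConsumer constructor.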

Credential Management

  • Principle of least privilege - Create separate credentials per application

  • Audit access - Monitor authentication attempts

  • Revoke unused credentials - Delete credentials for decommissioned applications

Troubleshooting

Authentication Failures

Invalid Credentials

ERROR Authentication failed for user 'alice': Invalid password

Solution: Verify the username and password are correct.

User Not Found

ERROR Authentication failed for user 'bob': User not found

Solution: Create the credential using the credential store.

Mechanism Not Supported

ERROR Unsupported SASL mechanism: SCRAM-SHA-512

Solution: Use a supported mechanism (PLAIN).

Connection Issues

Client Configuration

Verify the client is configured with:

  • security.protocol=SASL_PLAINTEXT (or SASL_SSL with TLS)

  • sasl.mechanism=PLAIN

  • Correct JAAS configuration with username and password
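
The checklist above can be turned into a quick pre-flight check that runs before the client is created. The following is a rough sketch with a hypothetical class name. It only inspects the Properties object, so it cannot tell whether the password is correct, and it assumes the JAAS configuration is supplied through sasl.jaas.config rather than an external JAAS file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Illustrative only: a hypothetical pre-flight check for SASL client settings. */
final class SaslClientConfigCheck {
    /** Returns a list of human-readable problems; an empty list means the checks passed. */
    static List<String> problems(Properties props) {
        List<String> problems = new ArrayList<>();

        String protocol = props.getProperty("security.protocol");
        if (!"SASL_PLAINTEXT".equals(protocol) && !"SASL_SSL".equals(protocol)) {
            problems.add("security.protocol must be SASL_PLAINTEXT or SASL_SSL, found: " + protocol);
        }

        String mechanism = props.getProperty("sasl.mechanism");
        if (!"PLAIN".equals(mechanism)) {
            problems.add("sasl.mechanism must be PLAIN, found: " + mechanism);
        }

        String jaas = props.getProperty("sasl.jaas.config");
        if (jaas == null || !jaas.contains("username=") || !jaas.contains("password=")) {
            problems.add("sasl.jaas.config must set both username and password");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "PLAINTEXT"); // deliberately wrong
        props.setProperty("sasl.mechanism", "PLAIN");
        problems(props).forEach(System.out::println);
    }
}
```

Logging the returned problems at startup makes a misconfigured client easier to spot than a generic connection failure.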

Server Configuration

Verify SASL is enabled in Korvet:

# Check environment variable
echo "$KORVET_SASL_ENABLED"

# Should output: true

Debugging

Enable debug logging for authentication:

logging:
  level:
    com.redis.korvet.core.auth: DEBUG
    com.redis.korvet.core.kafka.SaslHandshakeHandler: DEBUG
    com.redis.korvet.core.kafka.SaslAuthenticateHandler: DEBUG

Migration Guide

Enabling Authentication on Existing Deployment

Enabling authentication will break existing unauthenticated clients. To migrate an existing deployment:

  1. Create credentials for all existing applications

  2. Update client configurations with SASL settings

  3. Test authentication with a subset of clients

  4. Enable SASL in Korvet configuration

  5. Monitor for authentication failures

  6. Update remaining clients

Disabling Authentication

To disable authentication:

korvet:
  sasl:
    enabled: false

Clients can then connect without authentication.