For the latest stable version, please use Korvet 0.12.5!
Authentication
This guide covers configuring SASL authentication to secure access to Korvet.
Overview
Korvet supports SASL (Simple Authentication and Security Layer) authentication to control access to the Kafka protocol endpoint. When enabled, clients must authenticate before producing or consuming messages.
Managing Credentials
Credential Storage
Credentials are stored in Redis using secure PBKDF2 password hashing:
- Algorithm: PBKDF2WithHmacSHA256
- Iterations: 10,000
- Salt: 128-bit random per credential
- Hash: 256-bit output
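The parameters above can be reproduced with the JDK's built-in PBKDF2 support. The following is a minimal sketch, not Korvet's own PasswordHasher: the class and method names are hypothetical, and it assumes the stored hash and salt are standard Base64, so its output may not be byte-compatible with what Korvet writes.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

// Hypothetical helper; mirrors the documented parameters only.
final class Pbkdf2Sketch {
    static final String ALGORITHM = "PBKDF2WithHmacSHA256";
    static final int ITERATIONS = 10_000;
    static final int SALT_BYTES = 16;  // 128-bit salt
    static final int HASH_BITS = 256;  // 256-bit derived hash

    // Generates a fresh random salt for one credential.
    static byte[] newSalt() {
        byte[] salt = new byte[SALT_BYTES];
        new SecureRandom().nextBytes(salt);
        return salt;
    }

    // Derives the password hash for the given salt.
    static byte[] hash(char[] password, byte[] salt) {
        PBEKeySpec spec = new PBEKeySpec(password, salt, ITERATIONS, HASH_BITS);
        try {
            return SecretKeyFactory.getInstance(ALGORITHM)
                    .generateSecret(spec)
                    .getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 is unavailable on this JVM", e);
        } finally {
            spec.clearPassword();
        }
    }

    public static void main(String[] args) {
        byte[] salt = newSalt();
        byte[] hash = hash("secret-password".toCharArray(), salt);
        Base64.Encoder b64 = Base64.getEncoder(); // encoding is an assumption
        System.out.println("salt          = " + b64.encodeToString(salt));
        System.out.println("password_hash = " + b64.encodeToString(hash));
    }
}
```

Because the salt is random, each run prints different values; the same password and salt always yield the same hash.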
Creating User Credentials
Use the Korvet admin API or Redis CLI to create user credentials.
Using Redis CLI
# Store a credential for user "alice" in tenant "tenant1"
redis-cli HSET korvet:credentials:alice \
  mechanism PLAIN \
  password_hash <base64-encoded-hash> \
  salt <base64-encoded-salt> \
  iterations 10000 \
  tenant_id tenant1
Programmatic Creation
// Create credential store
RedisCredentialStore credentialStore =
    new RedisCredentialStore(redisClient, "korvet");
PasswordHasher passwordHasher = new PasswordHasher();

// Hash the password
PasswordHasher.HashedPassword hashed =
    passwordHasher.hashPassword("secret-password");

// Store the credential
StoredCredential credential = StoredCredential.builder()
    .username("alice")
    .mechanism("PLAIN")
    .passwordHash(hashed.getHash())
    .salt(hashed.getSalt())
    .iterations(hashed.getIterations())
    .tenantId("tenant1")
    .build();
credentialStore.storeCredential(credential);
Updating Credentials
To update a user’s password, store a new credential with the same username:
// Hash new password
PasswordHasher.HashedPassword newHashed =
    passwordHasher.hashPassword("new-password");

// Update credential
StoredCredential updated = StoredCredential.builder()
    .username("alice")
    .mechanism("PLAIN")
    .passwordHash(newHashed.getHash())
    .salt(newHashed.getSalt())
    .iterations(newHashed.getIterations())
    .tenantId("tenant1")
    .build();
credentialStore.storeCredential(updated);
Client Configuration
Kafka Producer
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class.getName());

// SASL configuration
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"alice\" " +
    "password=\"secret-password\";");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Kafka Consumer
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());

// SASL configuration
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"alice\" " +
    "password=\"secret-password\";");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Command Line Tools
# kafka-console-producer
kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --producer-property security.protocol=SASL_PLAINTEXT \
  --producer-property sasl.mechanism=PLAIN \
  --producer-property 'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="secret-password";'

# kafka-console-consumer
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --from-beginning \
  --consumer-property security.protocol=SASL_PLAINTEXT \
  --consumer-property sasl.mechanism=PLAIN \
  --consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="secret-password";'
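Passing the password inline exposes it in shell history and process listings. One alternative is to write the same three settings to a properties file and point the tools at it. The sketch below is an illustration only: the class name, the KORVET_PASSWORD environment variable, and the client.properties file name are assumptions, not part of Korvet.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper that writes SASL/PLAIN client settings to a file.
final class ClientPropertiesSketch {

    // Builds the same three settings the commands above pass inline.
    static Properties saslPlainProperties(String username, String password) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";");
        return props;
    }

    // Stores the settings in java.util.Properties format.
    static void write(Properties props, Path file) {
        try (Writer out = Files.newBufferedWriter(file)) {
            props.store(out, "SASL settings for Kafka command line tools");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // KORVET_PASSWORD is an assumed variable name for this sketch.
        String password = System.getenv("KORVET_PASSWORD");
        if (password == null) {
            System.err.println("Set KORVET_PASSWORD before running this sketch");
            return;
        }
        write(saslPlainProperties("alice", password), Paths.get("client.properties"));
    }
}
```

In Kafka distributions that provide them, the console tools accept such a file through the --producer.config and --consumer.config options; newer releases may name the option differently, so check the tool's --help output. Restrict the file's permissions, since it contains the password in clear text.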
Multi-Tenancy
Each credential is associated with a tenant ID. When a client authenticates, the tenant ID is attached to the connection and can be used for:
- Data isolation - Separate topics and consumer groups per tenant
- Resource quotas - Limit resources per tenant
- Access control - Restrict access to tenant-specific resources
Tenant Mapping
// User "alice" belongs to "tenant1"
StoredCredential credential = StoredCredential.builder()
    .username("alice")
    .tenantId("tenant1")
    // ... other fields
    .build();

// User "bob" belongs to "tenant2"
StoredCredential credential2 = StoredCredential.builder()
    .username("bob")
    .tenantId("tenant2")
    // ... other fields
    .build();
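To make the idea of tenant-based isolation concrete, here is a purely illustrative sketch of how an application could scope topic names by tenant ID. It does not describe how Korvet enforces isolation internally: the class, the in-memory user-to-tenant map, and the "tenant.topic" naming scheme are all assumptions made for the example.

```java
import java.util.Map;

// Hypothetical illustration of tenant-scoped topic names.
final class TenantScopeSketch {

    // Assumed in-memory mapping that mirrors the two credentials above.
    static final Map<String, String> TENANT_BY_USER =
        Map.of("alice", "tenant1", "bob", "tenant2");

    // Prefixes a topic with its tenant ID, e.g. "tenant1.orders".
    static String scopedTopic(String tenantId, String topic) {
        return tenantId + "." + topic;
    }

    // A user may access a topic only if it carries that user's tenant prefix.
    static boolean canAccess(String username, String scopedTopic) {
        String tenantId = TENANT_BY_USER.get(username);
        return tenantId != null && scopedTopic.startsWith(tenantId + ".");
    }

    public static void main(String[] args) {
        String topic = scopedTopic("tenant1", "orders");
        System.out.println("alice -> " + topic + ": " + canAccess("alice", topic));
        System.out.println("bob   -> " + topic + ": " + canAccess("bob", topic));
    }
}
```

Including the separator in the prefix check keeps a tenant such as "tenant1" from matching topics that belong to "tenant10".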
Security Best Practices
Password Security
- Use strong, randomly generated passwords
- Rotate passwords regularly
- Never commit passwords to version control
- Use environment variables or secret management systems
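As one way to keep passwords out of source code, the JAAS configuration string can be assembled at startup from environment variables. This is a minimal sketch: the class name and the KORVET_USERNAME and KORVET_PASSWORD variable names are hypothetical, and the escaping assumes the JAAS parser honours backslash escapes inside quoted values.

```java
// Hypothetical helper that builds sasl.jaas.config from the environment.
final class JaasConfigSketch {

    // Escapes backslashes and double quotes so the value stays inside its quotes.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Produces the PLAIN login module line used in the client examples.
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + escape(username) + "\" "
            + "password=\"" + escape(password) + "\";";
    }

    public static void main(String[] args) {
        // Variable names are assumptions for this sketch.
        String username = System.getenv("KORVET_USERNAME");
        String password = System.getenv("KORVET_PASSWORD");
        if (username == null || password == null) {
            System.err.println("Set KORVET_USERNAME and KORVET_PASSWORD first");
            return;
        }
        String jaasConfig = plainJaasConfig(username, password);
        // Pass jaasConfig as the sasl.jaas.config client property; avoid logging it.
        System.out.println("Built JAAS config of length " + jaasConfig.length());
    }
}
```

The resulting string can be supplied as the value of SaslConfigs.SASL_JAAS_CONFIG in place of the hard-coded literal.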
Troubleshooting
Authentication Failures
Invalid Credentials
ERROR Authentication failed for user 'alice': Invalid password
Solution: Verify the username and password are correct.
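When the error persists, it can help to check offline whether a candidate password matches the fields stored for the user. The sketch below assumes the stored password_hash and salt are standard Base64 and were derived with PBKDF2WithHmacSHA256 as described under Credential Storage; the class and method names are hypothetical, and Korvet's actual encoding may differ.

```java
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

// Hypothetical offline check of a password against stored credential fields.
final class CredentialCheckSketch {

    // Derives a PBKDF2 hash of the requested bit length.
    static byte[] derive(String password, byte[] salt, int iterations, int bits) {
        PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, bits);
        try {
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec)
                    .getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 is unavailable on this JVM", e);
        } finally {
            spec.clearPassword();
        }
    }

    // Recomputes the hash and compares it to the stored one in constant time.
    static boolean matches(String password, String saltBase64,
                           String hashBase64, int iterations) {
        byte[] salt = Base64.getDecoder().decode(saltBase64);
        byte[] expected = Base64.getDecoder().decode(hashBase64);
        byte[] actual = derive(password, salt, iterations, expected.length * 8);
        return MessageDigest.isEqual(expected, actual);
    }

    public static void main(String[] args) {
        // Demo values only; real values would come from the stored credential.
        byte[] salt = new byte[16];
        String saltB64 = Base64.getEncoder().encodeToString(salt);
        String hashB64 = Base64.getEncoder()
                .encodeToString(derive("secret-password", salt, 10_000, 256));
        System.out.println(matches("secret-password", saltB64, hashB64, 10_000));
        System.out.println(matches("wrong-password", saltB64, hashB64, 10_000));
    }
}
```

A mismatch with the expected password suggests the stored hash, salt, or iteration count does not correspond to the password the client is sending.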
Migration Guide
Enabling Authentication on Existing Deployment
Enabling authentication will break existing unauthenticated clients.
- Create credentials for all existing applications
- Update client configurations with SASL settings
- Test authentication with a subset of clients
- Enable SASL in Korvet configuration
- Monitor for authentication failures
- Update remaining clients
Future Enhancements
The following features are planned for future releases:
- SASL/SCRAM-SHA-256 - Full stateful implementation
- ACL support - Fine-grained authorization per topic/consumer group
- LDAP/Active Directory - External authentication providers
- OAuth 2.0 - Token-based authentication
- Credential rotation - Automated password rotation
- Audit logging - Detailed authentication and authorization logs