1. Overview
Redis Kafka Connector is a Confluent-verified connector that stores data from Kafka topics into Redis and pushes data from Redis into Kafka topics.
This guide provides documentation and usage information for the Redis Kafka Connector.
2. Quick Start
This section shows how to configure the Redis Kafka Connector to import/export data between Redis and Apache Kafka and provides a hands-on look at the functionality of the source and sink connectors.
2.2. Start the Sandbox
The sandbox starts the following Docker services:
- Redis Stack
- Apache Kafka
- Kafka Connect with the Redis Kafka Connector installed
To start the sandbox, run the following command:
docker compose up
After Docker downloads and starts the services, you should see the following output:
[+] Running 8/0
✔ Container redis Created
✔ Container zookeeper Created
✔ Container broker Created
✔ Container schema-registry Created
✔ Container rest-proxy Created
✔ Container connect Created
✔ Container ksqldb-server Created
✔ Container control-center Created
2.3. Add Connectors
Now that the required services are up and running, we can add connectors to Kafka Connect to transfer data between Redis and Kafka:
- Add a sink connector to transfer data from Kafka to Redis
- Add a source connector to transfer data from Redis to Kafka
2.3.1. Add a Datagen
Kafka Connect Datagen is a Kafka Connect source connector for generating mock data.
Create the Datagen connector with the following command:
curl -X POST -H "Content-Type: application/json" --data '
{ "name": "datagen-pageviews",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "pageviews",
"quickstart": "pageviews",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
"max.interval": 200,
"iterations": 10000000,
"tasks.max": "1"
}}' http://localhost:8083/connectors -w "\n"
This automatically creates the Kafka topic pageviews and produces data with a schema configuration from pageviews_schema.avro.
Why do I see the message 'Failed to connect'? It takes up to three minutes for the Kafka Connect REST API to start. If you receive this error, wait three minutes and run the preceding command again.
To confirm that you added the Datagen connector, run the following command:
curl -X GET http://localhost:8083/connectors
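To verify that the connector and its task are running, you can also query the Kafka Connect status endpoint for the connector created above:
curl -X GET http://localhost:8083/connectors/datagen-pageviews/status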
2.3.2. Add a Sink Connector
The command below adds a Redis Kafka Connector sink connector configured with these properties:
- The class Kafka Connect uses to instantiate the connector
- The Kafka topic from which the connector reads data
- The connection URI of the Redis database to which the connector writes data
- The Redis command to use for writing data (JSONSET)
- Key and value converters to correctly handle incoming pageviews data
- A Single Message Transform to extract a key from pageviews messages
curl -X POST -H "Content-Type: application/json" --data '
{"name": "redis-sink-json",
"config": {
"connector.class":"com.redis.kafka.connect.RedisSinkConnector",
"tasks.max":"1",
"topics":"pageviews",
"redis.uri":"redis://redis:6379",
"redis.command":"JSONSET",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Key",
"transforms.Cast.spec": "string"
}}' http://localhost:8083/connectors -w "\n"
You can check that Kafka messages are being written to Redis with this command:
docker compose exec redis /opt/redis-stack/bin/redis-cli "keys" "*"
You should see the following output:
1) "pageviews:6021"
2) "pageviews:211"
3) "pageviews:281"
...
To retrieve the contents of a specific key, use this command:
docker compose exec redis /opt/redis-stack/bin/redis-cli "JSON.GET" "pageviews:1451"
⇒ "{\"viewtime\":1451,\"userid\":\"User_6\",\"pageid\":\"Page_35\"}"
2.3.3. Add a Source Connector
The following command adds a source connector configured with these properties:
- The class Kafka Connect uses to instantiate the connector
- The connection URI of the Redis database the connector connects to
- The name of the Redis stream from which the connector reads messages
- The Kafka topic to which the connector writes data
curl -X POST -H "Content-Type: application/json" --data '
{ "name": "redis-source",
"config": {
"tasks.max":"1",
"connector.class":"com.redis.kafka.connect.RedisStreamSourceConnector",
"redis.uri":"redis://redis:6379",
"redis.stream.name":"mystream",
"topic": "mystream"
}
}' http://localhost:8083/connectors -w "\n"
Now add a message to the mystream Redis stream:
docker compose exec redis /opt/redis-stack/bin/redis-cli "xadd" "mystream" "*" "field1" "value11" "field2" "value21"
Examine the topics in the Kafka UI: http://localhost:9021 or http://localhost:8000/.
The mystream topic should contain the previously sent stream message.
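You can also read the topic from the command line. The sketch below runs the kafka-console-consumer tool inside the broker container; the bootstrap server address is an assumption based on the sandbox defaults:
docker compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic mystream --from-beginning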
2.4. Custom Connector
This section describes configuration aspects that are specific to using Redis Kafka Connector as a Custom Connector in Confluent Cloud.
2.4.1. Egress Endpoints
You must specify egress endpoints so that the connector can reach the Redis database.
2.4.2. Sensitive Properties
The following properties are sensitive and must be marked as such in the Confluent Cloud UI:
- redis.uri: URI of the Redis database to connect to, e.g. redis://redis-12000.redis.com:12000
- redis.username: Username to use to connect to Redis
- redis.password: Password to use to connect to Redis
- redis.key.password: Password of the private key file
2.5. Docker Example
The project repository contains a script that runs all the steps shown in the Quick Start.
Clone the redis-kafka-connect repository and execute run.sh in the docker directory:
git clone https://github.com/redis-field-engineering/redis-kafka-connect.git
cd redis-kafka-connect
./run.sh
This will:
- Run docker compose up
- Wait for Redis, Kafka, and Kafka Connect to be ready
- Register the Confluent Datagen Connector
- Register the Redis Kafka Sink Connector
- Register the Redis Kafka Source Connector
- Publish some events to Kafka via the Datagen connector
- Write the events to Redis
- Send messages to a Redis stream
- Write the Redis stream messages back into Kafka
Once running, examine the topics in the Kafka Control Center:
- The pageviews topic should contain the 10 simple documents added, each similar to:
{
  "viewtime": {
    "$numberLong": "81"
  },
  "pageid": "Page_1",
  "userid": "User_8"
}
- The pageviews stream should contain the 10 change events.
Examine the stream in Redis:
docker compose exec redis /usr/local/bin/redis-cli
xread COUNT 10 STREAMS pageviews 0
Messages added to the mystream stream will show up in the mystream topic.
3. Install
Select one of the methods below to install Redis Kafka Connector.
3.1. Download
Download the latest release archive from the Releases page.
3.2. Confluent Hub
- Install the Confluent Hub Client
- Install redis-kafka-connect using the Confluent Hub Client
3.3. Manually
Follow the instructions in Manually Installing Community Connectors.
4. Sink Connector
The Sink Connector consumes records from a Kafka topic and writes the data to Redis.
4.1. Class Name
The sink connector class name is com.redis.kafka.connect.RedisSinkConnector.
The corresponding configuration property would be:
connector.class = com.redis.kafka.connect.RedisSinkConnector
4.2. At least once delivery
The Sink Connector guarantees that records from the Kafka topic are delivered at least once.
4.3. Tasks
The Sink Connector supports running one or more tasks.
You can specify the number of tasks with the tasks.max configuration property.
4.4. Redis Client
This section provides information on Redis client configuration.
4.4.1. Redis URI
Specify the Redis URI in the redis.uri property, for example:
redis.uri = redis://redis-12000.redis.com:12000
For complete Redis URI syntax see Redis URI Syntax.
TLS connection URIs start with rediss://.
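For example, a URI for a TLS-enabled database that also requires authentication might look like the following (host, port, and credentials are placeholders); alternatively, credentials can be supplied with the redis.username and redis.password properties:
redis.uri = rediss://myuser:mypassword@redis-12000.redis.com:12000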
4.5. Redis Data Structures
The Sink Connector supports the following Redis data-structure types as targets:
- Collections: stream, list, set, sorted set, time series. Collection keys are generated using the redis.key configuration property, which may contain ${topic} (default) as a placeholder for the originating topic name. For example, with redis.key = ${topic} and topic orders the Redis key is set:orders.
- For other data structures the key is in the form <keyspace>:<record_key>, where keyspace is generated using the redis.key configuration property as above and record_key is the sink record key. For example, with redis.key = ${topic}, topic orders, and sink record key 123 the Redis key is orders:123.
4.5.1. Hash
Use the following properties to write Kafka records as Redis hashes:
redis.command = HSET
key.converter = <string or bytes> (1)
value.converter = <Avro or JSON> (2)
4.5.2. String
Use the following properties to write Kafka records as Redis strings:
redis.command = SET
key.converter = <string or bytes> (1)
value.converter = <string or bytes> (2)
4.5.3. JSON
Use the following properties to write Kafka records as RedisJSON documents:
redis.command = JSONSET
key.converter = <string, bytes, or Avro> (1)
value.converter = <string, bytes, or Avro> (2)
4.5.4. Stream
Use the following properties to store Kafka records as Redis stream messages:
redis.command = XADD
redis.key = <stream key> (1)
value.converter = <Avro or JSON> (2)
1 | Stream key
2 | Avro or JSON
4.5.5. List
Use the following properties to add Kafka record keys to a Redis list:
redis.command = <LPUSH or RPUSH> (1)
redis.key = <key name> (2)
key.converter = <string or bytes> (3)
The Kafka record value can be any format. If a value is null then the member is removed from the list (instead of pushed to the list).
4.5.6. Set
Use the following properties to add Kafka record keys to a Redis set:
redis.command = SADD
redis.key = <key name> (1)
key.converter = <string or bytes> (2)
The Kafka record value can be any format. If a value is null then the member is removed from the set (instead of added to the set).
4.5.7. Sorted Set
Use the following properties to add Kafka record keys to a Redis sorted set:
redis.command = ZADD
redis.key = <key name> (1)
key.converter = <string or bytes> (2)
1 | Sorted set key
2 | String or bytes: Kafka record keys to add to the set
The Kafka record value should be float64
and is used for the score.
If the score is null then the member is removed from the sorted set (instead of added to the sorted set).
4.5.8. Time Series
Use the following properties to write Kafka records as RedisTimeSeries samples:
redis.command = TSADD
redis.key = <key name> (1)
The Kafka record key must be an integer (e.g. int64) as it is used for the sample time in milliseconds.
The Kafka record value must be a number (e.g. float64) as it is used as the sample value.
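To illustrate the mapping, a record with key 1695991200000 and value 42.5 written to a key named sensor:temp (an illustrative key name) results in a sample roughly equivalent to the following Redis command:
TS.ADD sensor:temp 1695991200000 42.5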
4.6. Data Formats
The Sink Connector supports different data formats for record keys and values depending on the target Redis data structure.
4.6.1. Kafka Record Key
The Sink Connector expects Kafka record keys in a specific format depending on the configured target Redis data structure:
Target | Record Key | Assigned To
---|---|---
Stream | Any | None
Hash | String | Key
String | String | Key
List | String | Member
Set | String | Member
Sorted Set | String | Member
JSON | String | Key
TimeSeries | Integer | Sample time in milliseconds
4.6.2. Kafka Record Value
Multiple data formats are supported for Kafka record values depending on the configured target Redis data structure. Each data structure expects a specific format. If your data in Kafka is not in the format expected for a given data structure, consider using Single Message Transforms to convert to a byte array, string, Struct, or map before it is written to Redis.
Target | Record Value | Assigned To
---|---|---
Stream | Avro or JSON | Message body
Hash | Avro or JSON | Fields
String | String or bytes | Value
List | Any | Removal if null
Set | Any | Removal if null
Sorted Set | Number | Score or removal if null
JSON | String, bytes, or Avro | Value
TimeSeries | Number | Sample value
StringConverter
If record values are already serialized as strings, use the StringConverter to store values in Redis as strings:
value.converter = org.apache.kafka.connect.storage.StringConverter
ByteArrayConverter
Use the byte array converter to store the binary serialized form (for example, JSON, Avro, Strings, etc.) of the Kafka record values in Redis as byte arrays:
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
4.7. Configuration
connector.class = com.redis.kafka.connect.RedisSinkConnector
topics = <Kafka topic> (1)
redis.uri = <Redis URI> (2)
redis.command = <HSET|SET|JSONSET|XADD|RPUSH|SADD|ZADD|TSADD> (3)
key.converter = <Key converter> (4)
value.converter = <Value converter> (5)
1 | Kafka topics to read messages from.
2 | Redis URI.
3 | Redis command.
4 | Key converter.
5 | Value converter.
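Filling in the placeholders, a minimal sketch that stores records from a hypothetical orders topic as Redis strings could look like this (topic name and URI are illustrative):
connector.class = com.redis.kafka.connect.RedisSinkConnector
topics = orders
redis.uri = redis://redis-12000.redis.com:12000
redis.command = SET
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter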
5. Stream Source Connector
The Stream Source Connector reads from a Redis stream and publishes messages to a Kafka topic.
5.1. Class Name
The Stream Source Connector class name is com.redis.kafka.connect.RedisStreamSourceConnector.
The corresponding configuration property would be:
connector.class = com.redis.kafka.connect.RedisStreamSourceConnector
5.2. Delivery Guarantees
The Stream Source Connector can be configured to ack stream messages either automatically (at-most-once delivery) or explicitly (at-least-once delivery). The default is at-least-once delivery.
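For example, to acknowledge messages automatically (at-most-once delivery), set the delivery mode property described in the configuration section below:
redis.stream.delivery = at-most-once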
5.3. Tasks
Reading from the stream is done through a consumer group, so that multiple connector tasks configured via the tasks.max property can consume messages in a round-robin fashion.
5.4. Redis Client
This section provides information on Redis client configuration.
5.4.1. Redis URI
Specify the Redis URI in the redis.uri property, for example:
redis.uri = redis://redis-12000.redis.com:12000
For complete Redis URI syntax see Redis URI Syntax.
TLS connection URIs start with rediss://.
5.5. Message Schema
5.5.2. Value Schema
The value schema defines the following fields:
Name | Schema | Description
---|---|---
id | STRING | Stream message ID
stream | STRING | Stream key
body | Map of STRING | Stream message body
5.5.3. Configuration
connector.class = com.redis.kafka.connect.RedisStreamSourceConnector
redis.stream.name = <name> (1)
redis.stream.offset = <offset> (2)
redis.stream.block = <millis> (3)
redis.stream.consumer.group = <group> (4)
redis.stream.consumer.name = <name> (5)
redis.stream.delivery = <mode> (6)
topic = <name> (7)
1 | Name of the stream to read from.
2 | Message ID to start reading from (default: 0-0).
3 | Maximum XREAD wait duration in milliseconds (default: 100).
4 | Name of the stream consumer group (default: kafka-consumer-group).
5 | Name of the stream consumer (default: consumer-${task}). May contain ${task} as a placeholder for the task id. For example, foo${task} and task 123 ⇒ consumer foo123.
6 | Delivery mode: at-least-once or at-most-once (default: at-least-once).
7 | Destination topic (default: ${stream}). May contain ${stream} as a placeholder for the originating stream name. For example, redis_${stream} and stream orders ⇒ topic redis_orders.
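As an end-to-end sketch, the following configuration reads a hypothetical orders stream with two tasks and publishes to a prefixed topic; the stream name, task count, and topic prefix are illustrative assumptions:
connector.class = com.redis.kafka.connect.RedisStreamSourceConnector
tasks.max = 2
redis.uri = redis://redis-12000.redis.com:12000
redis.stream.name = orders
redis.stream.consumer.group = kafka-consumer-group
redis.stream.consumer.name = consumer-${task}
redis.stream.delivery = at-least-once
topic = redis_${stream}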
6. Keys Source Connector
The Keys Source Connector captures changes happening to keys in a Redis database and publishes keys and values to a Kafka topic. The data structure key will be mapped to the record key, and the value will be mapped to the record value.
Make sure the Redis database has keyspace notifications enabled, either with notify-keyspace-events = KEA in redis.conf or via CONFIG SET.
For more details see Redis Keyspace Notifications.
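For example, to enable the required notifications at runtime without editing redis.conf:
redis-cli CONFIG SET notify-keyspace-events KEA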
6.1. Class Name
The Keys Source Connector class name is com.redis.kafka.connect.RedisKeysSourceConnector.
The corresponding configuration property would be:
connector.class = com.redis.kafka.connect.RedisKeysSourceConnector
6.2. Delivery Guarantees
The Keys Source Connector does not guarantee data consistency because it relies on Redis keyspace notifications which have no delivery guarantees. It is possible for some notifications to be missed, for example in case of network failures.
Also, depending on the type, size, and rate of change of data structures on the source, the connector might not be able to keep up with the change stream.
For example, if a big set is repeatedly updated, the connector needs to read the whole set on each update and transfer it to the Kafka topic.
With a big enough set the connector could fall behind and the internal queue could fill up, leading to updates being dropped.
Some preliminary sizing using Redis statistics and bigkeys/memkeys is recommended.
If you need assistance please contact your Redis account team.
6.3. Tasks
The Keys Source Connector should only be configured with one task as keyspace notifications are broadcast to all listeners and cannot be consumed in a round-robin fashion.
6.4. Redis Client
This section provides information on Redis client configuration.
6.4.1. Redis URI
Specify the Redis URI in the redis.uri property, for example:
redis.uri = redis://redis-12000.redis.com:12000
For complete Redis URI syntax see Redis URI Syntax.
TLS connection URIs start with rediss://.
6.5. Configuration
connector.class = com.redis.kafka.connect.RedisKeysSourceConnector
redis.keys.pattern = <glob> (1)
redis.keys.timeout = <millis> (2)
topic = <name> (3)
1 | Key pattern to subscribe to. This is the key portion of the pattern used to listen for keyspace events. For example, foo:* translates to the pub/sub channel __keyspace@0__:foo:* and will capture changes to keys foo:1, foo:2, etc. See Redis KEYS for pattern details.
2 | Idle timeout in milliseconds. Duration after which the connector will stop if no activity is encountered.
3 | Name of the destination topic.
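A minimal sketch that captures changes to keys matching a hypothetical cache:* pattern follows; the pattern, timeout, and topic name are illustrative assumptions:
connector.class = com.redis.kafka.connect.RedisKeysSourceConnector
redis.uri = redis://redis-12000.redis.com:12000
redis.keys.pattern = cache:*
redis.keys.timeout = 1800000
topic = redis-keys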
7. Resources
7.1. Kafka
- What is Apache Kafka?
- Should You Put Several Event Types in the Same Kafka Topic? (https://www.confluent.io/blog/put-several-event-types-kafka-topic/)
- Kafka Quickstart
- Console Producer and Consumer Basics (https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html)
7.2. Kafka Connect
- Introduction to Kafka Connectors
- Kafka Connect Documentation (https://docs.confluent.io/platform/current/connect/index.html)