1. Introduction
The Redis Kafka Connector imports and exports data between Apache Kafka and Redis.
This guide provides documentation and usage information across the following topics:
2. Install
Select one of the methods below to install Redis Kafka Connector.
2.1. Download
Download the latest release archive from the releases page.
2.2. Confluent Hub
- Install the Confluent Hub Client
- Install redis-kafka-connect using the Confluent Hub Client
2.3. Manually
Follow the instructions in Manually Installing Community Connectors.
3. Connect to Redis
This section provides information on configuring the Redis Kafka Source or Sink Connector.
Specify the Redis URI in the redis.uri property, for example:
redis.uri=redis://redis-12000.redis.com:12000
For Redis URI syntax see Redis URI Syntax.
TLS connection URIs start with rediss://.
To disable certificate verification for TLS connections use the following property:
redis.insecure=true
Username and password can be specified in the URI or separately with the following properties:
redis.username=user1
redis.password=pass
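Putting these together, a connection to a TLS-enabled database with separate credentials might look like this; the host, port, and credentials below are placeholders:

```properties
# Placeholder host, port, and credentials; substitute your own
redis.uri=rediss://redis-12000.redis.com:12000
redis.username=user1
redis.password=pass
```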
4. Sink Connector Guide
The sink connector consumes records from a Kafka topic and writes the data to Redis. It includes the following features:
4.1. At least once delivery
The sink connector guarantees that records from the Kafka topic are delivered at least once.
4.2. Multiple tasks
The sink connector supports running one or more tasks.
You can specify the number of tasks with the tasks.max configuration property.
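For example, to let the connector run up to three tasks:

```properties
tasks.max=3
```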
4.3. Redis Data Structures
The sink connector supports the following Redis data-structure types as targets:
- Collections: stream, list, set, sorted set, time series

  Collection keys are generated using the redis.key configuration property, which may contain ${topic} (default) as a placeholder for the originating topic name. For example, with redis.key = ${topic} and topic orders, the Redis key is orders.

- For other data structures the key is in the form <keyspace>:<record_key>, where keyspace is generated using the redis.key configuration property as above and record_key is the sink record key. For example, with redis.key = ${topic}, topic orders, and sink record key 123, the Redis key is orders:123.
4.3.1. Hash
Use the following properties to write Kafka records as Redis hashes:
redis.command=HSET
key.converter=<string or bytes> (1)
value.converter=<Avro or JSON> (2)
1 | String or bytes: Kafka record key used as the Redis key |
2 | Avro or JSON: Kafka record value fields written as the hash fields |
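As a sketch, a complete hash sink configuration could look like the following; the connector class name, topic, and Redis URI are illustrative assumptions:

```properties
# Illustrative sink configuration; adjust class name, topic, and URI for your deployment
connector.class=com.redis.kafka.connect.RedisSinkConnector
topics=orders
redis.uri=redis://localhost:6379
redis.command=HSET
redis.key=${topic}
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

With these settings, each record on the orders topic becomes a hash at key orders:<record_key> whose fields are the record value's fields.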
4.3.2. String
Use the following properties to write Kafka records as Redis strings:
redis.command=SET
key.converter=<string or bytes> (1)
value.converter=<string or bytes> (2)
1 | String or bytes: Kafka record key used as the Redis key |
2 | String or bytes: Kafka record value used as the Redis string value |
4.3.3. JSON
Use the following properties to write Kafka records as RedisJSON documents:
redis.command=JSONSET
key.converter=<string, bytes, or Avro> (1)
value.converter=<string, bytes, or Avro> (2)
1 | String, bytes, or Avro: Kafka record key used as the Redis key |
2 | String, bytes, or Avro: Kafka record value written as the JSON document |
4.3.4. Stream
Use the following properties to store Kafka records as Redis stream messages:
redis.command=XADD
redis.key=<stream key> (1)
value.converter=<Avro or JSON> (2)
1 | Stream key |
2 | Avro or JSON: Kafka record value used as the stream message body |
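For example, a sketch that appends record values as JSON messages to a stream named events; the key name and converter choice are illustrative:

```properties
redis.command=XADD
redis.key=events
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```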
4.3.5. List
Use the following properties to add Kafka record keys to a Redis list:
redis.command=<LPUSH or RPUSH> (1)
redis.key=<key name> (2)
key.converter=<string or bytes> (3)
1 | LPUSH or RPUSH: push direction |
2 | List key |
3 | String or bytes: Kafka record keys to push to the list |
The Kafka record value can be any format. If a value is null then the member is removed from the list (instead of pushed to the list).
4.3.6. Set
Use the following properties to add Kafka record keys to a Redis set:
redis.command=SADD
redis.key=<key name> (1)
key.converter=<string or bytes> (2)
1 | Set key |
2 | String or bytes: Kafka record keys to add to the set |
The Kafka record value can be any format. If a value is null then the member is removed from the set (instead of added to the set).
4.3.7. Sorted Set
Use the following properties to add Kafka record keys to a Redis sorted set:
redis.command=ZADD
redis.key=<key name> (1)
key.converter=<string or bytes> (2)
1 | Sorted set key |
2 | String or bytes: Kafka record keys to add to the set |
The Kafka record value should be a float64 and is used for the score.
If the score is null then the member is removed from the sorted set (instead of added to the sorted set).
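One way to wire this up, assuming string record keys for members and float64 record values for scores, using Kafka Connect's built-in primitive converters; the key name is illustrative:

```properties
redis.command=ZADD
redis.key=leaderboard
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.DoubleConverter
```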
4.3.8. Time Series
Use the following properties to write Kafka records as RedisTimeSeries samples:
redis.command=TSADD
redis.key=<key name> (1)
1 | Time series key |
The Kafka record key must be an integer (e.g. int64) as it is used for the sample time in milliseconds. The Kafka record value must be a number (e.g. float64) as it is used as the sample value.
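A sketch using Kafka Connect's built-in primitive converters for the timestamp key and numeric value; the key name is illustrative:

```properties
redis.command=TSADD
redis.key=sensor:temperature
key.converter=org.apache.kafka.connect.converters.LongConverter
value.converter=org.apache.kafka.connect.converters.DoubleConverter
```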
4.4. Data Formats
The sink connector supports different data formats for record keys and values depending on the target Redis data structure.
4.4.1. Kafka Record Keys
The sink connector expects Kafka record keys in a specific format depending on the configured target Redis data structure:
Target | Record Key | Assigned To |
---|---|---|
Stream | Any | None |
Hash | String | Key |
String | String or bytes | Key |
List | String or bytes | Member |
Set | String or bytes | Member |
Sorted Set | String or bytes | Member |
JSON | String or bytes | Key |
TimeSeries | Integer | Sample time in milliseconds |
4.4.2. Kafka Record Values
Multiple data formats are supported for Kafka record values depending on the configured target Redis data structure. Each data structure expects a specific format. If your data in Kafka is not in the format expected for a given data structure, consider using Single Message Transformations to convert to a byte array, string, Struct, or map before it is written to Redis.
Target | Record Value | Assigned To |
---|---|---|
Stream | Avro or JSON | Message body |
Hash | Avro or JSON | Fields |
String | String or bytes | Value |
List | Any | Removal if null |
Set | Any | Removal if null |
Sorted Set | Number | Score or removal if null |
JSON | String, bytes, or Avro | Value |
TimeSeries | Number | Sample value |
StringConverter
If record values are already serialized as strings, use the StringConverter to store values in Redis as strings:
value.converter=org.apache.kafka.connect.storage.StringConverter
ByteArrayConverter
Use the ByteArrayConverter to store the binary serialized form of Kafka record values (for example JSON or Avro payloads) in Redis as byte arrays:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
5. Source Connector Guide
The Redis Kafka Connector includes two source connectors: stream and keys.
5.1. Stream
The stream source connector reads from a Redis stream and publishes messages to a Kafka topic. It includes the following features:
5.1.1. Delivery Guarantees
The stream source connector can be configured to ack stream messages either automatically (at-most-once delivery) or explicitly (at-least-once delivery). The default is at-least-once delivery.
5.1.2. Multiple Tasks
Reading from the stream is done through a consumer group, so that multiple connector tasks, configured via the tasks.max property, can consume messages in a round-robin fashion.
5.1.4. Configuration
connector.class=com.redis.kafka.connect.RedisStreamSourceConnector
redis.stream.name=<name> (1)
redis.stream.offset=<offset> (2)
redis.stream.block=<millis> (3)
redis.stream.consumer.group=<group> (4)
redis.stream.consumer.name=<name> (5)
redis.stream.delivery=<mode> (6)
topic=<name> (7)
1 | Name of the stream to read from. |
2 | Message ID to start reading from (default: 0-0 ). |
3 | Maximum XREAD wait duration in milliseconds (default: 100 ). |
4 | Name of the stream consumer group (default: kafka-consumer-group ). |
5 | Name of the stream consumer (default: consumer-${task} ).
May contain ${task} as a placeholder for the task id.
For example, foo${task} and task 123 ⇒ consumer foo123 . |
6 | Delivery mode: at-least-once , at-most-once (default: at-least-once ). |
7 | Destination topic (default: ${stream} ).
May contain ${stream} as a placeholder for the originating stream name.
For example, redis_${stream} and stream orders ⇒ topic redis_orders . |
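Putting the options above together, a minimal stream source sketch might look like this; the Redis URI and stream name are illustrative:

```properties
connector.class=com.redis.kafka.connect.RedisStreamSourceConnector
redis.uri=redis://localhost:6379
redis.stream.name=orders
redis.stream.offset=0-0
redis.stream.delivery=at-least-once
topic=redis_${stream}
```

With these settings, messages from the orders stream are published to the redis_orders topic.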
5.2. Keys
The keys source connector captures changes happening to keys in a Redis database and publishes keys and values to a Kafka topic. The data structure key will be mapped to the record key, and the value will be mapped to the record value.
Make sure the Redis database has keyspace notifications enabled, using notify-keyspace-events = KA in redis.conf or via CONFIG SET.
For more details see Redis Keyspace Notifications.
The keys source connector does not guarantee data consistency because it relies on Redis keyspace notifications, which have no delivery guarantees: it is possible for some notifications to be missed, for example in case of network failures.
Also, depending on the type, size, and rate of change of data structures on the source, the connector may not be able to keep up with the change stream. For example, if a large set is repeatedly updated, the connector must read the whole set on each update and transfer it to the target database. With a large enough set, the connector could fall behind, and the internal queue could fill up, leading to updates being dropped.
Some preliminary sizing using Redis statistics and bigkeys/memkeys is recommended.
5.2.1. Configuration
connector.class=com.redis.kafka.connect.RedisKeysSourceConnector
redis.keys.pattern=<glob> (1)
redis.keys.timeout=<millis> (2)
topic=<name> (3)
1 | Key pattern to subscribe to.
This is the key portion of the pattern that will be used to listen to keyspace events.
For example foo:* translates to pubsub channel __keyspace@0__:foo:* and will capture changes to keys foo:1 , foo:2 , etc.
See Redis KEYS for pattern details. |
2 | Idle timeout in milliseconds. Duration after which the connector stops if no activity is encountered. |
3 | Name of the destination topic. |
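As an illustration, a keys source sketch that captures changes to keys matching foo:*; the Redis URI and topic name are placeholders:

```properties
connector.class=com.redis.kafka.connect.RedisKeysSourceConnector
redis.uri=redis://localhost:6379
redis.keys.pattern=foo:*
topic=redis-keys
```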
6. Quick Start with Docker
This section provides a hands-on look at the functionality of the Redis Kafka Source and Sink Connectors:
- The redis-sink connector reads data from a Kafka topic and writes it to a Redis stream
- The redis-source connector reads data from a Redis stream and writes it to a Kafka topic
6.2. Run the example
Clone the GitHub repository and execute run.sh in the docker directory:
git clone https://github.com/redis-field-engineering/redis-kafka-connect.git
cd redis-kafka-connect/docker
./run.sh
This will:
- Run docker-compose up
- Wait for Redis, Kafka, and Kafka Connect to be ready
- Register the Confluent Datagen Connector
- Register the Redis Kafka Sink Connector
- Register the Redis Kafka Source Connector
- Publish some events to Kafka via the Datagen connector
- Write the events to Redis
- Send messages to a Redis stream
- Write the Redis stream messages back into Kafka
Once running, examine the topics in the Kafka control center: http://localhost:9021/
- The pageviews topic should contain the 10 simple documents added, each similar to:
{
"viewtime": {
"$numberLong": "81"
},
"pageid": "Page_1",
"userid": "User_8"
}
- The pageviews stream should contain the 10 change events.
Examine the stream in Redis:
docker-compose exec redis /usr/local/bin/redis-cli
xread COUNT 10 STREAMS pageviews 0
Messages added to the mystream stream will show up in the mystream topic.
7. Resources
7.1. Kafka
- What is Apache Kafka?
- [Should You Put Several Event Types in the Same Kafka Topic?](https://www.confluent.io/blog/put-several-event-types-kafka-topic/)
- Kafka Quickstart
- [Console Producer and Consumer Basics](https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html)
7.2. Kafka Connect
- Introduction to Kafka Connectors
- [Kafka Connect Documentation](https://docs.confluent.io/platform/current/connect/index.html)