Redis is often thought of as a “data structure server”, providing a network interface to a few simple data structure primitives. Streams are the first major new general-purpose data structure since Redis introduced sorted sets many years ago. Let’s take a look at one of the major uses for this new structure: modeling time series data.
Redis Streams represent an append-only timeseries of key-value pairs.
Any number of clients can write to a Stream, and each time they do, they receive a unique ascending ID for the item that was inserted into the timeseries.
Clients reading data can block while listening for new data coming in, can maintain a “bookmark” of the last message read for batch processing, or can be organized into more complex “consumer groups” for sharing workloads and acknowledging messages.
Streams are a large topic, so this post will just look briefly at how to use them to model timeseries data in Redis. The new data structure makes this dramatically simpler than previous approaches like using the List or Sorted Set types to model timeseries.
For the code examples here we’ll use redis-rb:
require "redis"
redis = Redis.new
Streams are append-only, so usually the only details you need are the name of the Redis key that you want to write to and your set of key-value pairs.
I’m going to record measurements from my air quality sensor, sending a stream of key-value pairs to the site:pdx key. Specifically, I’m sending the current Air Quality Index and the temperature in Celsius:
id = redis.xadd("site:pdx", "*",
                "aqi", 37,
                "tempc", 5.1)
=> "1527974818120-0"
I send an XADD command whenever there is a new measurement to be recorded.
The response is a unique, always-ascending ID that can be used in queries.
The first part of the ID, 1527974818120, is the millisecond timestamp assigned by the Redis server. The second part is an incrementing sequence number that avoids collisions when multiple clients write at the same time.
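If you ever want to turn an ID back into wall-clock time on the client, the two halves are easy to pull apart. A quick sketch in plain Ruby (no client calls involved):

id = "1527974818120-0"
ms, seq = id.split("-").map(&:to_i)

Time.at(ms / 1000.0).utc  # the moment the Redis server recorded the entry
seq                       # 0 for the first entry written in that millisecond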
Supplying a * as the second argument in the write command, as shown above, lets Redis know to store the data with its own timestamp.
It’s also possible to provide your own timestamp (as an explicit ID) when writing data, but this is usually not preferred: letting Redis choose the timestamp allows many clients to write data to a single Stream simultaneously without needing to coordinate about ID selection and ordering; the Redis server just handles those details for the clients.
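For completeness, here’s roughly what an explicit ID looks like. This sketch assumes the keyword-argument form used by recent versions of redis-rb (4.1+), where the entry is a Hash and the ID is a keyword:

# The explicit ID must be greater than the newest ID already in the stream,
# or Redis rejects the write.
redis.xadd("site:pdx", { "aqi" => 37, "tempc" => 5.1 }, id: "1527974818120-0")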
Once a few values are in the stream you can retrieve a range of values by supplying a range of IDs or timestamps. This could be used by an app displaying the most recent readings of my air quality metrics on a graph:
resp = redis.xrange("site:pdx",
                    "1527974818120-0",
                    "+",
                    "COUNT", 5)

# resp now holds 5 readings, ready to pass along to the graph:
=> [["1543947167906-0", ["aqi", "31", "tempc", "5.1"]],
    ["1543947168312-0", ["aqi", "31", "tempc", "5.3"]],
    ["1543947168901-0", ["aqi", "31", "tempc", "5.4"]],
    ["1543947170033-0", ["aqi", "31", "tempc", "5.4"]],
    ["1543947171460-0", ["aqi", "31", "tempc", "5.6"]]]
Ranges can be sampled from anywhere in the Stream, allowing a graphing system to query historical data without a performance penalty.
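The range bounds don’t have to be full IDs, either: bare millisecond timestamps work, with Redis filling in the sequence number (0 on the start side, the maximum on the end side). As a sketch, pulling everything recorded during one particular second (which would catch the two 1543947168xxx readings shown above):

# Redis expands "1543947168000" to "1543947168000-0" for the start bound and
# gives "1543947168999" the highest possible sequence for the end bound.
redis.xrange("site:pdx", "1543947168000", "1543947168999")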
Querying ranges of data is useful for graphs and historical monitoring, but sometimes you want to build a system that can respond immediately to data as it comes in. Redis Streams are a great fit for this also, using XREAD:
resp = redis.xread("BLOCK", 10000,
                   "STREAMS", "site:pdx", "$")

# close the windows if aqi > 50
A blocking operation like this will wait until data comes in or until the timeout (10,000 ms here) expires. To maintain a continual poll, you block as in the example above until data is available, then simply call that same XREAD command again each time data is received or the command times out.
To ensure no data is missed between reads of your stream, you can supply the last ID you read when issuing the next read, and that way you will pick up exactly where you left off.
In the case above I just wanted whatever data came in on the stream, so I used the special token ‘$’ to indicate “new data only”; the command looks only slightly different with a supplied ID:
redis.xread("BLOCK", 10000,
"STREAMS", "site:pdx", "1543947171460-0");
You can also read across many different streams at once, returning values as soon as any of them receives new data:
redis.xread("BLOCK", 10000,
"STREAMS",
"site:pdx", "site:global",
"1543947171460-0", "$");
In this example the command will return when any data more recent than ID 1543947171460-0 is written on site:pdx, or when any new data is written on site:global.
Once you have a lot of data flowing through a stream, you may want multiple consumers processing the inbound messages. These consumers should pick up a message, take an action on it, and then “acknowledge” that the work has been done. Redis Streams provide primitives for these operations as well; there’s too much to get into here, but see the documentation for details.
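Just to give a flavor of the shape of that flow, here’s a rough sketch using redis-rb’s consumer-group methods; the group name “processors” and consumer name “worker-1” are made up for illustration:

# Create a consumer group that starts reading from new data ("$");
# this raises an error if the group already exists.
redis.xgroup(:create, "site:pdx", "processors", "$")

# Each worker asks for messages no other member of the group has seen (">"),
# processes them, and acknowledges them so they leave the pending list:
entries = redis.xreadgroup("processors", "worker-1", "site:pdx", ">", block: 10_000)

(entries || {}).fetch("site:pdx", []).each do |id, fields|
  # ...do something with fields...
  redis.xack("site:pdx", "processors", id)
end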
There are a lot of new Redis commands available for streams! Here’s a quick reference; you can find more detail in the official documentation.
Simple commands: XADD, XRANGE, XREVRANGE, XLEN, XREAD, XTRIM, XDEL, XINFO
Consumer group commands: XGROUP, XREADGROUP, XACK, XPENDING, XCLAIM
The most commonly-cited use case for Streams is IoT workloads, where sensors put data on a stream for consumers to use in many ways (analysis, archival to cold storage, display on graphs). Capped-size streams are a great fit here, allowing you to allocate a fixed-size stream with a predictable memory footprint.
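Capping is just an option on the write (or an explicit trim). A sketch assuming redis-rb’s keyword-argument form, where the approximate flag corresponds to the ~ modifier that lets Redis trim in efficient batches:

# Keep roughly the most recent 10,000 entries while writing:
redis.xadd("site:pdx", { "aqi" => 37, "tempc" => 5.1 },
           maxlen: 10_000, approximate: true)

# Or trim an existing stream down to about 10,000 entries:
redis.xtrim("site:pdx", 10_000, approximate: true)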
Streams are also suitable for use in applications that previously used other Redis data structures. Queueing applications like Celery and Sidekiq could take advantage of Streams’ consumer groups to provide inspection of read receipts in a Redis-native way. There are lots of blog posts demonstrating simple chat apps using Redis pubsub, which could be made more robust with Redis Streams, since pubsub does not retain messages after they are published to clients.
You can try Redis streams on a new Memetria server with just a few clicks.