Redis 8 Streams and Time Series
Redis 8's streams and time series capabilities make Redis a compelling choice for real-time data pipelines. Redis Streams provide a durable, append-only log structure similar to Kafka but with sub-millisecond latency. Combined with the TimeSeries module, you can ingest, process, and query time-stamped data entirely within Redis.
This guide covers production patterns for building real-time pipelines — from event ingestion with consumer groups to windowed aggregations and alerting. If your use case demands sub-millisecond latency and you are already running Redis, streams and time series eliminate the need for separate message brokers and TSDB systems.
Redis Streams Fundamentals
A Redis Stream is an append-only log where each entry has an auto-generated ID (timestamp-based) and a set of field-value pairs. Consumer groups enable multiple consumers to process the stream in parallel with at-least-once delivery: each message is handed to exactly one consumer in the group and must be acknowledged with XACK, otherwise it stays pending and can be redelivered.
# Add events to a stream
XADD sensor:readings * device_id sensor-42 temperature 23.5 humidity 68.2
XADD sensor:readings * device_id sensor-43 temperature 24.1 humidity 65.8
# Read the 10 most recent entries (newest first)
XREVRANGE sensor:readings + - COUNT 10
# Create a consumer group
XGROUP CREATE sensor:readings processing-group $ MKSTREAM
# Read as consumer in group (blocks until data available)
XREADGROUP GROUP processing-group consumer-1 COUNT 10 BLOCK 5000 STREAMS sensor:readings >
# Acknowledge processed messages
XACK sensor:readings processing-group 1679012345678-0
Building a Consumer Group Pipeline
import asyncio
import json
import os

import redis.asyncio as redis

class StreamProcessor:
    def __init__(self, redis_url: str, stream: str, group: str, consumer: str):
        self.client = redis.from_url(redis_url)
        self.stream = stream
        self.group = group
        self.consumer = consumer

    async def setup(self):
        try:
            await self.client.xgroup_create(self.stream, self.group, id="$", mkstream=True)
        except redis.ResponseError as e:
            # BUSYGROUP means the group already exists, which is fine
            if "BUSYGROUP" not in str(e):
                raise

    async def process(self):
        await self.setup()
        while True:
            # Read new messages (">" means undelivered only)
            messages = await self.client.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream: ">"},
                count=100,
                block=5000,
            )
            if not messages:
                continue
            for stream_name, entries in messages:
                for entry_id, data in entries:
                    try:
                        await self.handle_event(entry_id, data)
                        await self.client.xack(self.stream, self.group, entry_id)
                    except Exception as e:
                        print(f"Error processing {entry_id}: {e}")
                        # Message stays pending, will be reclaimed

    async def handle_event(self, entry_id: str, data: dict):
        device_id = data.get(b"device_id", b"").decode()
        temperature = float(data.get(b"temperature", 0))
        # Store in time series for aggregation
        await self.client.execute_command(
            "TS.ADD", f"ts:temp:{device_id}",
            "*", temperature,
            "RETENTION", 86400000,
            "LABELS", "device", device_id, "metric", "temperature",
        )
        # Alert on threshold
        if temperature > 35.0:
            await self.client.publish("alerts", json.dumps({
                "device": device_id,
                "temperature": temperature,
                "severity": "high",
            }))

# Run multiple consumers for horizontal scaling
async def main():
    processor = StreamProcessor(
        "redis://localhost:6379",
        "sensor:readings",
        "processing-group",
        f"consumer-{os.getpid()}",
    )
    await processor.process()

if __name__ == "__main__":
    asyncio.run(main())
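When a handler fails, the entry stays in the group's pending entries list (PEL). A companion task can take over entries that have been idle too long with XAUTOCLAIM and retry them, routing entries that keep failing to a dead-letter stream. The sketch below assumes the async client from above; `classify_pending` is a hypothetical pure helper over XPENDING-style rows of `(entry_id, consumer, idle_ms, delivery_count)`, and the thresholds are illustrative, not prescriptive.

```python
# Sketch: reclaiming stale pending entries (assumed helper names).

def classify_pending(rows, max_idle_ms=60_000, max_deliveries=5):
    """Split XPENDING-style rows (id, consumer, idle_ms, delivery_count)
    into entries worth retrying and entries to dead-letter."""
    retry, dead = [], []
    for entry_id, consumer, idle_ms, deliveries in rows:
        if idle_ms < max_idle_ms:
            continue  # probably still being worked on; leave it alone
        (dead if deliveries >= max_deliveries else retry).append(entry_id)
    return retry, dead

async def reclaim_once(client, stream, group, consumer, handler):
    # On Redis 7+, XAUTOCLAIM returns (next cursor, claimed entries,
    # IDs deleted from the stream); it transfers ownership of entries
    # idle longer than min_idle_time to this consumer.
    cursor, claimed, _deleted = await client.xautoclaim(
        stream, group, consumer, min_idle_time=60_000, start_id="0-0"
    )
    for entry_id, data in claimed:
        await handler(entry_id, data)          # retry the original work
        await client.xack(stream, group, entry_id)
```

Running a reclaimer on a timer alongside the consumers prevents a crashed consumer from stranding its in-flight messages forever.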
Time Series Aggregations
# Create time series with retention and labels
TS.CREATE ts:temp:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric temperature
TS.CREATE ts:humidity:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric humidity
# Add data points
TS.ADD ts:temp:sensor-42 * 23.5
TS.ADD ts:temp:sensor-42 * 24.1
# Create aggregation rules (automatic downsampling)
# Destination keys must exist before TS.CREATERULE is called
TS.CREATE ts:temp:sensor-42:avg_5m
TS.CREATE ts:temp:sensor-42:max_1h
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:avg_5m AGGREGATION avg 300000
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:max_1h AGGREGATION max 3600000
# Query with aggregation
TS.RANGE ts:temp:sensor-42 - + AGGREGATION avg 60000 # 1-minute averages
TS.MRANGE - + AGGREGATION avg 300000 FILTER metric=temperature  # All devices (FILTER must come last)
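It helps to know how these aggregations bucket samples: by default, RedisTimeSeries aligns buckets to timestamp 0, so a sample at time t falls in the bucket starting at t - (t % bucketDuration). The following is a small pure-Python model of avg aggregation — illustrative only, not part of any Redis API — useful for reasoning about what TS.RANGE will return:

```python
# Illustrative model of TS.RANGE's default avg aggregation:
# buckets are aligned to epoch 0, each bucket_ms wide.

def bucketed_avg(samples, bucket_ms):
    """samples: list of (timestamp_ms, value); returns
    {bucket_start_ms: average of the values in that bucket}."""
    sums, counts = {}, {}
    for ts, value in samples:
        start = ts - (ts % bucket_ms)  # bucket the sample falls into
        sums[start] = sums.get(start, 0.0) + value
        counts[start] = counts.get(start, 0) + 1
    return {start: sums[start] / counts[start] for start in sums}

# Samples at 10s and 50s share the 0-60s bucket; 70s starts a new one:
# bucketed_avg([(10_000, 20.0), (50_000, 30.0), (70_000, 40.0)], 60_000)
# → {0: 25.0, 60000: 40.0}
```

The ALIGN option of TS.RANGE can shift this anchoring if epoch-aligned buckets don't match your reporting windows.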
Windowed Queries for Dashboards
import time

# Real-time dashboard data
# (redis_client is an async client, e.g. redis.asyncio.from_url(...))
async def get_dashboard_data(device_id: str):
    now = int(time.time() * 1000)
    hour_ago = now - 3600000
    # Last hour, 1-minute resolution
    temps = await redis_client.execute_command(
        "TS.RANGE", f"ts:temp:{device_id}",
        hour_ago, now,
        "AGGREGATION", "avg", 60000,
    )
    # Multi-device comparison, 5-minute resolution
    all_temps = await redis_client.execute_command(
        "TS.MRANGE", hour_ago, now,
        "AGGREGATION", "avg", 300000,
        "FILTER", "metric=temperature",
    )
    return {"device": temps, "all_devices": all_temps}
When NOT to Use Redis Streams
Redis Streams have real limitations compared to Kafka. Avoid them when you need multi-datacenter replication with ordering guarantees, message retention beyond RAM capacity, schema evolution and exactly-once processing across systems, or sustained throughput above roughly 500K messages per second on a single node. Redis Streams are ideal for low-latency pipelines at moderate volume; Kafka remains the better fit for planetary-scale event streaming.
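The RAM constraint in particular is worth engineering around: cap stream length at ingest time rather than letting it grow unbounded. A minimal sketch, assuming the async client from earlier; `maxlen_for_budget` is a hypothetical helper that derives an approximate MAXLEN from a memory budget and a measured per-entry size:

```python
def maxlen_for_budget(budget_bytes, avg_entry_bytes):
    """Approximate stream length that fits within budget_bytes,
    given a measured average entry size (hypothetical helper)."""
    return max(1, budget_bytes // avg_entry_bytes)

async def add_reading(client, fields, maxlen):
    # approximate=True is the "~" form of MAXLEN: Redis trims at
    # internal macro-node boundaries, which is far cheaper than
    # trimming to an exact length on every XADD.
    await client.xadd("sensor:readings", fields, maxlen=maxlen, approximate=True)
```

With, say, a 1 GB budget and ~500-byte entries that caps the stream around 2 million entries; measure your own entry sizes rather than trusting a back-of-envelope figure.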
Key Takeaways
Redis Streams combined with the TimeSeries module provide a complete real-time data pipeline within a single system. Consumer groups enable horizontal scaling. Automatic aggregation rules handle downsampling. For use cases requiring sub-millisecond latency at moderate volume, Redis eliminates the complexity of running separate message brokers and time series databases.