Redis 8 Streams and Time Series
Redis 8's stream and time series capabilities make Redis a compelling choice for real-time data pipelines. Redis Streams provide an append-only log structure similar to Kafka, with sub-millisecond latency and durability governed by your RDB/AOF persistence settings. Combined with the time series data type (previously the separate RedisTimeSeries module, now bundled with Redis 8), you can ingest, process, and query time-stamped data entirely within Redis.
This guide covers production patterns for building real-time pipelines, from event ingestion with consumer groups to windowed aggregations and alerting. If your use case demands sub-millisecond latency and you already run Redis, streams and time series can remove the need for a separate message broker and time series database.
Redis Streams Fundamentals
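A Redis Stream is an append-only log where each entry has an ID of the form <milliseconds>-<sequence> (generated from the server clock when you pass *) and a set of field-value pairs. Consumer groups let multiple consumers process the stream in parallel: each entry is delivered to one consumer in the group and stays in the group's pending entries list until it is acknowledged with XACK. This gives at-least-once delivery, not exactly-once, so handlers should tolerate seeing the same entry twice.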
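Because an entry ID is just <milliseconds>-<sequence>, a consumer can recover roughly when an entry was appended without a separate timestamp field, and IDs sort correctly once split into two integers. A minimal sketch, assuming redis-py hands back IDs as bytes (its default when decode_responses is off) or str; parse_stream_id is a hypothetical helper, not part of redis-py:

```python
from typing import Union


# Hypothetical helper (not part of redis-py): split "<ms>-<seq>" into two integers.
def parse_stream_id(entry_id: Union[bytes, str]) -> tuple[int, int]:
    if isinstance(entry_id, bytes):
        entry_id = entry_id.decode()
    ms_part, seq_part = entry_id.split("-", 1)
    return int(ms_part), int(seq_part)


first = parse_stream_id(b"1679012345678-0")
second = parse_stream_id("1679012345678-1")
# Tuples compare element by element, matching the stream's ordering.
print(first, second, first < second)  # (1679012345678, 0) (1679012345678, 1) True
```

The millisecond part reflects the server clock at XADD time (or an ID you supplied explicitly), so treat it as ingestion time rather than the moment the sensor took the reading.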
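```
# Add events to a stream; * lets Redis generate the <milliseconds>-<sequence> ID
XADD sensor:readings * device_id sensor-42 temperature 23.5 humidity 68.2
XADD sensor:readings * device_id sensor-43 temperature 24.1 humidity 65.8

# Read the 10 most recent entries, newest first
XREVRANGE sensor:readings + - COUNT 10

# Create a consumer group; $ delivers only entries added after this point
# (use 0 instead to have the group start from the beginning of the stream)
XGROUP CREATE sensor:readings processing-group $ MKSTREAM

# Read as consumer-1 in the group (blocks up to 5000 ms; > = entries never delivered to this group)
XREADGROUP GROUP processing-group consumer-1 COUNT 10 BLOCK 5000 STREAMS sensor:readings >

# Acknowledge a processed entry (use an ID returned by XREADGROUP)
XACK sensor:readings processing-group 1679012345678-0
```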
Building a Consumer Group Pipeline
```python
import asyncio
import json
import os
import socket

import redis.asyncio as redis
from redis.exceptions import ResponseError


class StreamProcessor:
    def __init__(self, redis_url: str, stream: str, group: str, consumer: str):
        self.client = redis.from_url(redis_url)
        self.stream = stream
        self.group = group
        self.consumer = consumer

    async def setup(self):
        # Create the group (and the stream, if missing); ignore "group already exists"
        try:
            await self.client.xgroup_create(self.stream, self.group, id="$", mkstream=True)
        except ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    async def process(self):
        await self.setup()
        while True:
            # Read new messages (">" means entries never delivered to this group)
            messages = await self.client.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream: ">"},
                count=100,
                block=5000,
            )
            if not messages:
                continue  # BLOCK timed out with no new entries

            for _stream_name, entries in messages:
                for entry_id, data in entries:
                    try:
                        await self.handle_event(entry_id, data)
                        await self.client.xack(self.stream, self.group, entry_id)
                    except Exception as e:
                        print(f"Error processing {entry_id}: {e}")
                        # Not acknowledged: the entry stays in the pending entries
                        # list until a consumer reclaims it (XCLAIM / XAUTOCLAIM)

    async def handle_event(self, entry_id: bytes, data: dict):
        # Without decode_responses=True, field names and values are bytes
        device_id = data.get(b"device_id", b"").decode()
        temperature = float(data.get(b"temperature", 0))

        # Store in time series for aggregation; RETENTION and LABELS only
        # take effect if TS.ADD has to create the key
        await self.client.execute_command(
            "TS.ADD", f"ts:temp:{device_id}",
            "*", temperature,
            "RETENTION", 86400000,  # 1 day, in milliseconds
            "LABELS", "device", device_id, "metric", "temperature",
        )

        # Alert on threshold (Pub/Sub is fire-and-forget: offline subscribers miss it)
        if temperature > 35.0:
            await self.client.publish("alerts", json.dumps({
                "device": device_id,
                "temperature": temperature,
                "severity": "high",
            }))


# Run one process per consumer for horizontal scaling;
# consumer names must be unique within the group
async def main():
    processor = StreamProcessor(
        "redis://localhost:6379",
        "sensor:readings",
        "processing-group",
        f"consumer-{socket.gethostname()}-{os.getpid()}",
    )
    await processor.process()


if __name__ == "__main__":
    asyncio.run(main())
```
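The loop above never acknowledges a failed entry, but nothing in it reclaims those entries either: they sit in the group's pending entries list until some consumer claims them. A minimal sketch of a sweeper, assuming redis-py's asyncio xautoclaim and xack methods (XAUTOCLAIM needs Redis 6.2+, and the three-element reply read here is the Redis 7+ shape). reclaim_stale, reclaim_periodically, and the 60-second idle threshold are illustrative choices, not part of the original design; the client is passed in as a parameter so the logic can be exercised with a stub.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[bytes, dict], Awaitable[None]]


# Hypothetical sweeper: claim entries idle for >= min_idle_ms, reprocess, ack.
async def reclaim_stale(client, stream: str, group: str, consumer: str,
                        handler: Handler, min_idle_ms: int = 60_000,
                        count: int = 100) -> int:
    start_id = "0-0"
    acked = 0
    while True:
        # Redis 7+ reply: [next_start_id, [(id, fields), ...], deleted_ids]
        result = await client.xautoclaim(
            stream, group, consumer, min_idle_ms, start_id=start_id, count=count
        )
        next_start, entries = result[0], result[1]
        for entry_id, data in entries:
            try:
                await handler(entry_id, data)
                await client.xack(stream, group, entry_id)
                acked += 1
            except Exception as exc:
                # Still pending (now owned by this consumer); retried on a later sweep.
                print(f"Retry failed for {entry_id}: {exc}")
        # A returned cursor of 0-0 means the whole pending list has been scanned.
        if next_start in (b"0-0", "0-0"):
            return acked
        start_id = next_start


# Hypothetical background task: sweep every interval_s seconds.
async def reclaim_periodically(client, stream: str, group: str, consumer: str,
                               handler: Handler, interval_s: float = 30.0) -> None:
    while True:
        await reclaim_stale(client, stream, group, consumer, handler)
        await asyncio.sleep(interval_s)
```

You could start it next to the main loop, for example with asyncio.create_task(reclaim_periodically(processor.client, processor.stream, processor.group, processor.consumer, processor.handle_event)). An entry that fails every time will be retried on every sweep; a production version would probably check delivery counts (the extended form of XPENDING reports them) and move such entries to a separate dead-letter stream.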
Time Series Aggregations
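```
# Create source time series with 7-day retention (milliseconds) and labels
TS.CREATE ts:temp:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric temperature
TS.CREATE ts:humidity:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric humidity

# Destination keys must exist before TS.CREATERULE; with no RETENTION they use
# the server default (0, meaning no expiry, unless configured otherwise)
TS.CREATE ts:temp:sensor-42:avg_5m
TS.CREATE ts:temp:sensor-42:max_1h

# Create aggregation rules (automatic downsampling); they apply only to samples added afterwards
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:avg_5m AGGREGATION avg 300000
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:max_1h AGGREGATION max 3600000

# Add data points (* = server time)
TS.ADD ts:temp:sensor-42 * 23.5
TS.ADD ts:temp:sensor-42 * 24.1

# Query with aggregation: 1-minute averages for one series
TS.RANGE ts:temp:sensor-42 - + AGGREGATION avg 60000

# 5-minute averages for every series labeled metric=temperature (FILTER comes after AGGREGATION)
TS.MRANGE - + AGGREGATION avg 300000 FILTER metric=temperature
```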
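A compaction rule such as AGGREGATION avg 300000 writes one sample per fixed bucket, stamped with the bucket's start time; by default buckets are aligned to epoch 0, so a 5-minute bucket always starts on a multiple of 300000 ms. The pure-Python model below reproduces that bucketing so you can check expected dashboard values offline. It is an approximation for illustration (downsample is a hypothetical name), not the server's implementation; in particular, Redis only writes a bucket to the destination once a sample arrives in a later bucket, whereas this model emits every bucket immediately.

```python
from collections import defaultdict
from statistics import mean
from typing import Callable, Iterable


# Hypothetical offline model of a compaction rule: fixed buckets, one value per bucket.
def downsample(samples: Iterable[tuple[int, float]], bucket_ms: int,
               aggregator: Callable[[list[float]], float] = mean,
               align_ms: int = 0) -> list[tuple[int, float]]:
    buckets: dict[int, list[float]] = defaultdict(list)
    for ts, value in samples:
        # Bucket start: the largest align_ms + k * bucket_ms that is <= ts.
        start = ts - ((ts - align_ms) % bucket_ms)
        buckets[start].append(value)
    # Each output sample is stamped with its bucket's start time.
    return [(start, aggregator(values)) for start, values in sorted(buckets.items())]


readings = [(0, 20.0), (120_000, 22.0), (299_999, 24.0), (300_000, 30.0)]
print(downsample(readings, 300_000))         # [(0, 22.0), (300000, 30.0)]
print(downsample(readings, 3_600_000, max))  # [(0, 30.0)]
```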
Windowed Queries for Dashboards
```python
import time

import redis.asyncio as redis

# Shared async client (the connection URL is an example)
redis_client = redis.from_url("redis://localhost:6379")


# Real-time dashboard data
async def get_dashboard_data(device_id: str):
    now = int(time.time() * 1000)
    hour_ago = now - 3600000

    # Last hour, 1-minute resolution
    temps = await redis_client.execute_command(
        "TS.RANGE", f"ts:temp:{device_id}",
        hour_ago, now,
        "AGGREGATION", "avg", 60000,
    )

    # Multi-device comparison, 5-minute resolution (FILTER comes after AGGREGATION)
    all_temps = await redis_client.execute_command(
        "TS.MRANGE", hour_ago, now,
        "AGGREGATION", "avg", 300000,
        "FILTER", "metric=temperature",
    )

    return {"device": temps, "all_devices": all_temps}
```
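Because execute_command skips redis-py's reply parsing, both queries return raw nested lists. Under the default RESP2 protocol, TS.RANGE yields [[timestamp, value], ...] and TS.MRANGE yields one [key, labels, samples] triple per matching series, with sample values encoded as strings. A minimal sketch for turning those replies into JSON-friendly data, assuming RESP2 (RESP3 returns maps instead); parse_range and parse_mrange are hypothetical helpers:

```python
from typing import Any


def _text(value: Any) -> str:
    # Raw replies carry bytes unless decode_responses=True; pass str through unchanged.
    return value.decode() if isinstance(value, bytes) else str(value)


# Hypothetical helper: TS.RANGE reply [[ts, value], ...] -> [{"t": ts, "v": float}, ...]
def parse_range(reply: list) -> list[dict]:
    return [{"t": int(ts), "v": float(_text(value))} for ts, value in reply]


# Hypothetical helper: TS.MRANGE reply [[key, labels, samples], ...] -> {key: points}
def parse_mrange(reply: list) -> dict[str, list[dict]]:
    # Labels are empty unless WITHLABELS/SELECTED_LABELS is requested; ignored here.
    return {_text(key): parse_range(samples) for key, _labels, samples in reply}


raw_mrange = [
    [b"ts:temp:sensor-42", [], [[1700000000000, b"23.5"], [1700000300000, b"24.1"]]],
    [b"ts:temp:sensor-43", [], [[1700000000000, b"22.0"]]],
]
print(parse_mrange(raw_mrange))
```

In get_dashboard_data, returning {"device": parse_range(temps), "all_devices": parse_mrange(all_temps)} would hand the dashboard plain lists of time/value points instead of raw protocol structures.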
When NOT to Use Redis Streams
Redis Streams have limitations compared to Kafka. Avoid them when you need multi-datacenter replication with ordering guarantees, message retention beyond RAM capacity, schema evolution and exactly-once processing across systems, or very high sustained throughput (as a rough guide, above about 500K messages per second, since each stream is a single key and therefore lives on a single shard). Redis Streams fit low-latency pipelines at moderate volume, while Kafka is built for very large-scale event streaming.
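Since a stream lives entirely in memory, a Redis-based pipeline usually caps how much raw history it keeps. A minimal sketch of time-based trimming, assuming redis-py's xadd with its minid and approximate parameters, which translate to XADD ... MINID ~ <id> (Redis 6.2+): every append also drops entries older than a retention window. RETENTION_MS, min_id_for_retention, and add_reading are hypothetical names, and the one-day window is only an example.

```python
import time
from typing import Optional

# Hypothetical retention window: keep roughly one day of raw events.
RETENTION_MS = 24 * 60 * 60 * 1000


# Hypothetical helper: smallest ID to keep; anything older than now - retention is trimmable.
def min_id_for_retention(retention_ms: int, now_ms: Optional[int] = None) -> str:
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return f"{max(now_ms - retention_ms, 0)}-0"


# Hypothetical producer: append an event and approximately trim old entries in one call.
async def add_reading(client, stream: str, fields: dict, retention_ms: int = RETENTION_MS):
    return await client.xadd(
        stream,
        fields,
        minid=min_id_for_retention(retention_ms),
        approximate=True,  # sent as XADD ... MINID ~ <id>
    )
```

The ~ makes trimming approximate: Redis removes only whole internal nodes, so a few entries slightly older than the cutoff may linger, in exchange for cheaper writes. Trimmed entries disappear even if a consumer group has not acknowledged them yet, so size the window well above your worst-case processing lag. Pass maxlen instead of minid if you would rather bound the entry count than the age.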
Key Takeaways
Redis 8 streams combined with time series provide a complete real-time data pipeline within a single system. Consumer groups enable horizontal scaling. Automatic aggregation rules handle downsampling. For use cases requiring sub-millisecond latency at moderate volume, Redis eliminates the complexity of running separate message brokers and time series databases.