Redis 8 Streams and Time Series: Building Real-Time Data Pipelines

Redis 8's Streams and TimeSeries capabilities make it a compelling choice for real-time data pipelines. Redis Streams provide a durable, append-only log structure similar to Kafka, but with sub-millisecond latency. Combined with the TimeSeries module, you can ingest, process, and query time-stamped data entirely within Redis.

This guide covers production patterns for building real-time pipelines — from event ingestion with consumer groups to windowed aggregations and alerting. If your use case demands sub-millisecond latency and you are already running Redis, streams and time series eliminate the need for separate message brokers and TSDB systems.

Redis Streams Fundamentals

A Redis Stream is an append-only log where each entry has an auto-generated, timestamp-based ID and a set of field-value pairs. Consumer groups let multiple consumers process the stream in parallel with at-least-once delivery: each entry is handed to exactly one consumer in the group and stays pending until acknowledged, so failed entries can be redelivered.

# Add events to a stream
XADD sensor:readings * device_id sensor-42 temperature 23.5 humidity 68.2
XADD sensor:readings * device_id sensor-43 temperature 24.1 humidity 65.8

# Read latest entries
XRANGE sensor:readings - + COUNT 10

# Create a consumer group
XGROUP CREATE sensor:readings processing-group $ MKSTREAM

# Read as consumer in group (blocks until data available)
XREADGROUP GROUP processing-group consumer-1 COUNT 10 BLOCK 5000 STREAMS sensor:readings >

# Acknowledge processed messages
XACK sensor:readings processing-group 1679012345678-0
[Figure: Redis Streams, a durable, ordered event log with consumer group processing]

Building a Consumer Group Pipeline

import asyncio
import json
import os

import redis.asyncio as redis

class StreamProcessor:
    def __init__(self, redis_url: str, stream: str, group: str, consumer: str):
        self.client = redis.from_url(redis_url)
        self.stream = stream
        self.group = group
        self.consumer = consumer

    async def setup(self):
        try:
            await self.client.xgroup_create(self.stream, self.group, id="$", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    async def process(self):
        await self.setup()
        while True:
            # Read new messages (> means undelivered only)
            messages = await self.client.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream: ">"},
                count=100,
                block=5000,
            )

            if not messages:
                continue

            for stream_name, entries in messages:
                for entry_id, data in entries:
                    try:
                        await self.handle_event(entry_id, data)
                        await self.client.xack(self.stream, self.group, entry_id)
                    except Exception as e:
                        print(f"Error processing {entry_id}: {e}")
                        # Message stays pending, will be reclaimed

    async def handle_event(self, entry_id: str, data: dict):
        device_id = data.get(b"device_id", b"").decode()
        temperature = float(data.get(b"temperature", 0))

        # Store in time series for aggregation
        await self.client.execute_command(
            "TS.ADD", f"ts:temp:{device_id}",
            "*", temperature,
            "RETENTION", 86400000,
            "LABELS", "device", device_id, "metric", "temperature"
        )

        # Alert on threshold
        if temperature > 35.0:
            await self.client.publish("alerts", json.dumps({
                "device": device_id,
                "temperature": temperature,
                "severity": "high"
            }))

# Run multiple consumers for horizontal scaling
async def main():
    processor = StreamProcessor(
        "redis://localhost:6379",
        "sensor:readings",
        "processing-group",
        f"consumer-{os.getpid()}"
    )
    await processor.process()

if __name__ == "__main__":
    asyncio.run(main())

Time Series Aggregations

# Create time series with retention and labels
TS.CREATE ts:temp:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric temperature
TS.CREATE ts:humidity:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric humidity

# Add data points
TS.ADD ts:temp:sensor-42 * 23.5
TS.ADD ts:temp:sensor-42 * 24.1

# Create aggregation rules (automatic downsampling).
# The destination key must exist before TS.CREATERULE.
TS.CREATE ts:temp:sensor-42:avg_5m RETENTION 604800000
TS.CREATE ts:temp:sensor-42:max_1h RETENTION 604800000
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:avg_5m AGGREGATION avg 300000
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:max_1h AGGREGATION max 3600000

# Query with aggregation
TS.RANGE ts:temp:sensor-42 - + AGGREGATION avg 60000    # 1-minute averages
TS.MRANGE - + AGGREGATION avg 300000 FILTER metric=temperature  # All devices, 5-minute averages
[Figure: real-time aggregations across multiple time series with Redis]

Windowed Queries for Dashboards

# Real-time dashboard data (assumes an initialized redis.asyncio client
# named redis_client and `import time` at module level)
async def get_dashboard_data(device_id: str):
    now = int(time.time() * 1000)
    hour_ago = now - 3600000  # one hour in milliseconds

    # Last hour, 1-minute resolution
    temps = await redis_client.execute_command(
        "TS.RANGE", f"ts:temp:{device_id}",
        hour_ago, now,
        "AGGREGATION", "avg", 60000
    )

    # Multi-device comparison
    all_temps = await redis_client.execute_command(
        "TS.MRANGE", hour_ago, now,
        "AGGREGATION", "avg", 300000,
        "FILTER", "metric=temperature"
    )

    return {"device": temps, "all_devices": all_temps}

When NOT to Use Redis Streams

Redis Streams have real limitations compared to Kafka. Avoid them when you need multi-datacenter replication with ordering guarantees, message retention beyond what fits in RAM, schema evolution with exactly-once processing across systems, or sustained throughput above roughly 500K messages per second. Redis Streams are ideal for low-latency pipelines at moderate volume; Kafka remains the better fit for very large-scale event streaming.

[Figure: monitoring Redis stream lag, consumer health, and time series ingestion rates]

Key Takeaways

Redis 8 Streams combined with TimeSeries provide a complete real-time data pipeline within a single system: consumer groups enable horizontal scaling, and automatic aggregation rules handle downsampling. For use cases requiring sub-millisecond latency at moderate volume, Redis eliminates the complexity of running separate message brokers and time series databases.
