Redis 8 Streams and Time Series: Building Real-Time Data Pipelines

Streams and time series make Redis 8 a compelling choice for real-time data pipelines. Redis Streams provide an append-only log structure similar to Kafka but with sub-millisecond latency, and the log is as durable as your Redis persistence configuration. Combined with the time series data type (a separately installed module before Redis 8), you can ingest, process, and query time-stamped data entirely within Redis.

This guide covers production patterns for building real-time pipelines, from event ingestion with consumer groups to windowed aggregations and alerting. If your use case demands sub-millisecond latency and you are already running Redis, streams and time series can remove the need for a separate message broker and time series database.

Redis Streams Fundamentals

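A Redis Stream is an append-only log in which each entry has an ID and a set of field-value pairs. When you pass * as the ID, Redis generates one from the server's millisecond timestamp plus a sequence number. Consumer groups let multiple consumers process the stream in parallel: each entry is delivered to one consumer in the group and stays in the group's pending list until it is acknowledged with XACK. This gives at-least-once delivery, not exactly-once, so handlers should be idempotent.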

# Add events to a stream
XADD sensor:readings * device_id sensor-42 temperature 23.5 humidity 68.2
XADD sensor:readings * device_id sensor-43 temperature 24.1 humidity 65.8

# Read the 10 oldest entries (use XREVRANGE sensor:readings + - COUNT 10 for the newest)
XRANGE sensor:readings - + COUNT 10

# Create a consumer group
XGROUP CREATE sensor:readings processing-group $ MKSTREAM

# Read as consumer in group (blocks until data available)
XREADGROUP GROUP processing-group consumer-1 COUNT 10 BLOCK 5000 STREAMS sensor:readings >

# Acknowledge processed messages
XACK sensor:readings processing-group 1679012345678-0

Figure: Redis Streams: durable, ordered event log with consumer group processing
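
Because an auto-generated ID has the form milliseconds-sequence, the time an entry was appended can be recovered from the ID alone. The helper below is a minimal sketch of that idea; the names StreamId, parse_stream_id, and id_to_datetime are illustrative and not part of any Redis client.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class StreamId(NamedTuple):
    ms: int   # Unix time in milliseconds assigned by the server
    seq: int  # sequence number for entries added in the same millisecond


def parse_stream_id(entry_id) -> StreamId:
    """Split an ID such as '1679012345678-0' into its two integer parts."""
    if isinstance(entry_id, bytes):  # clients return bytes unless decoding is enabled
        entry_id = entry_id.decode()
    ms, _, seq = entry_id.partition("-")
    return StreamId(int(ms), int(seq or 0))


def id_to_datetime(entry_id) -> datetime:
    """Convert the millisecond part of an ID to an aware UTC datetime."""
    ms = parse_stream_id(entry_id).ms
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
```

Comparing parsed IDs as tuples orders entries correctly, whereas comparing the raw strings does not ("5-10" sorts before "5-3" as text).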

Building a Consumer Group Pipeline

import asyncio
import json
import os

import redis.asyncio as redis

class StreamProcessor:
    def __init__(self, redis_url: str, stream: str, group: str, consumer: str):
        self.client = redis.from_url(redis_url)
        self.stream = stream
        self.group = group
        self.consumer = consumer

    async def setup(self):
        try:
            await self.client.xgroup_create(self.stream, self.group, id="$", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    async def process(self):
        await self.setup()
        while True:
            # Read new messages (> means undelivered only)
            messages = await self.client.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream: ">"},
                count=100,
                block=5000,
            )

            if not messages:
                continue

            for stream_name, entries in messages:
                for entry_id, data in entries:
                    try:
                        await self.handle_event(entry_id, data)
                        await self.client.xack(self.stream, self.group, entry_id)
                    except Exception as e:
                        print(f"Error processing {entry_id}: {e}")
                        # Not acknowledged: the entry stays in the group's pending
                        # list until a consumer claims it again (XAUTOCLAIM or XCLAIM)

    async def handle_event(self, entry_id: str, data: dict):
        device_id = data.get(b"device_id", b"").decode()
        temperature = float(data.get(b"temperature", 0))

        # Store in time series for aggregation. RETENTION and LABELS
        # take effect only when TS.ADD creates the key.
        await self.client.execute_command(
            "TS.ADD", f"ts:temp:{device_id}",
            "*", temperature,
            "RETENTION", 86400000,
            "LABELS", "device", device_id, "metric", "temperature"
        )

        # Alert on threshold
        if temperature > 35.0:
            await self.client.publish("alerts", json.dumps({
                "device": device_id,
                "temperature": temperature,
                "severity": "high"
            }))

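# Run one process per consumer for horizontal scaling;
# the PID gives each consumer a unique name within the group
async def main():
    processor = StreamProcessor(
        "redis://localhost:6379",
        "sensor:readings",
        "processing-group",
        f"consumer-{os.getpid()}"
    )
    await processor.process()

if __name__ == "__main__":
    asyncio.run(main())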
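
The processor above leaves failed entries in the pending list but never picks them up again. One way to close that gap is a periodic pass with XAUTOCLAIM (available since Redis 6.2), sketched below. The function name reclaim_stale, the 60-second idle threshold, and the batch size are assumptions for illustration; the client is assumed to be a redis.asyncio.Redis instance, and the handler is assumed to be idempotent because an entry may be processed more than once.

```python
from typing import Any, Awaitable, Callable

Handler = Callable[[Any, dict], Awaitable[None]]


async def reclaim_stale(client, stream: str, group: str, consumer: str,
                        handler: Handler, min_idle_ms: int = 60_000,
                        batch: int = 100) -> int:
    """Claim entries pending longer than min_idle_ms and process them again.

    `client` is assumed to be a redis.asyncio.Redis instance (or any object
    exposing compatible xautoclaim/xack coroutines).
    """
    cursor, reclaimed = "0-0", 0
    while True:
        reply = await client.xautoclaim(
            stream, group, consumer,
            min_idle_time=min_idle_ms, start_id=cursor, count=batch,
        )
        cursor, entries = reply[0], reply[1]
        for entry_id, data in entries:
            if data is None:  # entry was deleted from the stream; nothing to do
                continue
            try:
                await handler(entry_id, data)
            except Exception as exc:
                print(f"Retry failed for {entry_id}: {exc}")
                continue  # still pending, now owned by this consumer
            await client.xack(stream, group, entry_id)
            reclaimed += 1
        if cursor in ("0-0", b"0-0"):  # a zero cursor means the scan is complete
            return reclaimed
```

A production version would likely also cap retries, for example by checking the delivery count that XPENDING reports, so that a message that always fails does not cycle forever.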

Time Series Aggregations

# Create time series with retention and labels
TS.CREATE ts:temp:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric temperature
TS.CREATE ts:humidity:sensor-42 RETENTION 604800000 LABELS device sensor-42 metric humidity

# Add data points
TS.ADD ts:temp:sensor-42 * 23.5
TS.ADD ts:temp:sensor-42 * 24.1

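# Create destination series, then aggregation rules (automatic downsampling).
# TS.CREATERULE requires the destination key to exist.
TS.CREATE ts:temp:sensor-42:avg_5m
TS.CREATE ts:temp:sensor-42:max_1h
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:avg_5m AGGREGATION avg 300000
TS.CREATERULE ts:temp:sensor-42 ts:temp:sensor-42:max_1h AGGREGATION max 3600000

# Query with aggregation: 1-minute averages for one device
TS.RANGE ts:temp:sensor-42 - + AGGREGATION avg 60000

# 5-minute averages across all devices (FILTER comes after AGGREGATION)
TS.MRANGE - + AGGREGATION avg 300000 FILTER metric=temperature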

Figure: Real-time aggregations across multiple time series with Redis
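
To make the bucket semantics concrete, the following sketch reproduces in plain Python what an avg aggregation with a fixed bucket duration computes. It assumes the default behavior of TS.RANGE: buckets aligned to timestamp 0, each bucket reported by its start time, and empty buckets omitted. The function bucket_avg and the sample readings are illustrative only.

```python
from collections import defaultdict
from typing import Iterable


def bucket_avg(samples: Iterable[tuple[int, float]],
               bucket_ms: int) -> list[tuple[int, float]]:
    """Average (timestamp_ms, value) samples in fixed windows aligned to 0."""
    buckets: dict[int, list[float]] = defaultdict(list)
    for ts, value in samples:
        start = ts - ts % bucket_ms  # start of the window containing ts
        buckets[start].append(value)
    # Only windows that received at least one sample appear in the result
    return [(start, sum(vals) / len(vals))
            for start, vals in sorted(buckets.items())]


readings = [(0, 20.0), (30_000, 22.0), (60_000, 25.0),
            (119_999, 27.0), (180_000, 30.0)]
print(bucket_avg(readings, 60_000))
# [(0, 21.0), (60000, 26.0), (180000, 30.0)]
```

The window starting at 120000 is missing from the output because no sample fell into it, which is why dashboards often need to fill gaps on the client side.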

Windowed Queries for Dashboards

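import time

import redis.asyncio as redis

# Assumes a local Redis instance; adjust the URL for your deployment
redis_client = redis.from_url("redis://localhost:6379")

# Real-time dashboard data
async def get_dashboard_data(device_id: str):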
    now = int(time.time() * 1000)
    hour_ago = now - 3600000

    # Last hour, 1-minute resolution
    temps = await redis_client.execute_command(
        "TS.RANGE", f"ts:temp:{device_id}",
        hour_ago, now,
        "AGGREGATION", "avg", 60000
    )

    # Multi-device comparison
    all_temps = await redis_client.execute_command(
        "TS.MRANGE", hour_ago, now,
        "AGGREGATION", "avg", 300000,
        "FILTER", "metric=temperature"
    )

    return {"device": temps, "all_devices": all_temps}
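
The raw replies from execute_command are nested lists, which are awkward to hand to a dashboard directly. The helpers below are a sketch of one way to normalize them, assuming the RESP2 reply shapes: TS.RANGE returns a list of [timestamp, value] pairs, and TS.MRANGE returns a list of [key, labels, samples] triples. The names parse_range and parse_mrange are hypothetical, and a RESP3 connection would return a different structure for TS.MRANGE.

```python
def _text(value) -> str:
    """Decode bytes returned by the client; pass other values through str()."""
    return value.decode() if isinstance(value, bytes) else str(value)


def parse_range(reply) -> list[tuple[int, float]]:
    """Turn [[ts, b'23.5'], ...] into [(ts, 23.5), ...]."""
    return [(int(ts), float(val)) for ts, val in reply]


def parse_mrange(reply) -> dict[str, list[tuple[int, float]]]:
    """Map each series key to its parsed samples (labels are ignored here)."""
    return {_text(key): parse_range(samples) for key, _labels, samples in reply}
```

With these in place, get_dashboard_data could return parse_range(temps) and parse_mrange(all_temps) so the result serializes cleanly to JSON.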

When NOT to Use Redis Streams

Redis Streams have real limitations compared to Kafka. Avoid them when you need multi-datacenter replication with ordering guarantees, message retention beyond RAM capacity, schema evolution with exactly-once processing across systems, or sustained throughput above roughly 500K messages per second. Redis Streams fit low-latency pipelines at moderate volume; Kafka is the better choice for very large-scale event streaming.
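
Because a stream lives in memory, pipelines that stay on Redis usually bound its length at write time. The sketch below assumes a redis.asyncio client; the function name add_capped and the default cap of one million entries are illustrative choices, not recommendations from this guide.

```python
async def add_capped(client, stream: str, fields: dict,
                     max_entries: int = 1_000_000):
    """Append an entry and let Redis trim the stream to roughly max_entries.

    `client` is assumed to be a redis.asyncio.Redis instance. approximate=True
    maps to `MAXLEN ~`, which trims whole internal nodes and is cheaper than
    an exact cap, so the stream may temporarily exceed max_entries.
    """
    return await client.xadd(stream, fields, maxlen=max_entries,
                             approximate=True)
```

Trimming discards the oldest entries whether or not they have been acknowledged, so the cap should be sized against the worst consumer lag you are willing to tolerate.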

Figure: Monitoring Redis stream lag, consumer health, and time series ingestion rates

Key Takeaways

Redis 8 streams combined with time series provide a complete real-time data pipeline within a single system. Consumer groups enable horizontal scaling. Automatic aggregation rules handle downsampling. For use cases requiring sub-millisecond latency at moderate volume, Redis eliminates the complexity of running separate message brokers and time series databases.
