Stream Processing — Four Ways to Consume a Data Stream¶
"Batch processing is answering questions about the past; stream processing is answering questions about the present." (Jay Kreps)
In this notebook we'll build and observe four different streaming consumers, progressing from the simplest (polling a file) to cloud-managed queues (SQS). Each approach solves the same problem, reading a continuous flow of taxi-trip events, but makes different trade-offs in latency, reliability, and complexity.
| # | Technique | Key Concept | Real-world Analogy |
|---|---|---|---|
| 1 | Polling | Pull-based; consumer asks "anything new?" | Refreshing your email inbox |
| 2 | Unix Pipes & Sockets | OS-level push; kernel notifies consumer | tail -f on a log file |
| 3 | Redis Pub/Sub & RabbitMQ | Broker-mediated messaging | Post office / bulletin board |
| 4 | Amazon SQS | Managed cloud queue | Cloud-native microservices |
Prerequisites¶
Python packages¶
# Install packages in user-space to avoid needing root access
pip install --user redis pika boto3
External services (via Docker)¶
# Note: If 'docker' requires sudo, you can try running these as:
# 1. Rootless Docker (if installed: dockerd-rootless-setuptool.sh install)
# 2. Check if your sysadmin has provided these as shared services on specific IPs/ports.
# If Docker is available without sudo, these commands work as-is because
# the ports (6379, 5672, 4566) are > 1024 and don't require root to bind.
# Redis
docker run -d --name redis-demo -p 6379:6379 redis:7
# RabbitMQ (with management UI at http://localhost:15672, guest/guest)
docker run -d --name rabbitmq-demo -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# LocalStack (AWS SQS emulator)
docker run -d --name localstack -p 4566:4566 localstack/localstack
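Before running the demos it can help to confirm that the three containers are actually accepting connections. The following is a minimal sketch, assuming the services are published on localhost with the ports used in the docker commands above; the `SERVICES` mapping and the `port_open` helper are illustrative names, not part of the notebook's helper script.

```python
import socket

# Ports taken from the docker commands above; the keys are labels only.
SERVICES = {"redis": 6379, "rabbitmq": 5672, "localstack": 4566}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections and timeouts
        return False


if __name__ == "__main__":
    for name, port in SERVICES.items():
        status = "up" if port_open("localhost", port) else "DOWN"
        print(f"{name:<10} localhost:{port}  {status}")
```

A "DOWN" line only says that nothing is listening on that port; it does not tell you whether the container exists, so `docker ps` is still the place to look next.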
Helper script¶
All demos use resources/event_generator.py — a shared producer that generates fake taxi-trip events and sends them to various sinks.
Polling¶
What is Polling?¶
The consumer periodically checks a data source for new records. Analogy: Checking your physical mailbox every 5 minutes.
Architecture¶
┌──────────┐      writes      ┌──────────────────┐
│ Producer │ ───────────────▶ │ File / Database  │
└──────────┘                  └────────┬─────────┘
                                       │ reads every N sec
                                       ▼
                              ┌──────────────────┐
                              │     Consumer     │
                              │  (Polling Loop)  │
                              └──────────────────┘
Key Parameters:
- Polling interval — how often do we check?
- Cursor / offset — how do we know what's "new"?
import subprocess, os
# Path of the shared event file
event_file = os.path.expanduser("~/stream_events.jsonl")
# Clean slate: truncate any events left over from a previous run
open(event_file, "w").close()
# Start producer in background: writes 30 events, 1 per second
producer = subprocess.Popen(
    ["python", "resources/event_generator.py", "file",
     "--path", event_file,
     "--interval", "1", "--count", "30"],
)
print(f"✅ Producer started (PID={producer.pid}), writing events to {event_file}")
✅ Producer started (PID=18464), writing events to /home/ankush/stream_events.jsonl
import json, time
def polling_consumer(filepath, poll_interval=2, max_polls=10):
    """
    Reads new lines from a file by tracking the byte offset.
    This is the simplest possible streaming consumer.
    """
    offset = 0
    events_seen = 0
    for poll_num in range(1, max_polls + 1):
        with open(filepath, "r") as f:
            f.seek(offset)
            new_lines = f.readlines()
            offset = f.tell()  # remember where we left off
        if new_lines:
            for line in new_lines:
                event = json.loads(line)
                events_seen += 1
                print(f" [Poll #{poll_num}] Event {events_seen}: "
                      f"{event['pickup_zone']} | ${event['fare_estimate']}")
        else:
            print(f" [Poll #{poll_num}] No new events.")
        time.sleep(poll_interval)  # ⏳ wait before next poll
    print(f"\n✅ Polling complete. Total events consumed: {events_seen}")
import os
polling_consumer(os.path.expanduser("~/stream_events.jsonl"))
[Poll #1] Event 1: Bronx | $55.34
[Poll #1] Event 2: Manhattan | $50.61
[Poll #1] Event 3: Staten Island | $41.54
[Poll #1] Event 4: Bronx | $84.35
[Poll #2] Event 5: Staten Island | $57.05
[Poll #2] Event 6: Staten Island | $64.77
[Poll #3] Event 7: Queens | $60.0
[Poll #3] Event 8: Bronx | $71.39
[Poll #4] Event 9: Bronx | $11.2
[Poll #4] Event 10: Bronx | $75.76
[Poll #5] Event 11: Brooklyn | $8.88
[Poll #5] Event 12: Staten Island | $11.94
[Poll #6] Event 13: Bronx | $20.11
[Poll #6] Event 14: Queens | $57.12
[Poll #7] Event 15: Brooklyn | $46.93
[Poll #7] Event 16: Brooklyn | $50.58
[Poll #8] Event 17: Brooklyn | $47.92
[Poll #8] Event 18: Brooklyn | $67.23
[Poll #9] Event 19: Bronx | $8.19
[Poll #9] Event 20: Bronx | $41.9
[Poll #10] Event 21: Brooklyn | $20.62
[Poll #10] Event 22: Queens | $41.71
✅ Polling complete. Total events consumed: 22
# Clean up the producer process
producer.terminate()
producer.wait()
print("🧹 Producer stopped.")
🧹 Producer stopped.
Observations¶
| Aspect | Observation |
|---|---|
| Latency | Up to poll_interval seconds of delay |
| CPU usage | Wasted cycles when there's nothing new |
| Ordering | Guaranteed (sequential file reads) |
| Fault tolerance | Offset can be persisted for recovery |
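The fault-tolerance row can be made concrete with a small checkpoint file. This is a sketch under stated assumptions: the checkpoint format (a one-key JSON object) and the helper names `load_offset` and `save_offset` are hypothetical and do not appear in the notebook.

```python
import json
import os
import tempfile


def load_offset(checkpoint_path: str) -> int:
    """Return the last saved byte offset, or 0 if there is no usable checkpoint."""
    try:
        with open(checkpoint_path, "r") as f:
            return int(json.load(f)["offset"])
    except (FileNotFoundError, ValueError, KeyError, TypeError):
        return 0


def save_offset(checkpoint_path: str, offset: int) -> None:
    """Write the offset atomically: write a temp file, then rename it into place."""
    directory = os.path.dirname(os.path.abspath(checkpoint_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"offset": offset}, f)
    os.replace(tmp_path, checkpoint_path)  # atomic rename on the same filesystem
```

A polling loop would start from `load_offset(path)` instead of 0 and call `save_offset(path, offset)` after each batch has been processed. Saving after processing means a crash can cause the last batch to be read again, which is usually the safer failure mode.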
Limitations¶
- Trade-off: Short interval = more CPU; long interval = higher latency.
- No push notification — consumer is blind between polls.
- Scalability: Multiple consumers need coordination (locks, partitions).
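One common way to soften the interval trade-off is to adapt the interval to the traffic: poll quickly while data is flowing and back off while the source is idle. The sketch below is one possible policy, not something the notebook implements; `next_interval` and its default bounds are illustrative.

```python
def next_interval(current: float, got_events: bool,
                  min_interval: float = 0.5, max_interval: float = 8.0) -> float:
    """Reset to the minimum after a busy poll; double (up to a cap) after an idle one."""
    if got_events:
        return min_interval
    return min(current * 2, max_interval)


# Simulate a run of polls: True means the poll found new events.
interval = 0.5
history = []
for got_events in [True, False, False, False, False, False, True]:
    interval = next_interval(interval, got_events)
    history.append(interval)
print(history)  # [0.5, 1.0, 2.0, 4.0, 8.0, 8.0, 0.5]
```

The cap bounds the worst-case latency after a quiet period, so `max_interval` plays the same role that `poll_interval` plays in the fixed-interval consumer.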
When to Use¶
Polling is appropriate when:
- The data source doesn't support push (e.g., REST APIs, legacy databases)
- Latency requirements are relaxed (seconds to minutes)
- Simplicity is more important than efficiency
Unix Pipes & Sockets¶
Watching with the OS¶
Instead of asking "is there anything new?", we let the operating system tell us when data arrives. This is event-driven / push-based at the OS level. We'll explore two sub-approaches:
| Mechanism | How It Works |
|---|---|
| tail -f + pipe | OS watches file inode for appends |
| TCP Socket | Kernel delivers bytes as they arrive |
Architecture (Socket)¶
┌──────────┐    TCP stream     ┌──────────────────┐
│ Producer │ ════════════════▶ │ Socket Consumer  │
└──────────┘    push-based     └──────────────────┘
              (kernel-managed)
Demo A: tail -f via subprocess¶
The tail -f command follows a file and outputs new lines as they are appended.
We pipe its stdout into Python — the OS handles the "waiting" for us.
On Linux, GNU tail (with either -f or -F) uses inotify, the kernel's filesystem event system, when it is available.
What is inotify?¶
A Linux API that lets programs subscribe to filesystem events.
Instead of polling, the program is notified by the kernel when something changes.
How tail -F works¶
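tail -F follows the file by name and retries if it disappears, whereas tail -f keeps following the file it originally opened:
- Watch file + parent directory
- If file is modified → read new lines
- If file is deleted/moved:
  a. Stop reading old inode
  b. Wait for new file with same name
- When new file appears:
  a. Open new inode
  b. Continue tailing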
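The follow-by-name steps above can be sketched in plain Python. Note the swap in technique: this version detects rotation by comparing inode numbers on each call, so it is a polling approximation and not how GNU tail or inotify work internally. The class name `NameFollower` and its method are hypothetical.

```python
import os


class NameFollower:
    """Follow a path by name, reopening it when the inode behind the name changes."""

    def __init__(self, path: str):
        self.path = path
        self._file = None
        self._inode = None

    def _rotated(self) -> bool:
        """True if the name now points at a different inode than the open file."""
        try:
            return os.stat(self.path).st_ino != self._inode
        except FileNotFoundError:
            return False  # name is gone for now; keep waiting

    def read_new_lines(self) -> list[str]:
        """Return lines appended since the last call (empty list if none)."""
        lines = []
        if self._file is not None:
            lines += self._file.readlines()  # drain what is left in the old inode
        if self._rotated():
            if self._file is not None:
                self._file.close()
            self._file = open(self.path, "r")
            self._inode = os.fstat(self._file.fileno()).st_ino
            lines += self._file.readlines()  # start reading the new inode
        return [line.rstrip("\n") for line in lines]
```

Calling `read_new_lines()` in a loop with a short sleep gives tail -F-like behavior across log rotation. The sketch does not guard against a line that is only partially written at the moment of the read.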
import subprocess, json, time, os
# Ensure fresh file
event_file = os.path.expanduser("~/stream_events.jsonl")
# Clean slate
open(event_file, "w").close()
# Start producer: writes 15 events at 1/sec
print(" ".join(["python", "resources/event_generator.py", "file", "--path", event_file, "--interval", "1", "--count", "15"]))
producer = subprocess.Popen(
    ["python", "resources/event_generator.py", "file",
     "--path", event_file,
     "--interval", "1", "--count", "15"],
)
# Consumer: tail -f piped into our Python process
# tail -f ~/stream_events.jsonl
tail_proc = subprocess.Popen(
    ["tail", "-f", event_file],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
events_seen = 0
try:
    # Iterating over stdout blocks until tail emits the next line
    for line in tail_proc.stdout:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        events_seen += 1
        print(f" [tail -f] Event {events_seen}: "
              f"{event['pickup_zone']} | ${event['fare_estimate']}")
        if events_seen >= 15:
            break
finally:
    tail_proc.terminate()
    tail_proc.wait()  # reap the tail process so it doesn't linger as a zombie
producer.wait()
print(f"\n✅ tail -f complete. Events consumed: {events_seen}")
python resources/event_generator.py file --path /home/ankush/stream_events.jsonl --interval 1 --count 15
[tail -f] Event 1: Queens | $7.98
[tail -f] Event 2: Bronx | $65.09
[tail -f] Event 3: Manhattan | $26.89
[tail -f] Event 4: Bronx | $61.79
[tail -f] Event 5: Bronx | $75.19
[tail -f] Event 6: Manhattan | $28.66
[tail -f] Event 7: Staten Island | $17.11
[tail -f] Event 8: Staten Island | $38.72
[tail -f] Event 9: Manhattan | $58.34
[tail -f] Event 10: Staten Island | $10.85
[tail -f] Event 11: Manhattan | $75.51
[tail -f] Event 12: Staten Island | $48.22
[tail -f] Event 13: Bronx | $72.31
[tail -f] Event 14: Queens | $8.9
[tail -f] Event 15: Manhattan | $44.52
🚀 Starting event generator: sink=file, interval=1.0s, count=15
[file] Wrote event 1/15
[file] Wrote event 2/15
[file] Wrote event 3/15
[file] Wrote event 4/15
[file] Wrote event 5/15
[file] Wrote event 6/15
[file] Wrote event 7/15
[file] Wrote event 8/15
[file] Wrote event 9/15
[file] Wrote event 10/15
[file] Wrote event 11/15
[file] Wrote event 12/15
[file] Wrote event 13/15
[file] Wrote event 14/15
[file] Wrote event 15/15
✅ Event generator finished.
✅ tail -f complete. Events consumed: 15
Demo B: TCP Socket Consumer¶
A socket server listens for incoming connections. The producer connects and pushes events directly — no file intermediary, no polling interval.
import socket, threading, json, time, subprocess
def start_socket_server(host="localhost", port=9999, max_events=15):
    """
    Listens on a TCP socket and prints events as they arrive.
    The OS kernel handles buffering and notification.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    server.settimeout(30)  # don't hang forever
    print(f"🔌 Socket server listening on {host}:{port}")
    conn, addr = server.accept()
    print(f" Connected by {addr}")
    buffer = ""
    events_seen = 0
    try:
        while events_seen < max_events:
            data = conn.recv(4096).decode()
            if not data:
                break
            buffer += data
            # Process complete lines (newline-delimited JSON)
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                if line.strip():
                    event = json.loads(line)
                    events_seen += 1
                    print(f" [Socket] Event {events_seen}: "
                          f"{event['pickup_zone']} | ${event['fare_estimate']}")
    finally:
        conn.close()
        server.close()
    print(f"\n✅ Socket consumer done. Events: {events_seen}")
# Run server in a thread so the notebook doesn't block
server_thread = threading.Thread(target=start_socket_server)
server_thread.start()
time.sleep(1)  # give server a moment to bind
# Start producer targeting the socket
producer = subprocess.Popen(
    ["python", "resources/event_generator.py", "socket",
     "--host", "localhost", "--port", "9999",
     "--interval", "1", "--count", "15"],
)
server_thread.join(timeout=30)
producer.wait()
🔌 Socket server listening on localhost:9999
Connected by ('127.0.0.1', 39942)
[Socket] Event 1: Bronx | $22.17
[Socket] Event 2: Bronx | $65.79
[Socket] Event 3: Staten Island | $13.46
[Socket] Event 4: Queens | $22.91
[Socket] Event 5: Staten Island | $70.76
[Socket] Event 6: Queens | $68.05
[Socket] Event 7: Queens | $68.5
[Socket] Event 8: Queens | $23.96
[Socket] Event 9: Brooklyn | $24.25
[Socket] Event 10: Queens | $17.9
[Socket] Event 11: Queens | $79.38
[Socket] Event 12: Manhattan | $25.9
[Socket] Event 13: Queens | $11.23
[Socket] Event 14: Brooklyn | $16.69
[Socket] Event 15: Queens | $45.79
✅ Socket consumer done. Events: 15
🚀 Starting event generator: sink=socket, interval=1.0s, count=15
[socket] Sent event 1/15
[socket] Sent event 2/15
[socket] Sent event 3/15
[socket] Sent event 4/15
[socket] Sent event 5/15
[socket] Sent event 6/15
[socket] Sent event 7/15
[socket] Sent event 8/15
[socket] Sent event 9/15
[socket] Sent event 10/15
[socket] Sent event 11/15
[socket] Sent event 12/15
[socket] Sent event 13/15
[socket] Sent event 14/15
[socket] Sent event 15/15
✅ Event generator finished.
0
Observations¶
| Aspect | tail -f / Pipe | TCP Socket |
|---|---|---|
| Latency | Near-zero (kernel inotify) | Near-zero |
| Push-based? | ✅ Yes | ✅ Yes |
| Ordering | ✅ Single file = ordered | ✅ Single connection = ordered |
| Multi-consumer | ❌ Tricky | Possible (one conn per consumer) |
| Cross-machine | ❌ Local only | ✅ Works over network |
Limitations¶
- No persistence: If the consumer is down, messages are lost (sockets) or pile up (file).
- No replay: Can't re-read old messages easily.
- Tight coupling: Producer and consumer must agree on protocol.
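The "protocol" both sides have to agree on in the socket demo is newline-delimited JSON. A minimal sketch of that framing, working on bytes so that a chunk boundary in the middle of a line is handled, might look like the following. `encode_event` and `LineDecoder` are illustrative names; the actual producer code in resources/event_generator.py is not shown in this notebook and may differ.

```python
import json


def encode_event(event: dict) -> bytes:
    """Serialize one event as a single UTF-8 JSON line."""
    return (json.dumps(event) + "\n").encode("utf-8")


class LineDecoder:
    """Accumulate raw bytes and return one parsed event per complete line."""

    def __init__(self):
        self._buffer = b""

    def feed(self, chunk: bytes) -> list[dict]:
        self._buffer += chunk
        # Everything before the last newline is complete; the tail stays buffered.
        *complete, self._buffer = self._buffer.split(b"\n")
        return [json.loads(line) for line in complete if line.strip()]


decoder = LineDecoder()
wire = encode_event({"pickup_zone": "Queens", "fare_estimate": 7.98})
print(decoder.feed(wire[:10]))   # [] because the line is not complete yet
print(decoder.feed(wire[10:]))   # [{'pickup_zone': 'Queens', 'fare_estimate': 7.98}]
```

If either side changed the delimiter or the encoding, the other side would break, which is the coupling the limitation refers to.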
Key Takeaway¶
OS-level mechanisms give us low latency, but nothing stores messages for a consumer that is offline and nothing lets us replay them.
This is where message brokers come in…
Message Brokers: Redis & RabbitMQ¶
Why a Broker?¶
A message broker sits between producers and consumers, providing:
- Decoupling — producer doesn't need to know who consumes
- Buffering — messages are held if consumers are slow
- Fan-out — one message can reach many consumers
We'll demo two popular brokers with very different philosophies:
| Broker | Model | Durability | Best For |
|---|---|---|---|
| Redis Pub/Sub | Fire-and-forget broadcast | ❌ Messages lost if no subscriber | Real-time notifications, caches |
| RabbitMQ | Durable queue with acks | ✅ Messages persist until consumed | Task queues, reliable delivery |
Architecture¶
                         ┌───────────┐
                    ┌──▶ │Consumer A │
┌──────────┐  pub   │    └───────────┘
│ Producer │───────▶│ BROKER
└──────────┘        │    ┌───────────┐
                    └──▶ │Consumer B │
                         └───────────┘
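The difference between broadcast and buffering semantics can be seen in a toy in-memory model before touching a real broker. This is only a sketch of the two delivery models; `ToyPubSub` and `ToyQueue` are made-up classes and say nothing about how Redis or RabbitMQ are implemented.

```python
from collections import deque


class ToyPubSub:
    """Broadcast: each message goes to the subscribers present at publish time."""

    def __init__(self):
        self.subscribers = []  # each subscriber is a list acting as an inbox

    def subscribe(self) -> list:
        inbox = []
        self.subscribers.append(inbox)
        return inbox

    def publish(self, message) -> int:
        for inbox in self.subscribers:
            inbox.append(message)
        return len(self.subscribers)  # 0 receivers means the message is dropped


class ToyQueue:
    """Buffering queue: messages wait until some consumer takes them."""

    def __init__(self):
        self._messages = deque()

    def publish(self, message) -> None:
        self._messages.append(message)

    def consume(self):
        """Return the oldest waiting message, or None if the queue is empty."""
        return self._messages.popleft() if self._messages else None


pubsub = ToyPubSub()
print(pubsub.publish("early"))   # 0: nobody subscribed yet, message is gone
inbox = pubsub.subscribe()
pubsub.publish("late")
print(inbox)                     # ['late']

queue = ToyQueue()
queue.publish("early")
print(queue.consume())           # early: it waited for the consumer
```

In the pub/sub model every inbox gets its own copy (fan-out), while in the queue model each message is handed to exactly one `consume()` call.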
3a. Redis Pub/Sub¶
Redis Pub/Sub is a broadcast model: every subscriber gets every message.
But if no one is listening, the message is gone forever.
import redis
import json, threading, time, subprocess
def redis_consumer(channel="taxi_events", max_events=15):
    """Subscribe to a Redis channel and print events as they arrive."""
    r = redis.Redis()
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    print(f"📡 Subscribed to Redis channel: '{channel}'")
    events_seen = 0
    for message in pubsub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            events_seen += 1
            print(f" [Redis] Event {events_seen}: "
                  f"{event['pickup_zone']} | ${event['fare_estimate']}")
            if events_seen >= max_events:
                break
    pubsub.unsubscribe()
    print(f"\n✅ Redis consumer done. Events: {events_seen}")
# Start consumer in a thread (must subscribe BEFORE producer publishes)
consumer_thread = threading.Thread(target=redis_consumer)
consumer_thread.start()
time.sleep(1)  # ensure subscription is active
# Start producer
producer = subprocess.Popen(
    ["python", "resources/event_generator.py", "redis",
     "--channel", "taxi_events",
     "--interval", "1", "--count", "15"],
)
consumer_thread.join(timeout=30)
producer.wait()
📡 Subscribed to Redis channel: 'taxi_events'
[Redis] Event 1: Bronx | $58.42
[Redis] Event 2: Staten Island | $47.53
[Redis] Event 3: Bronx | $83.29
[Redis] Event 4: Manhattan | $61.78
[Redis] Event 5: Bronx | $10.57
[Redis] Event 6: Brooklyn | $12.13
[Redis] Event 7: Manhattan | $61.43
[Redis] Event 8: Queens | $24.25
[Redis] Event 9: Brooklyn | $54.01
[Redis] Event 10: Queens | $10.85
[Redis] Event 11: Brooklyn | $38.97
[Redis] Event 12: Staten Island | $8.65
[Redis] Event 13: Bronx | $13.73
[Redis] Event 14: Staten Island | $27.65
[Redis] Event 15: Manhattan | $83.45
✅ Redis consumer done. Events: 15
🚀 Starting event generator: sink=redis, interval=1.0s, count=15
[redis] Published event 1/15
[redis] Published event 2/15
[redis] Published event 3/15
[redis] Published event 4/15
[redis] Published event 5/15
[redis] Published event 6/15
[redis] Published event 7/15
[redis] Published event 8/15
[redis] Published event 9/15
[redis] Published event 10/15
[redis] Published event 11/15
[redis] Published event 12/15
[redis] Published event 13/15
[redis] Published event 14/15
[redis] Published event 15/15
✅ Event generator finished.
0
Fire-and-Forget Demo¶
What happens if we publish messages before anyone subscribes?
import redis, json
r = redis.Redis()
# ── Step 1: Publish 5 events with NO subscriber listening ─────────
print("Publishing 5 events with NO subscriber active...")
for i in range(5):
    event = {"event_id": i, "note": "nobody is listening"}
    r.publish("taxi_events", json.dumps(event))
    print(f" Published event {i}")
# ── Step 2: NOW subscribe ────────────────────────────────────────
print("\nNow subscribing...")
pubsub = r.pubsub()
pubsub.subscribe("taxi_events")
# ── Step 3: Try to read — only get the subscription confirmation ─
msg = pubsub.get_message(timeout=2)
print(f"First message type: '{msg['type']}'") # 'subscribe', NOT 'message'
msg = pubsub.get_message(timeout=2)
print(f"Next message: {msg}") # None — the 5 events are gone forever!
pubsub.unsubscribe()
print("\n⚠️ All 5 events were LOST because no subscriber was active!")
print(" This is the 'fire-and-forget' nature of Redis Pub/Sub.")
Publishing 5 events with NO subscriber active...
Published event 0
Published event 1
Published event 2
Published event 3
Published event 4
Now subscribing...
First message type: 'subscribe'
Next message: None
⚠️ All 5 events were LOST because no subscriber was active!
This is the 'fire-and-forget' nature of Redis Pub/Sub.
3b. RabbitMQ¶
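RabbitMQ queues buffer messages in the broker until a consumer explicitly acknowledges them. The consumer below declares its queue without durable=True, so the buffered messages outlive a disconnected consumer but not a broker restart; surviving a restart additionally needs a durable queue and persistent messages.
To show this buffering, we'll start the producer first, let messages pile up, and then start the consumer 5 seconds later.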
import pika
import json, subprocess, time
def rabbitmq_consumer(queue="taxi_events", max_events=15):
    """Consume from a RabbitMQ queue with manual acknowledgment."""
    connection = pika.BlockingConnection(
        pika.ConnectionParameters("localhost")
    )
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    print(f"🐇 Listening on RabbitMQ queue: '{queue}'")
    events_seen = 0
    def on_message(ch, method, properties, body):
        nonlocal events_seen
        event = json.loads(body)
        events_seen += 1
        print(f" [RabbitMQ] Event {events_seen}: "
              f"{event['pickup_zone']} | ${event['fare_estimate']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 👈 acknowledge
        if events_seen >= max_events:
            ch.stop_consuming()
    channel.basic_consume(queue=queue, on_message_callback=on_message)
    try:
        channel.start_consuming()
    except Exception as exc:
        # Report the failure instead of silently swallowing it
        print(f"⚠️ Consumer stopped early: {exc}")
    finally:
        connection.close()
    print(f"\n✅ RabbitMQ consumer done. Events: {events_seen}")
# ── Start producer FIRST — messages will QUEUE (unlike Redis!) ────
producer = subprocess.Popen(
    ["python", "resources/event_generator.py", "rabbitmq",
     "--queue", "taxi_events",
     "--interval", "0.5", "--count", "15"],
)
time.sleep(5)  # Let messages accumulate in the queue
# ── Start consumer — it picks up the BUFFERED messages! ──────────
print("⏳ Consumer starting 5 seconds AFTER producer...")
print(" Messages should already be waiting in the queue!\n")
rabbitmq_consumer(max_events=15)
producer.wait()
⏳ Consumer starting 5 seconds AFTER producer...
Messages should already be waiting in the queue!
🐇 Listening on RabbitMQ queue: 'taxi_events'
[RabbitMQ] Event 1: Staten Island | $47.31
[RabbitMQ] Event 2: Manhattan | $71.13
[RabbitMQ] Event 3: Bronx | $57.71
[RabbitMQ] Event 4: Manhattan | $74.16
[RabbitMQ] Event 5: Manhattan | $58.85
[RabbitMQ] Event 6: Queens | $68.51
[RabbitMQ] Event 7: Staten Island | $56.48
[RabbitMQ] Event 8: Manhattan | $35.32
[RabbitMQ] Event 9: Manhattan | $60.63
[RabbitMQ] Event 10: Manhattan | $14.38
[RabbitMQ] Event 11: Manhattan | $52.83
[RabbitMQ] Event 12: Bronx | $42.57
[RabbitMQ] Event 13: Staten Island | $72.14
[RabbitMQ] Event 14: Queens | $63.88
[RabbitMQ] Event 15: Manhattan | $26.43
✅ RabbitMQ consumer done. Events: 15
🚀 Starting event generator: sink=rabbitmq, interval=0.5s, count=15
[rabbitmq] Published event 1/15
[rabbitmq] Published event 2/15
[rabbitmq] Published event 3/15
[rabbitmq] Published event 4/15
[rabbitmq] Published event 5/15
[rabbitmq] Published event 6/15
[rabbitmq] Published event 7/15
[rabbitmq] Published event 8/15
[rabbitmq] Published event 9/15
[rabbitmq] Published event 10/15
[rabbitmq] Published event 11/15
[rabbitmq] Published event 12/15
[rabbitmq] Published event 13/15
[rabbitmq] Published event 14/15
[rabbitmq] Published event 15/15
✅ Event generator finished.
0
Redis vs RabbitMQ — Head to Head¶
| Feature | Redis Pub/Sub | RabbitMQ |
|---|---|---|
| Message persistence | ❌ Fire-and-forget | ✅ Durable queues |
| Acknowledgments | ❌ No | ✅ Manual/auto ack |
| Replay | ❌ Not possible | ❌ Not built-in (consumed = gone) |
| Throughput | 🚀 Very high | 🏎️ High |
| Fan-out | ✅ All subscribers get every message | ✅ Via exchanges (fanout, topic, direct) |
| Consumer offline | ⚠️ Misses messages | ✅ Messages wait in queue |
| Use case | Real-time dashboards, caching | Task queues, order processing |
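The "Durable queues" entry in the table holds only when durability is requested explicitly. A hedged sketch of what that looks like with pika is shown below; the function name `publish_durable` and the queue name `taxi_events_durable` are assumptions made for illustration. A separate queue name is used because RabbitMQ rejects re-declaring an existing queue with different properties.

```python
import json


def publish_durable(events, queue="taxi_events_durable", host="localhost"):
    """Publish events to a durable queue as persistent messages."""
    import pika  # imported here so the definition loads even where pika is absent

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    # durable=True keeps the queue definition across broker restarts
    channel.queue_declare(queue=queue, durable=True)
    for event in events:
        channel.basic_publish(
            exchange="",             # default exchange routes by queue name
            routing_key=queue,
            body=json.dumps(event).encode("utf-8"),
            # delivery_mode=2 marks the message as persistent
            properties=pika.BasicProperties(delivery_mode=2),
        )
    connection.close()
```

With the RabbitMQ container from the prerequisites running, `publish_durable([{"pickup_zone": "Queens", "fare_estimate": 7.98}])` would enqueue one event; a consumer would then need to declare the same queue with `durable=True` as well.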
Key Takeaway¶
- Use Redis Pub/Sub when speed matters and occasional message loss is acceptable.
- Use RabbitMQ when every message must be processed (at-least-once delivery).
- For replay / rewind capability, you need a log-based broker like Apache Kafka (out of scope today).
Amazon SQS (Simple Queue Service)¶
Managed Cloud Queues¶
SQS removes the burden of operating your own broker infrastructure. Key properties:
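- Fully managed: no servers to provision or patch
- At-least-once delivery: on standard queues a message can occasionally be delivered more than once, so consumers should be idempotent
- Visibility timeout: after a consumer receives a message it is hidden from other consumers; if it is not deleted before the timeout expires, it becomes visible again
- Dead-letter queues: messages that are received repeatedly without being deleted can be moved to a separate queue for investigation
- Auto-scaling: throughput scales with load, without capacity planning on your side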
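The interplay of receive, delete, visibility timeout, and dead-lettering listed above can be modeled in a few lines of plain Python. This is a toy model for building intuition, not SQS's implementation and not the boto3 API; the class `ToyVisibilityQueue` and its explicit `now` argument (used instead of a real clock so the behavior is deterministic) are assumptions of the sketch.

```python
import itertools


class ToyVisibilityQueue:
    """In-memory model of receive/delete with a visibility timeout and a DLQ."""

    def __init__(self, visibility_timeout=30, max_receive_count=3):
        self.visibility_timeout = visibility_timeout
        self.max_receive_count = max_receive_count
        self.messages = {}       # id -> {"body", "visible_at", "receives"}
        self.dead_letters = []   # bodies that exceeded max_receive_count
        self._ids = itertools.count(1)

    def send(self, body):
        self.messages[next(self._ids)] = {"body": body, "visible_at": 0, "receives": 0}

    def receive(self, now):
        """Return (message_id, body) for the first visible message, or None."""
        for msg_id, msg in list(self.messages.items()):
            if msg["visible_at"] > now:
                continue                      # still hidden by an earlier receive
            if msg["receives"] >= self.max_receive_count:
                self.dead_letters.append(msg["body"])
                del self.messages[msg_id]     # give up: move to the dead-letter list
                continue
            msg["receives"] += 1
            msg["visible_at"] = now + self.visibility_timeout
            return msg_id, msg["body"]
        return None

    def delete(self, msg_id):
        """Acknowledge successful processing; the message will not reappear."""
        self.messages.pop(msg_id, None)


q = ToyVisibilityQueue(visibility_timeout=30, max_receive_count=2)
q.send("trip-1")
print(q.receive(now=0))    # (1, 'trip-1'), now hidden until t=30
print(q.receive(now=10))   # None: the message is invisible
print(q.receive(now=30))   # (1, 'trip-1') again, because it was never deleted
print(q.receive(now=60))   # None: the receive limit was hit
print(q.dead_letters)      # ['trip-1']
```

The second delivery at t=30 is what "at-least-once" means in practice: a consumer that crashes before calling `delete` causes a redelivery, so processing should tolerate duplicates.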
Architecture¶
┌──────────┐    SendMessage    ┌───────────┐   ReceiveMessage   ┌──────────────┐
│ Producer │ ─────────────────▶│  AWS SQS  │◀────────────────── │   Consumer   │
└──────────┘                   │  (Queue)  │   DeleteMessage    └──────────────┘
                               └───────────┘
🧪 We use LocalStack to emulate SQS locally.
No AWS account or charges required!
Step 1: Create the SQS Queue¶
import boto3, json
# Connect to LocalStack's SQS emulator
sqs = boto3.client(
    "sqs",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
# Create the queue
response = sqs.create_queue(
    QueueName="taxi-events",
    Attributes={
        "VisibilityTimeout": "30",          # seconds a message is hidden after receive
        "MessageRetentionPeriod": "86400",  # keep messages for 1 day
    },
)
queue_url = response["QueueUrl"]
print(f"📬 Queue created: {queue_url}")
---------------------------------------------------------------------------
ConnectionRefusedError Traceback (most recent call last)
File ~/workplace/da_colab/DS614/.venv/lib/python3.12/site-packages/urllib3/connection.py:204, in HTTPConnection._new_conn(self)
[...]
ConnectionRefusedError: [Errno 111] Connection refused
The above exception was the direct cause of the following exception:
NewConnectionError Traceback (most recent call last)
File ~/workplace/da_colab/DS614/.venv/lib/python3.12/site-packages/botocore/httpsession.py:477, in URLLib3Session.send(self, request)
[...]
NewConnectionError: AWSHTTPConnection(host='localhost', port=4566): Failed to establish a new connection: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
EndpointConnectionError Traceback (most recent call last)
Cell In[12], line 11
     10 # Create the queue
---> 11 response = sqs.create_queue(
     12 QueueName="taxi-events",
[...]
--> 416 raise caught_exception File ~/workplace/da_colab/DS614/.venv/lib/python3.12/site-packages/botocore/endpoint.py:279, in Endpoint._do_get_response(self, request, operation_model, context) 277 http_response = first_non_none_response(responses) 278 if http_response is None: --> 279 http_response = self._send(request) 280 except HTTPClientError as e: 281 return (None, e) File ~/workplace/da_colab/DS614/.venv/lib/python3.12/site-packages/botocore/endpoint.py:383, in Endpoint._send(self, request) 382 def _send(self, request): --> 383 return self.http_session.send(request) File ~/workplace/da_colab/DS614/.venv/lib/python3.12/site-packages/botocore/httpsession.py:506, in URLLib3Session.send(self, request) 504 raise SSLError(endpoint_url=request.url, error=e) 505 except (NewConnectionError, socket.gaierror) as e: --> 506 raise EndpointConnectionError(endpoint_url=request.url, error=e) 507 except ProxyError as e: 508 raise ProxyConnectionError( 509 proxy_url=mask_proxy_url(proxy_url), error=e 510 ) EndpointConnectionError: Could not connect to the endpoint URL: "http://localhost:4566/"
Step 2: Producer — Send Messages¶
import json, time, random
from datetime import datetime

ZONES = ["Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island"]

for i in range(15):
    event = {
        "event_id": random.randint(100000, 999999),
        "timestamp": datetime.now().isoformat(),
        "pickup_zone": random.choice(ZONES),
        "passengers": random.randint(1, 6),
        "fare_estimate": round(random.uniform(5.0, 85.0), 2),
    }
    # sqs and queue_url come from Step 1
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(event),
        MessageAttributes={
            "EventType": {
                "DataType": "String",
                "StringValue": "trip_start",
            }
        },
    )
    print(f" 📤 Sent event {i+1}: {event['pickup_zone']} | ${event['fare_estimate']}")
    time.sleep(0.5)

print("\n✅ All 15 messages sent to SQS.")
Step 3: Consumer — Long Polling¶
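SQS supports long polling (WaitTimeSeconds > 0, up to 20 seconds): the receive call blocks until at least one message is available or the wait time expires. Short polling returns immediately even when the queue is empty, so long polling produces fewer empty responses and lower API costs.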
events_processed = 0
max_events = 15
max_retries = 5
empty_count = 0

print("📥 Starting SQS consumer (long polling)...\n")

while events_processed < max_events and empty_count < max_retries:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=5,  # batch up to 5
        WaitTimeSeconds=5,      # 👈 long polling (blocks up to 5s)
        MessageAttributeNames=["All"],
    )
    messages = response.get("Messages", [])
    if not messages:
        empty_count += 1
        print(f" ⏳ No messages available (attempt {empty_count}/{max_retries}), retrying...")
        continue
    empty_count = 0  # reset on successful receive
    for msg in messages:
        event = json.loads(msg["Body"])
        events_processed += 1
        print(f" [SQS] Event {events_processed}: "
              f"{event['pickup_zone']} | ${event['fare_estimate']}")
        # ✅ Delete after successful processing (acknowledge)
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=msg["ReceiptHandle"],
        )

print(f"\n✅ SQS consumer done. Events processed: {events_processed}")
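Long polling can also be made the default for a queue, so that receive calls which omit WaitTimeSeconds still block. The helper below is a minimal sketch: `enable_long_polling` is a hypothetical name, and it assumes a boto3 SQS client like the `sqs` object used above. `ReceiveMessageWaitTimeSeconds` is the queue attribute that holds the default wait.

```python
def enable_long_polling(sqs_client, queue_url, wait_seconds=20):
    """Set the queue-level default wait time (SQS accepts 0 to 20 seconds).

    Hypothetical helper for illustration; sqs_client is assumed to be a
    boto3 SQS client (or any object exposing set_queue_attributes).
    """
    if not 0 <= wait_seconds <= 20:
        raise ValueError("wait_seconds must be between 0 and 20")
    sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        # Queue attribute values are passed as strings
        Attributes={"ReceiveMessageWaitTimeSeconds": str(wait_seconds)},
    )


# Usage with the notebook's client (needs a reachable endpoint):
# enable_long_polling(sqs, queue_url, wait_seconds=10)
```

A WaitTimeSeconds value passed to an individual receive call still takes precedence over the queue default.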
Visibility Timeout Demo¶
When a consumer receives a message, SQS hides it from other consumers for VisibilityTimeout seconds (30 by default).
If the consumer doesn't delete the message in time (e.g., because it crashed), the message reappears and another consumer can receive it.
# ── Demonstrate the "visibility timeout" concept ──────────────────

# 1. Send one test message
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({"test": "visibility_timeout_demo"}),
)
print("1️⃣ Sent a test message.")

# 2. Receive it (starts the visibility timeout)
resp = sqs.receive_message(
    QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=5
)
msg = resp["Messages"][0]
print(f"2️⃣ Received: {msg['Body']}")
print(f" Receipt handle: {msg['ReceiptHandle'][:40]}...")

# 3. DON'T delete it: simulate a consumer crash
print("3️⃣ Simulating consumer crash (NOT deleting the message)...")
print(" Message is now INVISIBLE for 30 seconds (VisibilityTimeout).")

# 4. Try to receive again immediately: nothing available!
resp2 = sqs.receive_message(
    QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=2
)
available = len(resp2.get("Messages", []))
print(f"4️⃣ Immediate re-read: {available} messages available (invisible!)")

print("\n💡 After the VisibilityTimeout expires, the message becomes visible again")
print(" and can be picked up by another consumer.")
print(" This is how SQS ensures AT-LEAST-ONCE delivery!")

# Clean up
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
print("\n🧹 Test message cleaned up.")
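The demo above shows the message disappearing, but waiting 30 seconds to watch it come back is awkward in a notebook. The sketch below models the same rule in memory with an explicit clock. It is an illustration only, not the SQS API: `ToyQueue` and its methods are hypothetical, and real SQS identifies in-flight messages by receipt handle rather than by a simple id.

```python
import itertools


class ToyQueue:
    """In-memory model of a visibility timeout (illustration only, not SQS)."""

    def __init__(self, visibility_timeout=30.0):
        self.visibility_timeout = visibility_timeout
        self._messages = {}             # message id -> [body, visible_at]
        self._ids = itertools.count(1)

    def send(self, body, now):
        # A new message is visible immediately
        self._messages[next(self._ids)] = [body, now]

    def receive(self, now):
        """Return (message_id, body) for the first visible message, or None."""
        for msg_id, entry in self._messages.items():
            body, visible_at = entry
            if visible_at <= now:
                # Hide the message while a consumer is working on it
                entry[1] = now + self.visibility_timeout
                return msg_id, body
        return None

    def delete(self, msg_id):
        # Acknowledge: the message will never be delivered again
        self._messages.pop(msg_id, None)


q = ToyQueue(visibility_timeout=30.0)
q.send("trip-1", now=0.0)

first = q.receive(now=0.0)          # consumer A takes the message, then "crashes"
hidden = q.receive(now=10.0)        # still in flight, so nothing is returned
redelivered = q.receive(now=31.0)   # timeout elapsed without a delete

print(first, hidden, redelivered)
```

Passing `now` explicitly instead of reading the system clock keeps the example deterministic and avoids sleeping through the timeout.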
SQS Key Concepts¶
| Concept | Description |
|---|---|
| Standard Queue | At-least-once delivery, best-effort ordering |
| FIFO Queue | Exactly-once processing, strict ordering within a message group (lower throughput) |
| Long Polling | WaitTimeSeconds > 0 — reduces empty responses and costs |
| Visibility Timeout | Message is hidden while being processed; reappears if not deleted |
| Dead Letter Queue | Receives messages that have been received more than maxReceiveCount times without being deleted |
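A dead-letter queue is attached to a source queue through its `RedrivePolicy` attribute, a JSON string naming the target queue's ARN and the receive limit. The sketch below only builds that attribute: `build_redrive_policy` is a hypothetical helper, and the ARN is a made-up placeholder.

```python
import json


def build_redrive_policy(dead_letter_queue_arn, max_receive_count=3):
    """Build the Attributes dict that attaches a dead-letter queue.

    Hypothetical helper for illustration. RedrivePolicy is a JSON string
    holding the DLQ's ARN and the number of receives allowed before a
    message is moved to the DLQ.
    """
    return {
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dead_letter_queue_arn,
            "maxReceiveCount": str(max_receive_count),
        })
    }


# Placeholder ARN, not a real queue
attributes = build_redrive_policy(
    "arn:aws:sqs:us-east-1:000000000000:taxi-trips-dlq", max_receive_count=3
)
print(attributes["RedrivePolicy"])

# With a reachable endpoint, the policy would be applied to the source queue:
# sqs.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)
```

Each receive that is not followed by a delete counts toward `maxReceiveCount`, so this works together with the visibility timeout.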
Limitations¶
- At-least-once delivery means consumers must be idempotent
- No real-time push — still fundamentally polling (long-polling mitigates this)
- Max message size is 256 KB (use S3 for larger payloads)
- Vendor lock-in — tightly coupled to AWS ecosystem
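The first limitation is the one that most often bites in practice. A minimal sketch of an idempotent consumer follows, assuming each event carries a unique `event_id`; `process_once` and the in-memory `seen` set are hypothetical stand-ins. The random IDs generated in Step 2 can collide, and a real deployment would need a durable store for seen IDs.

```python
import json


def process_once(raw_messages, seen_ids, handler):
    """Apply handler to each event at most once, keyed on event_id.

    Hypothetical helper: seen_ids is a plain set here, which is lost
    when the process restarts.
    """
    handled = 0
    for raw in raw_messages:
        event = json.loads(raw)
        if event["event_id"] in seen_ids:
            continue  # duplicate delivery: skip the side effect
        handler(event)
        seen_ids.add(event["event_id"])
        handled += 1
    return handled


fares = []
seen = set()
batch = [
    json.dumps({"event_id": 101, "fare_estimate": 12.5}),
    json.dumps({"event_id": 102, "fare_estimate": 30.0}),
    json.dumps({"event_id": 101, "fare_estimate": 12.5}),  # redelivered duplicate
]

handled = process_once(batch, seen, lambda e: fares.append(e["fare_estimate"]))
print(handled, sum(fares))  # 2 42.5
```

The ID is recorded only after the handler succeeds, so an event whose handler raises is retried on redelivery instead of being silently dropped.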
Grand Comparison¶
| Feature | Polling | Unix Pipes/Sockets | Redis Pub/Sub | RabbitMQ | Amazon SQS |
|---|---|---|---|---|---|
| Model | Pull | Push | Push (broadcast) | Push (queue) | Pull (long-poll) |
| Latency | ⏱️ High (interval) | ⚡ Very low | ⚡ Very low | ⚡ Low | ⏱️ Low–Med |
| Persistence | ✅ (file/DB) | ❌ | ❌ | ✅ | ✅ |
| Reliability | 🟡 DIY | ❌ Fragile | ❌ Fire-and-forget | ✅ Acks | ✅ At-least-once |
| Multi-consumer | 🟡 Manual | 🟡 Manual | ✅ Built-in | ✅ Built-in | ✅ Built-in |
| Cross-machine | ✅ (shared storage) | ✅ (sockets) | ✅ | ✅ | ✅ |
| Complexity | 🟢 Trivial | 🟡 Medium | 🟡 Medium | 🟠 Higher | 🟢 Low (managed) |
| Best for | Simple / legacy | Low-latency local | Real-time broadcast | Reliable task queues | Cloud-native apps |
Key Takeaways¶
- Polling is the simplest but least efficient — good for prototyping or where push isn't available.
- OS-level streaming (pipes/sockets) gives ultra-low latency but no durability or fault tolerance.
- Message brokers (Redis, RabbitMQ) add decoupling and fan-out, but you must operate the infrastructure.
- Managed services (SQS) offload operations to the cloud provider — ideal for production workloads.
- The right choice depends on your latency, reliability, and operational requirements.
As data systems evolve, you'll often combine multiple approaches:
Polling for legacy sources → brokers for internal services → SQS/Kafka for production.
Cleanup¶
Run the cell below to tear down all demo infrastructure.
import subprocess, os

# Remove temp files
event_file = os.path.expanduser("~/stream_events.jsonl")
if os.path.exists(event_file):
    os.remove(event_file)
    print(f"🗑️ Removed {event_file}")

# Stop Docker containers (if running)
for name in ["redis-demo", "rabbitmq-demo", "localstack"]:
    result = subprocess.run(["docker", "stop", name], capture_output=True, text=True)
    subprocess.run(["docker", "rm", name], capture_output=True, text=True)
    if result.returncode == 0:
        print(f"🗑️ Stopped & removed container: {name}")
    else:
        print(f"⏭️ Container '{name}' was not running.")

print("\n✅ Cleanup complete!")