# Streaming Code Execution: Real-time Output for AI Agents
When code runs for more than a second, users start wondering: "Is it working?" Streaming output solves this by showing results as they happen, not just when execution completes.
This guide shows you how to stream code execution from HopX sandboxes, enabling AI agents that feel responsive and keep users engaged.
## Why Streaming Matters

Without streaming:

```text
User: "Analyze this 100MB dataset"
[Spinner for 30 seconds]
[All output appears at once]
```
With streaming:

```text
User: "Analyze this 100MB dataset"
→ Loading dataset...
→ Processing 1,000,000 rows...
→ Calculating statistics...
→ Generating visualizations...
→ Analysis complete!
```
Streaming provides:

- **Feedback**: Users know the system is working
- **Progress**: Long tasks feel faster
- **Debugging**: See issues as they occur
- **Cancellation**: Stop early if output looks wrong
## Basic Streaming

### Synchronous Streaming

The simplest form of streaming reads output as it's produced:

```python
from hopx import Sandbox

sandbox = Sandbox.create(template="code-interpreter")

# Long-running script with progress output
code = '''
import time

for i in range(10):
    print(f"Processing step {i+1}/10...")
    time.sleep(1)

print("Complete!")
'''

sandbox.files.write("/app/process.py", code)

# Stream output line by line
for line in sandbox.commands.stream("cd /app && python -u process.py"):
    print(f"[OUTPUT] {line}")
```
Note the `-u` flag for unbuffered Python output; it's essential for real-time streaming.
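If you can't change the interpreter flags, setting the standard `PYTHONUNBUFFERED` environment variable in the command has the same effect. A minimal sketch reusing the `stream()` call from above:

```python
# PYTHONUNBUFFERED=1 disables stdout/stderr buffering, just like -u
for line in sandbox.commands.stream("cd /app && PYTHONUNBUFFERED=1 python process.py"):
    print(f"[OUTPUT] {line}")
```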
### Async Streaming

For web applications and async frameworks:

```python
import asyncio
from hopx import Sandbox

async def stream_execution():
    sandbox = await Sandbox.create_async(template="code-interpreter")

    code = '''
import time

for i in range(5):
    print(f"Step {i+1}", flush=True)
    time.sleep(0.5)
'''

    await sandbox.files.write_async("/app/task.py", code)

    async for line in sandbox.commands.stream_async("python -u /app/task.py"):
        print(f"Received: {line}")

    await sandbox.kill_async()

asyncio.run(stream_execution())
```
## Streaming to Web Clients

### Server-Sent Events (SSE)

SSE is a good fit for one-way streaming to browsers:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from hopx import Sandbox

app = FastAPI()

class ExecuteRequest(BaseModel):
    code: str

@app.post("/execute")
async def execute_code(request: ExecuteRequest):
    async def generate():
        sandbox = await Sandbox.create_async(template="code-interpreter")

        try:
            await sandbox.files.write_async("/app/code.py", request.code)

            async for line in sandbox.commands.stream_async("python -u /app/code.py"):
                # Format as SSE
                yield f"data: {line}\n\n"

            yield "data: [DONE]\n\n"

        finally:
            await sandbox.kill_async()

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
```
Frontend consumption:
```javascript
// The native EventSource API only supports GET requests,
// so use fetch() and read the streamed response body instead.
const response = await fetch('/execute', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ code: 'print("Hello")' })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
const output = document.getElementById('output');

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Each SSE event arrives as "data: <line>\n\n"
  // (simplified: assumes whole events arrive per read)
  for (const event of decoder.decode(value, { stream: true }).split('\n\n')) {
    const line = event.replace(/^data: /, '').trim();
    if (!line) continue;
    if (line === '[DONE]') {
      await reader.cancel();
      break;
    }
    output.textContent += line + '\n';
  }
}
```
### WebSocket Streaming

For bidirectional communication:
```python
from fastapi import FastAPI, WebSocket
from hopx import Sandbox

app = FastAPI()

@app.websocket("/ws/execute")
async def websocket_execute(websocket: WebSocket):
    await websocket.accept()

    sandbox = None

    try:
        while True:
            # Receive code from client
            data = await websocket.receive_json()

            if data["type"] == "execute":
                sandbox = await Sandbox.create_async(template="code-interpreter")
                await sandbox.files.write_async("/app/code.py", data["code"])

                # Stream output back
                async for line in sandbox.commands.stream_async("python -u /app/code.py"):
                    await websocket.send_json({
                        "type": "output",
                        "content": line
                    })

                await websocket.send_json({"type": "complete"})
                await sandbox.kill_async()
                sandbox = None

            elif data["type"] == "cancel":
                # Note: this branch only sees "cancel" between executions;
                # mid-run cancellation needs concurrent message handling
                # (see the sketch below).
                if sandbox:
                    await sandbox.kill_async()
                    sandbox = None
                await websocket.send_json({"type": "cancelled"})

    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})

    finally:
        if sandbox:
            await sandbox.kill_async()
```
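Because the loop above processes one message at a time, a cancel sent while code is running won't be noticed until the stream finishes. To support mid-run cancellation, stream output and listen for client messages concurrently. A minimal sketch, assuming the same `stream_async` API; the helper name and command path are illustrative:

```python
import asyncio

async def stream_until_cancelled(websocket, sandbox, command="python -u /app/code.py"):
    """Stream command output, but stop early if the client sends a cancel message."""

    async def pump_output():
        async for line in sandbox.commands.stream_async(command):
            await websocket.send_json({"type": "output", "content": line})

    async def wait_for_cancel():
        while True:
            message = await websocket.receive_json()
            if message.get("type") == "cancel":
                return

    output_task = asyncio.create_task(pump_output())
    cancel_task = asyncio.create_task(wait_for_cancel())

    # Whichever finishes first wins: the stream ends, or a cancel arrives
    done, pending = await asyncio.wait(
        {output_task, cancel_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()

    if cancel_task in done:
        await sandbox.kill_async()
        await websocket.send_json({"type": "cancelled"})
    else:
        await websocket.send_json({"type": "complete"})
```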
Frontend WebSocket client:
```javascript
const ws = new WebSocket('ws://localhost:8000/ws/execute');
let outputDiv = document.getElementById('output');

ws.onopen = () => {
  // Send code to execute
  ws.send(JSON.stringify({
    type: 'execute',
    code: `
import time
for i in range(10):
    print(f'Processing {i}...')
    time.sleep(0.5)
`
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'output':
      outputDiv.textContent += data.content + '\n';
      break;
    case 'complete':
      outputDiv.textContent += '\n[Execution complete]';
      break;
    case 'cancelled':
      outputDiv.textContent += '\n[Execution cancelled]';
      break;
    case 'error':
      outputDiv.textContent += `\nError: ${data.message}`;
      break;
  }
};

// Cancel button
document.getElementById('cancel').onclick = () => {
  ws.send(JSON.stringify({ type: 'cancel' }));
};
```
## Progress Reporting

### Structured Progress Updates

Instead of raw output, send structured progress:
```python
from hopx import Sandbox
import json

def execute_with_progress(code: str, on_progress):
    """Execute code and report structured progress"""
    sandbox = Sandbox.create(template="code-interpreter")

    # Wrapper that outputs JSON progress
    wrapper = '''
import sys
import json

def progress(message, percent=None, data=None):
    """Report progress in structured format"""
    update = {"type": "progress", "message": message}
    if percent is not None:
        update["percent"] = percent
    if data is not None:
        update["data"] = data
    print("__PROGRESS__" + json.dumps(update), flush=True)

def result(data):
    """Report final result"""
    print("__RESULT__" + json.dumps({"type": "result", "data": data}), flush=True)

def error(message):
    """Report error"""
    print("__ERROR__" + json.dumps({"type": "error", "message": message}), flush=True)

# User code can now use progress(), result(), error()
'''

    full_code = wrapper + "\n" + code
    sandbox.files.write("/app/task.py", full_code)

    for line in sandbox.commands.stream("python -u /app/task.py"):
        if line.startswith("__PROGRESS__"):
            data = json.loads(line[len("__PROGRESS__"):])
            on_progress(data)
        elif line.startswith("__RESULT__"):
            data = json.loads(line[len("__RESULT__"):])
            on_progress(data)
        elif line.startswith("__ERROR__"):
            data = json.loads(line[len("__ERROR__"):])
            on_progress(data)
        else:
            # Regular output
            on_progress({"type": "output", "content": line})

    sandbox.kill()


# Usage
def handle_progress(update):
    if update["type"] == "progress":
        print(f"[{update.get('percent', '?')}%] {update['message']}")
    elif update["type"] == "result":
        print(f"Result: {update['data']}")

code = '''
import time

progress("Starting analysis", 0)

for i in range(5):
    progress(f"Processing batch {i+1}", (i+1) * 20)
    time.sleep(0.5)

result({"processed": 5, "status": "success"})
'''

execute_with_progress(code, handle_progress)
```
### Progress Bar Integration

Send progress suitable for UI progress bars:
```python
analysis_code = '''
import pandas as pd
import time

# Load data
progress("Loading dataset...", 0)
df = pd.read_csv('/app/data.csv')
progress(f"Loaded {len(df)} rows", 20)

# Clean data
progress("Cleaning data...", 20)
df = df.dropna()
progress(f"Cleaned, {len(df)} rows remaining", 40)

# Process in chunks of 1,000 rows
total = len(df)
num_chunks = max(total // 1000, 1)  # avoid division by zero for small datasets
for i, chunk in enumerate(range(0, total, 1000)):
    percent = 40 + (i / num_chunks) * 40
    progress(f"Processing rows {chunk}-{chunk+1000}", percent)
    time.sleep(0.1)  # Simulate work

# Generate report
progress("Generating report...", 80)
summary = df.describe()
progress("Report ready", 100)

result(summary.to_dict())
'''
```
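On the consuming side, the `percent` field maps directly onto whatever progress widget you use. A minimal sketch that renders a text progress bar in the terminal, reusing the `execute_with_progress` helper from above (the `render_progress` name is illustrative):

```python
def render_progress(update):
    """Draw a simple text progress bar from structured updates."""
    if update["type"] == "progress" and "percent" in update:
        filled = int(update["percent"] // 10)
        bar = "#" * filled + "-" * (10 - filled)
        print(f"\r[{bar}] {update['percent']:5.1f}%  {update['message']}", end="", flush=True)
    elif update["type"] == "result":
        print(f"\nResult keys: {list(update['data'].keys())}")

execute_with_progress(analysis_code, render_progress)
```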
## Handling Long-Running Tasks

### Heartbeat Pattern

Keep connections alive during long operations:
```python
import asyncio
import time
from hopx import Sandbox

async def execute_with_heartbeat(code: str):
    sandbox = await Sandbox.create_async(template="code-interpreter")
    await sandbox.files.write_async("/app/task.py", code)

    # Start execution
    execution = asyncio.create_task(
        collect_output(sandbox.commands.stream_async("python -u /app/task.py"))
    )

    # Send heartbeats while executing
    while not execution.done():
        yield {"type": "heartbeat", "timestamp": time.time()}
        await asyncio.sleep(5)

    # Get final result
    output = await execution
    yield {"type": "complete", "output": output}

    await sandbox.kill_async()


async def collect_output(stream):
    output = []
    async for line in stream:
        output.append(line)
    return output
```
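A caller can forward the heartbeats to whatever transport it uses (SSE, WebSocket, logs). A minimal sketch that just prints them; the `slow_code` string is illustrative:

```python
import asyncio

async def main():
    slow_code = "import time\nfor i in range(60):\n    print(i, flush=True)\n    time.sleep(1)\n"

    async for event in execute_with_heartbeat(slow_code):
        if event["type"] == "heartbeat":
            print(f"still running at {event['timestamp']:.0f}")
        elif event["type"] == "complete":
            print(f"finished with {len(event['output'])} lines of output")

asyncio.run(main())
```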
### Timeout with Partial Results

Return what you have if execution takes too long:
```python
import asyncio
from hopx import Sandbox

async def execute_with_timeout(code: str, timeout: int = 60):
    sandbox = await Sandbox.create_async(template="code-interpreter")
    await sandbox.files.write_async("/app/task.py", code)

    output = []

    try:
        # asyncio.timeout() requires Python 3.11+; use asyncio.wait_for on older versions
        async with asyncio.timeout(timeout):
            async for line in sandbox.commands.stream_async("python -u /app/task.py"):
                output.append(line)
                yield {"type": "output", "line": line}

        yield {"type": "complete", "output": output}

    except asyncio.TimeoutError:
        yield {
            "type": "timeout",
            "partial_output": output,
            "message": f"Execution exceeded {timeout}s limit"
        }

    finally:
        await sandbox.kill_async()
```
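Consuming it looks like any other async generator, and the partial output is still worth showing to the user (or feeding back to the LLM). A small usage sketch; the `run_with_limit` name is illustrative:

```python
async def run_with_limit(code: str):
    async for event in execute_with_timeout(code, timeout=30):
        if event["type"] == "output":
            print(event["line"])
        elif event["type"] == "timeout":
            print(f"{event['message']} ({len(event['partial_output'])} lines captured)")
        elif event["type"] == "complete":
            print("done")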
## Streaming for AI Agents

### Integration with LLM Streaming

Combine code streaming with LLM response streaming:
```python
import openai
from hopx import Sandbox

async def agent_stream(user_message: str):
    client = openai.AsyncOpenAI()

    # Stream LLM response
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful coding assistant."},
            {"role": "user", "content": user_message}
        ],
        stream=True
    )

    code_block = ""
    in_code_block = False

    async for chunk in response:
        content = chunk.choices[0].delta.content or ""

        # Detect code blocks (simplified: assumes the ``` markers
        # arrive within a single delta chunk)
        if "```python" in content:
            in_code_block = True
            yield {"type": "text", "content": content}
            continue

        if "```" in content and in_code_block:
            in_code_block = False

            # Execute the collected code
            yield {"type": "text", "content": content}
            yield {"type": "executing", "code": code_block}

            async for output in execute_code_stream(code_block):
                yield {"type": "execution", "output": output}

            code_block = ""
            continue

        if in_code_block:
            code_block += content

        yield {"type": "text", "content": content}


async def execute_code_stream(code: str):
    sandbox = await Sandbox.create_async(template="code-interpreter")

    try:
        await sandbox.files.write_async("/app/code.py", code)

        async for line in sandbox.commands.stream_async("python -u /app/code.py"):
            yield line

    finally:
        await sandbox.kill_async()
```
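On the consuming side, the mixed event stream can be rendered incrementally: text chunks go straight to the chat view, and execution output appears under the code that produced it. A minimal terminal sketch, assuming the `agent_stream` generator above; the prompt string is illustrative:

```python
import asyncio

async def main():
    async for event in agent_stream("Print the squares of 1 through 10"):
        if event["type"] == "text":
            print(event["content"], end="", flush=True)
        elif event["type"] == "executing":
            print("\n[running generated code...]")
        elif event["type"] == "execution":
            print(f"[sandbox] {event['output']}")

asyncio.run(main())
```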
### Multi-Step Agent with Streaming
```python
import json
import openai
from hopx import Sandbox

class StreamingAgent:
    def __init__(self):
        self.client = openai.AsyncOpenAI()

    async def run(self, task: str):
        """Run agent with streaming at every step"""

        yield {"type": "thinking", "message": "Analyzing task..."}

        # Plan steps (_create_plan, not shown, asks the LLM to break the task into steps)
        plan = await self._create_plan(task)
        yield {"type": "plan", "steps": plan}

        results = {}

        for i, step in enumerate(plan):
            yield {"type": "step_start", "step": i, "description": step["description"]}

            # Generate code for this step, streaming chunks as they arrive
            step_code = ""
            async for chunk in self._generate_code_stream(step):
                step_code += chunk
                yield {"type": "code_chunk", "content": chunk}

            # Execute with streaming
            sandbox = await Sandbox.create_async(template="code-interpreter")

            try:
                await sandbox.files.write_async("/app/step.py", step_code)

                async for line in sandbox.commands.stream_async("python -u /app/step.py"):
                    yield {"type": "output", "step": i, "line": line}

                # Capture result (assumes the generated code writes /app/result.json)
                result = await sandbox.files.read_async("/app/result.json")
                results[step["id"]] = json.loads(result)

                yield {"type": "step_complete", "step": i}

            finally:
                await sandbox.kill_async()

        yield {"type": "complete", "results": results}

    async def _generate_code_stream(self, step):
        """Stream code generation from LLM"""
        response = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"Write Python code for: {step['description']}"
            }],
            stream=True
        )

        async for chunk in response:
            content = chunk.choices[0].delta.content or ""
            yield content
```
## Performance Considerations

### Buffer Size

Control the output buffer for optimal streaming:
```python
# Smaller buffer = more responsive, more overhead
# Larger buffer = less responsive, more efficient

sandbox.commands.stream("python script.py", buffer_size=64)    # Very responsive
sandbox.commands.stream("python script.py", buffer_size=4096)  # More efficient
```
### Backpressure Handling

Handle slow consumers:
```python
import asyncio
from collections import deque

class BufferedStream:
    def __init__(self, max_buffer=100):
        self.buffer = deque(maxlen=max_buffer)
        self.overflow_count = 0

    async def produce(self, sandbox):
        """Produce output from the sandbox"""
        async for line in sandbox.commands.stream_async("python script.py"):
            if len(self.buffer) >= self.buffer.maxlen:
                # A full deque with maxlen silently drops the oldest line on append
                self.overflow_count += 1
            self.buffer.append(line)

    async def consume(self):
        """Consume buffered output"""
        while True:
            if self.buffer:
                yield self.buffer.popleft()
            else:
                await asyncio.sleep(0.01)
```
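The producer and consumer then run as separate tasks, so a slow consumer never blocks the sandbox. A minimal wiring sketch; `send_to_slow_client` is a hypothetical stand-in for your actual consumer:

```python
import asyncio

async def run_buffered(sandbox):
    stream = BufferedStream(max_buffer=100)

    # Fill the buffer in the background while we drain it at our own pace
    producer = asyncio.create_task(stream.produce(sandbox))

    async for line in stream.consume():
        await send_to_slow_client(line)  # hypothetical slow consumer

        # Stop once the producer has finished and the buffer is drained
        if producer.done() and not stream.buffer:
            break
```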
### Connection Resilience

Handle disconnections gracefully:
```python
from websockets.exceptions import ConnectionClosed

async def resilient_stream(websocket, sandbox):
    """Stream with reconnection support"""
    output_log = []

    async for line in sandbox.commands.stream_async("python script.py"):
        output_log.append(line)

        try:
            await websocket.send_json({
                "position": len(output_log),
                "content": line
            })
        except ConnectionClosed:
            # Client disconnected, keep running
            pass

    return output_log


async def handle_reconnect(websocket, output_log, from_position):
    """Send missed output on reconnection"""
    for i, line in enumerate(output_log[from_position:], from_position):
        await websocket.send_json({
            "position": i,
            "content": line,
            "catchup": True
        })
```
## Best Practices

### 1. Always Use Unbuffered Output
```python
# Python: the -u flag disables stdout buffering
sandbox.commands.run("python -u script.py")

# Node.js: console.log output is not block-buffered, so no extra flag is needed
sandbox.commands.run("node script.js")

# Within Python code
print("message", flush=True)
```
### 2. Structure Your Output
```python
# Don't stream raw debugging output
print("x = 5")  # Not useful

# Stream meaningful progress
print(f"[STEP 1/3] Loading data ({len(df)} rows)")
```
### 3. Handle Errors in the Stream
```python
async for line in sandbox.commands.stream_async("python script.py"):
    if line.startswith("ERROR:"):
        yield {"type": "error", "message": line}
        break
    yield {"type": "output", "content": line}
```
### 4. Clean Up Resources
```python
async def stream_with_cleanup(code):
    sandbox = await Sandbox.create_async(template="code-interpreter")

    try:
        # Write to a file rather than interpolating into `python -c '...'`,
        # which breaks on quotes in the code
        await sandbox.files.write_async("/app/code.py", code)
        async for line in sandbox.commands.stream_async("python -u /app/code.py"):
            yield line
    finally:
        await sandbox.kill_async()  # Always clean up
```
## Conclusion
Streaming transforms the user experience of AI code execution:
- **Immediate feedback** instead of waiting
- **Progress visibility** for long tasks
- **Cancellation capability** when needed
- **Debugging insight** in real time
Implement streaming from day one—your users will thank you.