Streaming Code Execution: Real-time Output for AI Agents

Tutorials · Alex Oprisan · 9 min read

When code runs for more than a second, users start wondering: "Is it working?" Streaming output solves this by showing results as they happen, not just when execution completes.

This guide shows you how to stream code execution from HopX sandboxes, enabling AI agents that feel responsive and keep users engaged.

Why Streaming Matters

Without streaming:

```text
User: "Analyze this 100MB dataset"
[Spinner for 30 seconds]
[All output appears at once]
```

With streaming:

```text
User: "Analyze this 100MB dataset"
Loading dataset...
Processing 1,000,000 rows...
Calculating statistics...
Generating visualizations...
Analysis complete!
```

Streaming provides:

  • Feedback - Users know the system is working
  • Progress - Long tasks feel faster
  • Debugging - See issues as they occur
  • Cancellation - Stop early if output looks wrong (see the sketch below)
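
That last point is worth spelling out. A minimal cancellation sketch, assuming a long-running script at /app/task.py like the ones used later in this post: the moment output looks wrong, break out of the loop and kill the sandbox.

```python
from hopx import Sandbox

sandbox = Sandbox.create(template="code-interpreter")

# Stop consuming (and tear down the sandbox) at the first sign of trouble
for line in sandbox.commands.stream("python -u /app/task.py"):
    if "Traceback" in line:
        print("Error detected, stopping early")
        break
    print(line)

sandbox.kill()
```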

Basic Streaming

Synchronous Streaming

The simplest form of streaming reads output as it's produced:

```python
from hopx import Sandbox

sandbox = Sandbox.create(template="code-interpreter")

# Long-running script with progress output
code = '''
import time

for i in range(10):
    print(f"Processing step {i+1}/10...")
    time.sleep(1)

print("Complete!")
'''

sandbox.files.write("/app/process.py", code)

# Stream output line by line
for line in sandbox.commands.stream("cd /app && python -u process.py"):
    print(f"[OUTPUT] {line}")
```

Note the -u flag for unbuffered Python output; it's essential for real-time streaming.
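
If you'd rather not touch the command's flags, setting the standard PYTHONUNBUFFERED environment variable in the shell has the same effect:

```python
# Equivalent: force unbuffered output via an environment variable
for line in sandbox.commands.stream("cd /app && PYTHONUNBUFFERED=1 python process.py"):
    print(f"[OUTPUT] {line}")
```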

Async Streaming

For web applications and async frameworks:

```python
import asyncio
from hopx import Sandbox

async def stream_execution():
    sandbox = await Sandbox.create_async(template="code-interpreter")

    code = '''
import time

for i in range(5):
    print(f"Step {i+1}", flush=True)
    time.sleep(0.5)
'''

    await sandbox.files.write_async("/app/task.py", code)

    async for line in sandbox.commands.stream_async("python -u /app/task.py"):
        print(f"Received: {line}")

    await sandbox.kill_async()

asyncio.run(stream_execution())
```

Streaming to Web Clients

Server-Sent Events (SSE)

SSE is perfect for streaming to browsers:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from hopx import Sandbox

app = FastAPI()

class ExecuteRequest(BaseModel):
    code: str

@app.post("/execute")
async def execute_code(request: ExecuteRequest):
    async def generate():
        sandbox = await Sandbox.create_async(template="code-interpreter")

        try:
            await sandbox.files.write_async("/app/code.py", request.code)

            async for line in sandbox.commands.stream_async("python -u /app/code.py"):
                # Format as SSE
                yield f"data: {line}\n\n"

            yield "data: [DONE]\n\n"

        finally:
            await sandbox.kill_async()

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
```

Frontend consumption. The browser's EventSource API only supports GET requests, so to POST code we use fetch() and parse the SSE stream manually:

```javascript
async function streamExecution(code) {
  const response = await fetch('/execute', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ code })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Each SSE event arrives as "data: <payload>\n\n"
    for (const line of decoder.decode(value, { stream: true }).split('\n')) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice('data: '.length);
      if (data === '[DONE]') return;
      document.getElementById('output').textContent += data + '\n';
    }
  }
}

streamExecution('print("Hello")');
```
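
For quick testing before wiring up a browser, you can consume the same endpoint from Python with httpx (a sketch; assumes the FastAPI app above is running on localhost:8000):

```python
import httpx

# Stream the SSE response and print each data line as it arrives
with httpx.stream("POST", "http://localhost:8000/execute",
                  json={"code": 'print("Hello")'}, timeout=None) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            payload = line[len("data: "):]
            if payload == "[DONE]":
                break
            print(payload)
```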

WebSocket Streaming

For bidirectional communication:

```python
from fastapi import FastAPI, WebSocket
from hopx import Sandbox

app = FastAPI()

@app.websocket("/ws/execute")
async def websocket_execute(websocket: WebSocket):
    await websocket.accept()

    sandbox = None

    try:
        while True:
            # Receive code from client
            data = await websocket.receive_json()

            if data["type"] == "execute":
                sandbox = await Sandbox.create_async(template="code-interpreter")
                await sandbox.files.write_async("/app/code.py", data["code"])

                # Stream output back. Note: this simple version blocks the
                # receive loop, so a "cancel" message is only seen between
                # executions; run the stream in a task for mid-run cancellation.
                async for line in sandbox.commands.stream_async("python -u /app/code.py"):
                    await websocket.send_json({
                        "type": "output",
                        "content": line
                    })

                await websocket.send_json({"type": "complete"})
                await sandbox.kill_async()
                sandbox = None

            elif data["type"] == "cancel":
                if sandbox:
                    await sandbox.kill_async()
                    sandbox = None
                await websocket.send_json({"type": "cancelled"})

    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})

    finally:
        if sandbox:
            await sandbox.kill_async()
```

Frontend WebSocket client:

```javascript
const ws = new WebSocket('ws://localhost:8000/ws/execute');
let outputDiv = document.getElementById('output');

ws.onopen = () => {
  // Send code to execute
  ws.send(JSON.stringify({
    type: 'execute',
    code: `
import time
for i in range(10):
    print(f'Processing {i}...')
    time.sleep(0.5)
`
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'output':
      outputDiv.textContent += data.content + '\n';
      break;
    case 'complete':
      outputDiv.textContent += '\n[Execution complete]';
      break;
    case 'cancelled':
      outputDiv.textContent += '\n[Execution cancelled]';
      break;
    case 'error':
      outputDiv.textContent += `\nError: ${data.message}`;
      break;
  }
};

// Cancel button
document.getElementById('cancel').onclick = () => {
  ws.send(JSON.stringify({ type: 'cancel' }));
};
```
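
To exercise the endpoint without a browser, a small Python client using the third-party websockets package works too (a sketch; assumes the server above is running on localhost:8000):

```python
import asyncio
import json

import websockets  # pip install websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/execute") as ws:
        await ws.send(json.dumps({"type": "execute", "code": "print('hi')"}))
        while True:
            message = json.loads(await ws.recv())
            print(message)
            if message["type"] in ("complete", "cancelled", "error"):
                break

asyncio.run(main())
```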

Progress Reporting

Structured Progress Updates

Instead of raw output, send structured progress:

```python
from hopx import Sandbox
import json

def execute_with_progress(code: str, on_progress):
    """Execute code and report structured progress"""
    sandbox = Sandbox.create(template="code-interpreter")

    # Wrapper that outputs JSON progress
    wrapper = '''
import json

def progress(message, percent=None, data=None):
    """Report progress in structured format"""
    update = {"type": "progress", "message": message}
    if percent is not None:
        update["percent"] = percent
    if data is not None:
        update["data"] = data
    print("__PROGRESS__" + json.dumps(update), flush=True)

def result(data):
    """Report final result"""
    print("__RESULT__" + json.dumps({"type": "result", "data": data}), flush=True)

def error(message):
    """Report error"""
    print("__ERROR__" + json.dumps({"type": "error", "message": message}), flush=True)

# User code can now use progress(), result(), error()
'''

    full_code = wrapper + "\n" + code
    sandbox.files.write("/app/task.py", full_code)

    for line in sandbox.commands.stream("python -u /app/task.py"):
        if line.startswith("__PROGRESS__"):
            on_progress(json.loads(line[len("__PROGRESS__"):]))
        elif line.startswith("__RESULT__"):
            on_progress(json.loads(line[len("__RESULT__"):]))
        elif line.startswith("__ERROR__"):
            on_progress(json.loads(line[len("__ERROR__"):]))
        else:
            # Regular output
            on_progress({"type": "output", "content": line})

    sandbox.kill()


# Usage
def handle_progress(update):
    if update["type"] == "progress":
        print(f"[{update.get('percent', '?')}%] {update['message']}")
    elif update["type"] == "result":
        print(f"Result: {update['data']}")

code = '''
import time

progress("Starting analysis", 0)

for i in range(5):
    progress(f"Processing batch {i+1}", (i+1) * 20)
    time.sleep(0.5)

result({"processed": 5, "status": "success"})
'''

execute_with_progress(code, handle_progress)
```

Progress Bar Integration

Send progress suitable for UI progress bars:

```python
analysis_code = '''
import pandas as pd
import time

# Load data
progress("Loading dataset...", 0)
df = pd.read_csv('/app/data.csv')
progress(f"Loaded {len(df)} rows", 20)

# Clean data
progress("Cleaning data...", 20)
df = df.dropna()
progress(f"Cleaned, {len(df)} rows remaining", 40)

# Process
total = len(df)
for i, chunk in enumerate(range(0, total, 1000)):
    # max() guards against division by zero for datasets under 1,000 rows
    percent = 40 + (i / max(total // 1000, 1)) * 40
    progress(f"Processing rows {chunk}-{chunk+1000}", percent)
    time.sleep(0.1)  # Simulate work

# Generate report
progress("Generating report...", 80)
summary = df.describe()
progress("Report ready", 100)

result(summary.to_dict())
'''
```
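
On the consuming side, the percent field maps directly onto a progress bar. A minimal terminal rendering, reusing execute_with_progress from above:

```python
def render_progress(update):
    # Draw a 20-character bar from the structured percent field
    if update["type"] == "progress" and "percent" in update:
        filled = int(update["percent"] // 5)
        bar = "#" * filled + "-" * (20 - filled)
        print(f"\r[{bar}] {update['percent']:5.1f}% {update['message']:<40}",
              end="", flush=True)
    elif update["type"] == "result":
        print(f"\nDone: {update['data']}")

execute_with_progress(analysis_code, render_progress)
```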

Handling Long-Running Tasks

Heartbeat Pattern

Keep connections alive during long operations:

```python
import asyncio
import time

from hopx import Sandbox

async def execute_with_heartbeat(code: str):
    sandbox = await Sandbox.create_async(template="code-interpreter")
    await sandbox.files.write_async("/app/task.py", code)

    # Start execution in the background
    execution = asyncio.create_task(
        collect_output(sandbox.commands.stream_async("python -u /app/task.py"))
    )

    # Send heartbeats while executing
    while not execution.done():
        yield {"type": "heartbeat", "timestamp": time.time()}
        await asyncio.sleep(5)

    # Get final result
    output = await execution
    yield {"type": "complete", "output": output}

    await sandbox.kill_async()


async def collect_output(stream):
    output = []
    async for line in stream:
        output.append(line)
    return output
```
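
Consuming the generator is straightforward; the heartbeat events are what keep an SSE or WebSocket connection from idling out during silent stretches (reusing execute_with_heartbeat from above):

```python
async def main():
    code = "import time\nfor i in range(3):\n    print(i, flush=True)\n    time.sleep(4)"
    async for event in execute_with_heartbeat(code):
        if event["type"] == "heartbeat":
            print("still running...")
        else:
            print("final output:", event["output"])

asyncio.run(main())
```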

Timeout with Partial Results

Return what you have if execution takes too long:

```python
import asyncio
from hopx import Sandbox

async def execute_with_timeout(code: str, timeout: int = 60):
    sandbox = await Sandbox.create_async(template="code-interpreter")
    await sandbox.files.write_async("/app/task.py", code)

    output = []

    try:
        # asyncio.timeout() requires Python 3.11+
        async with asyncio.timeout(timeout):
            async for line in sandbox.commands.stream_async("python -u /app/task.py"):
                output.append(line)
                yield {"type": "output", "line": line}

        yield {"type": "complete", "output": output}

    except asyncio.TimeoutError:
        yield {
            "type": "timeout",
            "partial_output": output,
            "message": f"Execution exceeded {timeout}s limit"
        }

    finally:
        await sandbox.kill_async()
```
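
Calling it looks like this; a runaway script still returns whatever it printed before the deadline (reusing execute_with_timeout from above):

```python
async def main():
    stuck_code = "print('starting', flush=True)\nwhile True:\n    pass"
    async for event in execute_with_timeout(stuck_code, timeout=5):
        if event["type"] == "output":
            print("got:", event["line"])
        elif event["type"] == "timeout":
            print(event["message"])
            print("partial output:", event["partial_output"])

asyncio.run(main())
```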

Streaming for AI Agents

Integration with LLM Streaming

Combine code streaming with LLM response streaming:

```python
import openai
from hopx import Sandbox

async def agent_stream(user_message: str):
    client = openai.AsyncOpenAI()

    # Stream LLM response
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful coding assistant."},
            {"role": "user", "content": user_message}
        ],
        stream=True
    )

    code_block = ""
    in_code_block = False

    async for chunk in response:
        content = chunk.choices[0].delta.content or ""

        # Detect code fences. Note: a fence can be split across chunks;
        # production code should buffer and scan across chunk boundaries.
        if "```python" in content:
            in_code_block = True
            yield {"type": "text", "content": content}
            continue

        if "```" in content and in_code_block:
            in_code_block = False

            # Execute the collected code
            yield {"type": "text", "content": content}
            yield {"type": "executing", "code": code_block}

            async for output in execute_code_stream(code_block):
                yield {"type": "execution", "output": output}

            code_block = ""
            continue

        if in_code_block:
            code_block += content

        yield {"type": "text", "content": content}


async def execute_code_stream(code: str):
    sandbox = await Sandbox.create_async(template="code-interpreter")

    try:
        await sandbox.files.write_async("/app/code.py", code)

        async for line in sandbox.commands.stream_async("python -u /app/code.py"):
            yield line

    finally:
        await sandbox.kill_async()
```

Multi-Step Agent with Streaming

```python
import json

import openai
from hopx import Sandbox

class StreamingAgent:
    def __init__(self):
        self.client = openai.AsyncOpenAI()

    async def run(self, task: str):
        """Run agent with streaming at every step"""

        yield {"type": "thinking", "message": "Analyzing task..."}

        # Plan steps; _create_plan (not shown) returns a list of step dicts
        # with "id" and "description" keys
        plan = await self._create_plan(task)
        yield {"type": "plan", "steps": plan}

        results = {}

        for i, step in enumerate(plan):
            yield {"type": "step_start", "step": i, "description": step["description"]}

            # Generate code for the step, collecting the streamed chunks
            step["code"] = ""
            async for chunk in self._generate_code_stream(step):
                step["code"] += chunk
                yield {"type": "code_chunk", "content": chunk}

            # Execute with streaming
            sandbox = await Sandbox.create_async(template="code-interpreter")

            try:
                await sandbox.files.write_async("/app/step.py", step["code"])

                async for line in sandbox.commands.stream_async("python -u /app/step.py"):
                    yield {"type": "output", "step": i, "line": line}

                # Capture result
                result = await sandbox.files.read_async("/app/result.json")
                results[step["id"]] = json.loads(result)

                yield {"type": "step_complete", "step": i}

            finally:
                await sandbox.kill_async()

        yield {"type": "complete", "results": results}

    async def _generate_code_stream(self, step):
        """Stream code generation from LLM"""
        response = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"Write Python code for: {step['description']}"
            }],
            stream=True
        )

        async for chunk in response:
            content = chunk.choices[0].delta.content or ""
            yield content
```
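
Driving the agent is a single loop over its event stream (a sketch; assumes _create_plan is implemented):

```python
import asyncio

async def main():
    agent = StreamingAgent()
    # Every planning, code-generation, and execution event arrives here
    async for event in agent.run("Summarize the sales data in /app/data.csv"):
        print(event["type"], "->", event)

asyncio.run(main())
```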

Performance Considerations

Buffer Size

Control output buffer for optimal streaming:

```python
# Smaller buffer = more responsive, more overhead
# Larger buffer = less responsive, more efficient

sandbox.commands.stream("python script.py", buffer_size=64)    # Very responsive
sandbox.commands.stream("python script.py", buffer_size=4096)  # More efficient
```

Backpressure Handling

Handle slow consumers:

```python
import asyncio
from collections import deque

class BufferedStream:
    def __init__(self, max_buffer=100):
        # deque with maxlen silently drops the oldest line on overflow
        self.buffer = deque(maxlen=max_buffer)
        self.overflow_count = 0

    async def produce(self, sandbox):
        """Produce output from sandbox"""
        async for line in sandbox.commands.stream_async("python script.py"):
            if len(self.buffer) >= self.buffer.maxlen:
                self.overflow_count += 1
            self.buffer.append(line)

    async def consume(self):
        """Consume buffered output"""
        while True:
            if self.buffer:
                yield self.buffer.popleft()
            else:
                await asyncio.sleep(0.01)
```
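
One way to wire it up, with send_to_slow_client as a hypothetical slow consumer. Since consume() as written never terminates, this drain loop checks the producer task directly:

```python
async def relay(sandbox):
    stream = BufferedStream(max_buffer=100)
    producer = asyncio.create_task(stream.produce(sandbox))

    # Drain until the producer finishes and the buffer is empty
    while not (producer.done() and not stream.buffer):
        if stream.buffer:
            line = stream.buffer.popleft()
            await send_to_slow_client(line)  # hypothetical slow consumer
        else:
            await asyncio.sleep(0.01)

    await producer  # surface any producer exception
    print(f"Dropped {stream.overflow_count} lines to keep up")
```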

Connection Resilience

Handle disconnections gracefully:

```python
# ConnectionClosed comes from the `websockets` library; with FastAPI/starlette
# sockets, catch fastapi.WebSocketDisconnect instead.
from websockets.exceptions import ConnectionClosed

async def resilient_stream(websocket, sandbox):
    """Stream with reconnection support"""
    output_log = []

    async for line in sandbox.commands.stream_async("python script.py"):
        output_log.append(line)

        try:
            await websocket.send_json({
                "position": len(output_log),
                "content": line
            })
        except ConnectionClosed:
            # Client disconnected, keep running
            pass

    return output_log


async def handle_reconnect(websocket, output_log, from_position):
    """Send missed output on reconnection"""
    for i, line in enumerate(output_log[from_position:], from_position):
        await websocket.send_json({
            "position": i,
            "content": line,
            "catchup": True
        })
```

Best Practices

1. Always Use Unbuffered Output

```python
# Python: the -u flag disables stdout buffering
sandbox.commands.run("python -u script.py")

# Node.js does not block-buffer stdout, so no flag is needed
sandbox.commands.run("node script.js")

# Within Python code, flush each print explicitly
print("message", flush=True)
```

2. Structure Your Output

```python
# Don't stream raw debugging noise
print("x = 5")  # Not useful

# Stream meaningful progress
print(f"[STEP 1/3] Loading data ({len(df)} rows)")
```

3. Handle Errors in Stream

```python
async for line in sandbox.commands.stream_async("python script.py"):
    if line.startswith("ERROR:"):
        yield {"type": "error", "message": line}
        break
    yield {"type": "output", "content": line}
```

4. Clean Up Resources

```python
import shlex

async def stream_with_cleanup(code):
    sandbox = await Sandbox.create_async(template="code-interpreter")

    try:
        # shlex.quote prevents quoting bugs when code contains quotes
        command = f"python -u -c {shlex.quote(code)}"
        async for line in sandbox.commands.stream_async(command):
            yield line
    finally:
        await sandbox.kill_async()  # Always cleanup
```
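
If sandboxes are created in several places, an async context manager centralizes this cleanup pattern (a sketch; sandbox_session is a helper name introduced here, not part of the SDK):

```python
from contextlib import asynccontextmanager

from hopx import Sandbox

@asynccontextmanager
async def sandbox_session(template="code-interpreter"):
    # Guarantee teardown no matter how the body exits
    sandbox = await Sandbox.create_async(template=template)
    try:
        yield sandbox
    finally:
        await sandbox.kill_async()

# Usage:
# async with sandbox_session() as sandbox:
#     async for line in sandbox.commands.stream_async("python -u /app/code.py"):
#         ...
```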

Conclusion

Streaming transforms the user experience of AI code execution:

  • Immediate feedback instead of waiting
  • Progress visibility for long tasks
  • Cancellation capability when needed
  • Debugging insight in real-time

Implement streaming from day one; your users will thank you.

Resources