This tutorial assumes you have an existing conversation ID from a previous Thesis.io session. If you need to create a new conversation first, check out our Quickstart guide.
Prerequisites
Before starting, make sure you have:
- For Python: Python 3.7+ installed and familiarity with async programming
- For TypeScript: Node.js 18+ (required for the native fetch API) and TypeScript experience
- A valid Thesis.io Space API key
- An existing conversation ID from a previous session
- Understanding of streaming and event-driven programming
Get your Thesis.io API key
Required Dependencies
- Python
- TypeScript
Install the required Python packages:
# asyncio ships with Python; only httpx and python-dotenv are needed
pip install httpx python-dotenv
Install the required Node.js packages:
# TypeScript and Node.js types
npm install -D typescript @types/node vite-node
# No additional HTTP client needed - uses native Node.js fetch
The TypeScript client uses the Node.js built-in fetch API (available in Node.js 18+) and URL class, so no additional HTTP dependencies like axios are required.
Environment Setup
Create a .env file in your project root:
API_KEY=your_thesis_api_key_here
API_BASE_URL=http://localhost:3000
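Once the .env file exists, load it at startup before constructing the client. A minimal sketch using python-dotenv (the import is guarded so the snippet also runs where the package isn't installed, and the fallback URL mirrors the client's production default):

```python
import os

try:
    # python-dotenv reads .env into os.environ (pip install python-dotenv)
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # fall back to whatever is already in the environment

api_key = os.getenv("API_KEY")
# Mirror the client's production default when API_BASE_URL is not set
base_url = os.getenv("API_BASE_URL", "https://app-be.thesis.io")

if not api_key:
    print("API_KEY is not set - add it to your .env file")
```

Reading configuration once at startup keeps the client classes below free of environment-specific logic.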
The TypeScript client defaults to http://localhost:3000 for local development. You can override this by setting the API_BASE_URL environment variable or by modifying the base URL in your code.
Building the Streaming Client
Let’s break the streaming client down into digestible components for both Python and TypeScript:
1. Core Client Class
- Python
- TypeScript
First, we’ll create the main streaming client class that handles HTTP connections:
import asyncio
import json
import os
import httpx
class StreamingClient:
def __init__(self, base_url: str = 'https://app-be.thesis.io'):
self.base_url = base_url
self.client = None
async def __aenter__(self):
self.client = httpx.AsyncClient(timeout=300.0) # 5 minute timeout
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.client:
await self.client.aclose()
The TypeScript client uses native Node.js APIs for HTTP streaming:
import { URL } from "url";
class StreamingClient {
private base_url: string;
constructor(base_url: string = "http://localhost:3000") {
this.base_url = base_url;
}
// Event handler for processing different stream events
private _handle_event(event: any): void {
// Handle different types of events from the stream
const event_type = event.type || "unknown";
// Event processing logic will be shown in next section
}
// Main streaming method
public async stream_conversation(
conversation_id: string,
api_key: string,
system_prompt: string = "",
user_prompt: string = "",
research_mode: string = "deep_research"
): Promise<void> {
// Streaming implementation will be shown in detail below
}
}
The TypeScript implementation uses the Node.js built-in fetch API (Node.js 18+) and URL class; no external HTTP libraries are required, keeping it lightweight and dependency-free.
The Python version uses async context managers for resource management, while the TypeScript version relies on Node.js’s built-in memory management for HTTP connections.
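The async context manager guarantees the httpx client is closed whether streaming succeeds or raises. The lifecycle can be sketched with a stand-in class (the real `__aenter__`/`__aexit__` wrap `httpx.AsyncClient`, as shown above):

```python
import asyncio

class StreamingClient:
    """Minimal stand-in for the real client, showing the open/close lifecycle."""

    def __init__(self):
        self.client = None  # would hold an httpx.AsyncClient in the real class

    async def __aenter__(self):
        self.client = object()  # httpx.AsyncClient(timeout=300.0) in the real class
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.client = None  # await self.client.aclose() in the real class

async def demo() -> bool:
    async with StreamingClient() as client:
        open_inside = client.client is not None
    # After the block, the HTTP client has been released - even if the body raised
    return open_inside and client.client is None

print(asyncio.run(demo()))  # prints True
```

Because `__aexit__` runs on exceptions too, a failed stream never leaks an open connection.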
2. Event Handler System
The streaming API returns various types of events. Here’s how to handle them:
- Python Event Handler
- TypeScript Event Handler
- Event Types Explained
async def _handle_event(self, event: dict):
"""Handle different types of events from the stream"""
event_type = event.get('type', 'unknown')
if event_type == 'connection':
status = event.get('status', '')
message = event.get('message', '')
if status == 'connected':
print(f'🔗 {message}')
elif status == 'disconnected':
print(f'🔌 {message}')
elif event_type == 'oh_event':
# Main socket.io event data
data = event.get('data', {})
print('\n📨 Socket Event:')
print(f" Type: {data.get('type', 'N/A')}")
print(f" Source: {data.get('source', 'N/A')}")
# Display content based on what's available
if 'content' in data:
print(f" Content: {data['content']}")
if 'message' in data:
print(f" Message: {data['message']}")
if 'observation' in data:
print(f" Observation: {data['observation']}")
# Check for agent state changes
if 'extras' in data and data['extras']:
extras = data['extras']
if 'agent_state' in extras:
print(f" Agent State: {extras['agent_state']}")
if extras['agent_state'] == 'awaiting_user_input':
print(' 🏁 Agent is now awaiting user input - conversation completed!')
print(f' Full Data: {json.dumps(data, indent=2)}')
print('-' * 30)
elif event_type == 'error':
error_type = event.get('error', 'Unknown')
message = event.get('message', '')
print(f'\n❌ Error ({error_type}): {message}')
elif event_type == 'completion':
reason = event.get('reason', 'unknown')
status = event.get('status', 'finished')
print(f'\n🏁 Completion: {status} (reason: {reason})')
else:
print(f'\n❓ Unknown event type: {event_type}')
print(f' Full Event: {json.dumps(event, indent=2)}')
The TypeScript client includes comprehensive event handling:
private _handle_event(event: any): void {
/** Handle different types of events from the stream */
const event_type = event.type || "unknown";
if (event_type === "connection") {
const status = event.status || "";
const message = event.message || "";
if (status === "connected") {
console.log(`🔗 ${message}`);
} else if (status === "disconnected") {
console.log(`🔌 ${message}`);
}
} else if (event_type === "oh_event") {
// This is the main socket.io event data
const data = event.data || {};
console.log("\n📨 Socket Event:");
console.log(` Type: ${data.type || "N/A"}`);
console.log(` Source: ${data.source || "N/A"}`);
if ("content" in data) {
console.log(` Content: ${data.content}`);
}
if ("message" in data) {
console.log(` Message: ${data.message}`);
}
if ("observation" in data) {
console.log(` Observation: ${data.observation}`);
}
if ("extras" in data && data.extras) {
const extras = data.extras;
if ("agent_state" in extras) {
console.log(` Agent State: ${extras.agent_state}`);
if (extras.agent_state === "awaiting_user_input") {
console.log(
" 🏁 Agent is now awaiting user input - conversation completed!"
);
}
}
}
// Print full event data for debugging
console.log(` Full Data: ${JSON.stringify(data, null, 2)}`);
console.log("-".repeat(30));
} else if (event_type === "error") {
const error_type = event.error || "Unknown";
const message = event.message || "";
console.log(`\n❌ Error (${error_type}): ${message}`);
} else if (event_type === "completion") {
const reason = event.reason || "unknown";
const status = event.status || "finished";
console.log(`\n🏁 Completion: ${status} (reason: ${reason})`);
} else {
console.log(`\n❓ Unknown event type: ${event_type}`);
console.log(` Full Event: ${JSON.stringify(event, null, 2)}`);
}
}
The TypeScript event handler follows the same logic as the Python version, processing connection events, socket.io events, errors, and completion signals with detailed logging.
The streaming API sends several types of events:
Connection Events
- status: 'connected' - Successfully connected to the stream
- status: 'disconnected' - Connection lost or closed
Socket Events (oh_event)
- Main conversation data and AI responses
- Contains type, source, content, message, and observation fields, depending on the message type. A comprehensive event list will be added in the future.
- Includes agent state changes in extras.agent_state, also depending on the message type.
Completion Events
- Signals when the conversation ends
- Includes the completion reason and status
Error Events
- Connection errors, authentication failures
- Includes the error type and a descriptive message
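The if/elif chains above can also be written as a dispatch table keyed on the event type, which keeps each handler small and testable. A sketch with our own helper names (not part of the Thesis.io API); an oh_event handler would slot in the same way:

```python
import json

def on_connection(event: dict) -> str:
    return f"🔗 {event.get('message', '')}"

def on_completion(event: dict) -> str:
    return f"🏁 Completion: {event.get('status', 'finished')} (reason: {event.get('reason', 'unknown')})"

def on_error(event: dict) -> str:
    return f"❌ Error ({event.get('error', 'Unknown')}): {event.get('message', '')}"

# Map event types to handlers; unknown types fall through to the raw dump
HANDLERS = {
    "connection": on_connection,
    "completion": on_completion,
    "error": on_error,
}

def handle_event(event: dict) -> str:
    handler = HANDLERS.get(event.get("type", "unknown"))
    if handler is None:
        return f"❓ Unknown event: {json.dumps(event)}"
    return handler(event)

print(handle_event({"type": "completion", "status": "finished", "reason": "done"}))
# → 🏁 Completion: finished (reason: done)
```

Returning strings instead of printing directly makes the handlers easy to unit-test.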
3. Streaming Connection Logic
The core streaming functionality handles chunked JSON responses. The TypeScript implementation uses Node.js’s built-in fetch API:
- Python Streaming
- TypeScript Streaming
- How It Works
async def stream_conversation(
self,
conversation_id: str,
api_key: str,
system_prompt: str = '',
user_prompt: str = '',
research_mode: str = 'deep_research',
):
"""Stream conversation responses from the API endpoint"""
params = {
'conversation_id': conversation_id,
'system_prompt': system_prompt,
'user_prompt': user_prompt,
'research_mode': research_mode,
}
endpoint = f'{self.base_url}/api/v1/integration/conversations/join-conversation'
print(f'🔗 Connecting to: {endpoint}')
print(f'📋 Parameters: {params}')
print('=' * 50)
try:
async with self.client.stream(
'POST',
endpoint,
json=params,
headers={'Authorization': f'Bearer {api_key}'},
) as response:
print(f'✅ Response Status: {response.status_code}')
if response.status_code != 200:
error_text = await response.aread()
print(f'❌ Error: {error_text.decode()}')
return
print('🔄 Streaming events:')
print('-' * 50)
# Buffer to handle chunked JSON
buffer = ''
async for chunk in response.aiter_text():
buffer += chunk
# Process complete JSON objects from buffer
while buffer:
try:
decoder = json.JSONDecoder()
event, idx = decoder.raw_decode(buffer)
await self._handle_event(event)
# Remove processed JSON from buffer
buffer = buffer[idx:].lstrip()
# Check for completion or error
if event.get('type') in ['completion', 'error']:
return
except json.JSONDecodeError:
# Incomplete JSON, wait for more data
break
# Handle any remaining buffer content
if buffer.strip():
print(f'⚠️ Unparsed buffer content: {buffer}')
except httpx.ConnectError:
print(f'❌ Failed to connect to {self.base_url}')
print('Make sure your API endpoint is accessible!')
except httpx.TimeoutException:
print('⏰ Request timed out')
except Exception as e:
print(f'❌ Unexpected error: {e}')
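Long-running streams occasionally drop mid-conversation, so in production you may want to wrap stream_conversation in a retry loop. A sketch with exponential backoff; the attempt count and delays are our own illustrative choices, not part of the client:

```python
import asyncio

async def stream_with_retry(stream_once, max_attempts: int = 3, base_delay: float = 1.0):
    """Call `stream_once()` until it succeeds, backing off exponentially.

    `stream_once` is any coroutine function that raises ConnectionError on
    failure. Returns the number of attempts used; re-raises on final failure.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await stream_once()
            return attempt
        except ConnectionError:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)  # 1s, 2s, 4s, ...
            await asyncio.sleep(delay)

# Simulate a stream that fails twice before connecting
calls = {"n": 0}

async def flaky_stream():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network error")

attempts = asyncio.run(stream_with_retry(flaky_stream, base_delay=0.01))
print(attempts)  # → 3
```

With the real client, `stream_once` would be a closure over `client.stream_conversation(...)`, retrying only on httpx.ConnectError rather than all exceptions.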
The TypeScript client uses Node.js’s built-in fetch API for streaming:
public async stream_conversation(
conversation_id: string,
api_key: string,
system_prompt: string = "",
user_prompt: string = "",
research_mode: string = "deep_research"
): Promise<void> {
/**
* Stream conversation responses from the API endpoint
*/
const params = {
conversation_id,
system_prompt,
user_prompt,
research_mode,
};
const endpoint = new URL(
"/api/v1/integration/conversations/join-conversation",
this.base_url
).toString();
console.log(`🔗 Connecting to: ${endpoint}`);
console.log(`📋 Parameters: ${JSON.stringify(params)}`);
console.log("=".repeat(50));
try {
const response = await fetch(endpoint, {
method: "POST",
body: JSON.stringify(params),
headers: {
Authorization: `Bearer ${api_key}`,
"Content-Type": "application/json",
},
});
console.log(`✅ Response Status: ${response.status}`);
if (!response.ok) {
const error_text = await response.text();
console.log(`❌ Error: ${error_text}`);
return;
}
console.log("🔄 Streaming events:");
console.log("-".repeat(50));
// Buffer to handle chunked JSON
let buffer = "";
if (!response.body) {
throw new Error("No response body");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Process complete JSON objects from buffer
while (buffer.trim()) {
try {
const event = JSON.parse(buffer);
// Successfully parsed a JSON object
this._handle_event(event);
// Reset buffer since we parsed the whole thing
// Note: Assuming one JSON per chunk accumulation; adjust if multi-objects
buffer = "";
// Check for completion
if (event.type === "completion") {
const status = event.status || "finished";
if (status === "cancelled") {
console.log(
`\n🚫 Stream cancelled: ${
event.message || "Stream was cancelled"
}`
);
} else if (status === "finished") {
console.log(
`\n✅ Stream completed successfully with message: ${
event.message || "Unknown message"
}`
);
} else {
console.log(
`\n🏁 Stream ended with status '${status}': ${
event.message || "No message"
}`
);
}
return;
} else if (event.type === "error") {
console.log(
`\n❌ Stream ended with error: ${
event.message || "Unknown error"
}`
);
return;
}
} catch (err) {
if (err instanceof SyntaxError) {
// Incomplete JSON in buffer, wait for more data
break;
} else {
throw err;
}
}
}
}
// Handle any remaining buffer content
if (buffer.trim()) {
console.log(`⚠️ Unparsed buffer content: ${buffer}`);
}
} catch (err: any) {
// Native fetch rejects with a TypeError ("fetch failed") on connection errors
if (err instanceof TypeError) {
console.log(`❌ Failed to connect to ${this.base_url}`);
console.log("Make sure your API endpoint is accessible!");
} else if (err.name === "AbortError") {
console.log("⏰ Request timed out");
} else {
console.log(`❌ Unexpected error: ${err.message}`);
}
}
}
The TypeScript implementation uses the native fetch API with a ReadableStream reader to handle streaming responses. This approach is modern and doesn’t require external dependencies.
Key Components:
1. Request Setup
- Constructs the API endpoint URL using the Node.js URL class
- Prepares the conversation parameters
- Sets the authentication headers
2. Streaming Response
- Python: uses httpx.stream() for long-running connections
- TypeScript: uses native fetch() with ReadableStream processing
- Handles HTTP errors and timeouts gracefully
3. JSON Buffer Processing
- Accumulates text chunks in a buffer
- Parses complete JSON objects as they arrive
- Handles JSON split across multiple chunks
- Python: uses json.JSONDecoder().raw_decode() for parsing
- TypeScript: uses JSON.parse() and catches SyntaxError for incomplete JSON
4. Event Processing
- Delegates each event to the _handle_event() method
- Monitors for completion/error events to end streaming
- Provides detailed logging of all event types
5. Error Handling
- Covers connection failures, fetch errors, timeouts, and unexpected errors
- Provides helpful debug information
- TypeScript: handles connection and abort errors specifically
The JSON buffer handling is crucial because streaming responses can arrive in chunks, potentially splitting JSON objects across multiple chunks. The TypeScript version assumes one JSON object per accumulated buffer for simplicity.
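The Python approach can be isolated into a helper to see why split objects survive: raw_decode parses one object from the front of the string and reports how many characters it consumed, so anything left over is kept for the next chunk. A self-contained sketch:

```python
import json

def drain_buffer(buffer):
    """Parse every complete JSON object from the front of the buffer.

    Returns (events, leftover): leftover is the incomplete tail, kept
    until the next chunk arrives.
    """
    decoder = json.JSONDecoder()
    events = []
    while buffer:
        try:
            # raw_decode reports how many characters the object consumed
            event, idx = decoder.raw_decode(buffer)
        except json.JSONDecodeError:
            break  # incomplete JSON - wait for more data
        events.append(event)
        buffer = buffer[idx:].lstrip()
    return events, buffer

# Two network chunks; the second JSON object is split across them
chunk1 = '{"type": "connection", "status": "connected"}{"type": "oh_ev'
chunk2 = 'ent", "data": {"content": "hi"}}'

events, rest = drain_buffer(chunk1)
print(len(events), repr(rest))  # 1 complete event; the partial tail is kept

events, rest = drain_buffer(rest + chunk2)
print(len(events), repr(rest))  # the split event parses once completed; rest is ''
```

The same helper also handles several objects arriving in a single chunk, which is where the TypeScript version's one-object-per-buffer assumption would drop data.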
4. Main Application Logic
Here’s how to tie everything together:
- Python Main
- TypeScript Main
async def main():
"""Main function to run the streaming client"""
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
# Configuration - modify these values for your use case
config = {
'conversation_id': '4b03707134ee42b4abf613353f746b6c', # Replace with your conversation ID
'api_key': os.getenv('API_KEY'),
'system_prompt': 'You are a helpful AI assistant specialized in DeFi research and analysis.',
'user_prompt': 'What are the latest developments in yield farming protocols?',
'research_mode': 'deep_research',
}
if not config['api_key']:
print('❌ Error: API_KEY not found in environment variables')
return
print('🚀 Starting Thesis.io Streaming Client')
async with StreamingClient(os.getenv('API_BASE_URL', 'https://app-be.thesis.io')) as client:
await client.stream_conversation(**config)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print('\n⏹️ Client stopped by user')
except Exception as e:
print(f'\n💥 Client error: {e}')
The main function and execution logic:
async function main(): Promise<void> {
/**
* Main function to run the streaming client
*/
// Example configuration - modify these values
const config = {
conversation_id: "4b03707134ee42b4abf613353f746b6c",
api_key: process.env.API_KEY || "",
system_prompt:
"You are a helpful AI assistant specialized in software development. You are also a joke teller.",
user_prompt: "Tell me a joke.",
research_mode: "deep_research",
};
// API base URL (defaults to local development)
console.log("🚀 Starting Thesis.io Streaming Client");
const client = new StreamingClient(
process.env.API_BASE_URL || "http://localhost:3000"
);
await client.stream_conversation(
config.conversation_id,
config.api_key,
config.system_prompt,
config.user_prompt,
config.research_mode
);
}
function print_usage(): void {
/** Print usage information */
console.log(`
Thesis.io Streaming Client (TypeScript)
This client connects to the streaming endpoint and displays
real-time responses from the conversation.
Before running:
1. Set your API_KEY in the .env file
2. Set API_BASE_URL (optional; defaults to http://localhost:3000 for local development)
3. Update the conversation_id in this script with your actual conversation ID
Usage:
npm run dev
`);
}
if (
process.argv.length > 2 &&
["-h", "--help", "help"].includes(process.argv[2])
) {
print_usage();
} else {
main().catch((e) => {
console.log(`\n💥 Client error: ${e.message}`);
});
}
The TypeScript main function includes detailed configuration setup, usage information, and graceful error handling. Note that it defaults to http://localhost:3000 for local development environments.
Complete Implementation
- Complete Python Code
- Complete TypeScript Code
- Configuration Options
Here’s the complete implementation you can copy and use:
#!/usr/bin/env python3
"""
Thesis.io Streaming Client - Join existing conversations for DeFi research
"""
import asyncio
import json
import os
import sys
import httpx
from dotenv import load_dotenv
class StreamingClient:
def __init__(self, base_url: str = 'https://app-be.thesis.io'):
self.base_url = base_url
self.client = None
async def __aenter__(self):
self.client = httpx.AsyncClient(timeout=300.0)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.client:
await self.client.aclose()
async def _handle_event(self, event: dict):
"""Handle different types of events from the stream"""
event_type = event.get('type', 'unknown')
if event_type == 'connection':
status = event.get('status', '')
message = event.get('message', '')
if status == 'connected':
print(f'🔗 {message}')
elif status == 'disconnected':
print(f'🔌 {message}')
elif event_type == 'oh_event':
data = event.get('data', {})
print('\n📨 Socket Event:')
print(f" Type: {data.get('type', 'N/A')}")
print(f" Source: {data.get('source', 'N/A')}")
if 'content' in data:
print(f" Content: {data['content']}")
if 'message' in data:
print(f" Message: {data['message']}")
if 'observation' in data:
print(f" Observation: {data['observation']}")
if 'extras' in data and data['extras']:
extras = data['extras']
if 'agent_state' in extras:
print(f" Agent State: {extras['agent_state']}")
if extras['agent_state'] == 'awaiting_user_input':
print(' 🏁 Agent is now awaiting user input - conversation completed!')
print(f' Full Data: {json.dumps(data, indent=2)}')
print('-' * 30)
elif event_type == 'error':
error_type = event.get('error', 'Unknown')
message = event.get('message', '')
print(f'\n❌ Error ({error_type}): {message}')
elif event_type == 'completion':
reason = event.get('reason', 'unknown')
status = event.get('status', 'finished')
print(f'\n🏁 Completion: {status} (reason: {reason})')
else:
print(f'\n❓ Unknown event type: {event_type}')
print(f' Full Event: {json.dumps(event, indent=2)}')
async def stream_conversation(
self,
conversation_id: str,
api_key: str,
system_prompt: str = '',
user_prompt: str = '',
research_mode: str = 'deep_research',
):
"""Stream conversation responses from the API endpoint"""
params = {
'conversation_id': conversation_id,
'system_prompt': system_prompt,
'user_prompt': user_prompt,
'research_mode': research_mode,
}
endpoint = f'{self.base_url}/api/v1/integration/conversations/join-conversation'
print(f'🔗 Connecting to: {endpoint}')
print(f'📋 Parameters: {params}')
print('=' * 50)
try:
async with self.client.stream(
'POST',
endpoint,
json=params,
headers={'Authorization': f'Bearer {api_key}'},
) as response:
print(f'✅ Response Status: {response.status_code}')
if response.status_code != 200:
error_text = await response.aread()
print(f'❌ Error: {error_text.decode()}')
return
print('🔄 Streaming events:')
print('-' * 50)
buffer = ''
async for chunk in response.aiter_text():
buffer += chunk
while buffer:
try:
decoder = json.JSONDecoder()
event, idx = decoder.raw_decode(buffer)
await self._handle_event(event)
buffer = buffer[idx:].lstrip()
if event.get('type') == 'completion':
status = event.get('status', 'finished')
if status == 'cancelled':
print(f"\n🚫 Stream cancelled: {event.get('message', 'Stream was cancelled')}")
elif status == 'finished':
print(f"\n✅ Stream completed successfully with message: {event.get('message', 'Unknown message')}")
else:
print(f"\n🏁 Stream ended with status '{status}': {event.get('message', 'No message')}")
return
elif event.get('type') == 'error':
print(f"\n❌ Stream ended with error: {event.get('message', 'Unknown error')}")
return
except json.JSONDecodeError:
break
if buffer.strip():
print(f'⚠️ Unparsed buffer content: {buffer}')
except httpx.ConnectError:
print(f'❌ Failed to connect to {self.base_url}')
print('Make sure your API endpoint is accessible!')
except httpx.TimeoutException:
print('⏰ Request timed out')
except Exception as e:
print(f'❌ Unexpected error: {e}')
async def main():
"""Main function to run the streaming client"""
load_dotenv()
# Example configuration - modify these values
config = {
'conversation_id': '4b03707134ee42b4abf613353f746b6c', # Replace with your conversation ID
'api_key': os.getenv('API_KEY'),
'system_prompt': 'You are a helpful AI assistant specialized in DeFi research and analysis.',
'user_prompt': 'What are the latest developments in yield farming protocols?',
'research_mode': 'deep_research',
}
if not config['api_key']:
print('❌ Error: API_KEY not found in environment variables')
return
print('🚀 Starting Thesis.io Streaming Client')
async with StreamingClient(os.getenv('API_BASE_URL', 'https://app-be.thesis.io')) as client:
await client.stream_conversation(**config)
if __name__ == '__main__':
if len(sys.argv) > 1 and sys.argv[1] in ['-h', '--help', 'help']:
print("""
Thesis.io Streaming Client
This client connects to existing Thesis.io conversations for continued DeFi research.
Before running:
1. Set your API_KEY in the .env file
2. Set your API_BASE_URL (optional, defaults to https://app-be.thesis.io)
3. Update the conversation_id in the script with your actual conversation ID
Usage:
python join_conversation.py
""")
else:
try:
asyncio.run(main())
except KeyboardInterrupt:
print('\n⏹️ Client stopped by user')
except Exception as e:
print(f'\n💥 Client error: {e}')
Here’s the complete TypeScript implementation:
/**
* Simple client that calls the join-conversation streaming endpoint
* and prints responses from the server in real-time.
*/
import { URL } from "url";
class StreamingClient {
private base_url: string;
constructor(base_url: string = "http://localhost:3000") {
this.base_url = base_url;
}
private _handle_event(event: any): void {
/** Handle different types of events from the stream */
const event_type = event.type || "unknown";
if (event_type === "connection") {
const status = event.status || "";
const message = event.message || "";
if (status === "connected") {
console.log(`🔗 ${message}`);
} else if (status === "disconnected") {
console.log(`🔌 ${message}`);
}
} else if (event_type === "oh_event") {
// This is the main socket.io event data
const data = event.data || {};
console.log("\n📨 Socket Event:");
console.log(` Type: ${data.type || "N/A"}`);
console.log(` Source: ${data.source || "N/A"}`);
if ("content" in data) {
console.log(` Content: ${data.content}`);
}
if ("message" in data) {
console.log(` Message: ${data.message}`);
}
if ("observation" in data) {
console.log(` Observation: ${data.observation}`);
}
if ("extras" in data && data.extras) {
const extras = data.extras;
if ("agent_state" in extras) {
console.log(` Agent State: ${extras.agent_state}`);
if (extras.agent_state === "awaiting_user_input") {
console.log(
" 🏁 Agent is now awaiting user input - conversation completed!"
);
}
}
}
// Print full event data for debugging
console.log(` Full Data: ${JSON.stringify(data, null, 2)}`);
console.log("-".repeat(30));
} else if (event_type === "error") {
const error_type = event.error || "Unknown";
const message = event.message || "";
console.log(`\n❌ Error (${error_type}): ${message}`);
} else if (event_type === "completion") {
const reason = event.reason || "unknown";
const status = event.status || "finished";
console.log(`\n🏁 Completion: ${status} (reason: ${reason})`);
} else {
console.log(`\n❓ Unknown event type: ${event_type}`);
console.log(` Full Event: ${JSON.stringify(event, null, 2)}`);
}
}
public async stream_conversation(
conversation_id: string,
api_key: string,
system_prompt: string = "",
user_prompt: string = "",
research_mode: string = "deep_research"
): Promise<void> {
/**
* Stream conversation responses from the API endpoint
*/
const params = {
conversation_id,
system_prompt,
user_prompt,
research_mode,
};
const endpoint = new URL(
"/api/v1/integration/conversations/join-conversation",
this.base_url
).toString();
console.log(`🔗 Connecting to: ${endpoint}`);
console.log(`📋 Parameters: ${JSON.stringify(params)}`);
console.log("=".repeat(50));
try {
const response = await fetch(endpoint, {
method: "POST",
body: JSON.stringify(params),
headers: {
Authorization: `Bearer ${api_key}`,
"Content-Type": "application/json",
},
});
console.log(`✅ Response Status: ${response.status}`);
if (!response.ok) {
const error_text = await response.text();
console.log(`❌ Error: ${error_text}`);
return;
}
console.log("🔄 Streaming events:");
console.log("-".repeat(50));
// Buffer to handle chunked JSON
let buffer = "";
if (!response.body) {
throw new Error("No response body");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Process complete JSON objects from buffer
while (buffer.trim()) {
try {
const event = JSON.parse(buffer);
// Successfully parsed a JSON object
this._handle_event(event);
// Reset buffer since we parsed the whole thing
// Note: Assuming one JSON per chunk accumulation; adjust if multi-objects
buffer = "";
// Check for completion
if (event.type === "completion") {
const status = event.status || "finished";
if (status === "cancelled") {
console.log(
`\n🚫 Stream cancelled: ${
event.message || "Stream was cancelled"
}`
);
} else if (status === "finished") {
console.log(
`\n✅ Stream completed successfully with message: ${
event.message || "Unknown message"
}`
);
} else {
console.log(
`\n🏁 Stream ended with status '${status}': ${
event.message || "No message"
}`
);
}
return;
} else if (event.type === "error") {
console.log(
`\n❌ Stream ended with error: ${
event.message || "Unknown error"
}`
);
return;
}
} catch (err: any) {
if (err instanceof SyntaxError) {
// Incomplete JSON in buffer, wait for more data
break;
} else {
throw err;
}
}
}
}
// Handle any remaining buffer content
if (buffer.trim()) {
console.log(`⚠️ Unparsed buffer content: ${buffer}`);
}
} catch (err: any) {
// Native fetch rejects with a TypeError ("fetch failed") on connection errors
if (err instanceof TypeError) {
console.log(`❌ Failed to connect to ${this.base_url}`);
console.log("Make sure your API endpoint is accessible!");
} else if (err.name === "AbortError") {
console.log("⏰ Request timed out");
} else {
console.log(`❌ Unexpected error: ${err.message}`);
}
}
}
}
async function main(): Promise<void> {
/**
* Main function to run the streaming client
*/
// Example configuration - modify these values
const config = {
conversation_id: "4b03707134ee42b4abf613353f746b6c",
api_key: process.env.API_KEY || "",
system_prompt:
"You are a helpful AI assistant specialized in software development. You are also a joke teller.",
user_prompt: "Tell me a joke.",
research_mode: "deep_research",
};
// API base URL (defaults to local development)
console.log("🚀 Starting Thesis.io Streaming Client");
const client = new StreamingClient(
process.env.API_BASE_URL || "http://localhost:3000"
);
await client.stream_conversation(
config.conversation_id,
config.api_key,
config.system_prompt,
config.user_prompt,
config.research_mode
);
}
function print_usage(): void {
/** Print usage information */
console.log(`
Thesis.io Streaming Client (TypeScript)
This client connects to the streaming endpoint and displays
real-time responses from the conversation.
Before running:
1. Set your API_KEY in the .env file
2. Set API_BASE_URL (optional; defaults to http://localhost:3000 for local development)
3. Update the conversation_id in this script with your actual conversation ID
Usage:
npm run dev
`);
}
if (
process.argv.length > 2 &&
["-h", "--help", "help"].includes(process.argv[2])
) {
print_usage();
} else {
main().catch((e) => {
console.log(`\n💥 Client error: ${e.message}`);
});
}
Environment Variables
| Variable | Required | Description | Default |
|---|---|---|---|
| API_KEY | ✅ | Your Thesis.io API key | None |
| API_BASE_URL | ❌ | Base URL for the API | https://app-be.thesis.io |
Research Modes
| Mode | Description |
|---|---|
| deep_research | Comprehensive analysis with multiple sources |
| chat | Faster responses with fewer sources |
| follow_up | Follow-up questions and responses |
| rerun_section | Rerun a specific section of your Thesis.io Space |
Configuration Parameters
config = {
'conversation_id': 'your_conversation_id_here',
'api_key': os.getenv('API_KEY'),
'system_prompt': 'Your custom system prompt for DeFi research',
'user_prompt': 'Your follow-up question or research request',
'research_mode': 'deep_research',
}
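Since a missing or malformed value only surfaces as an HTTP error mid-stream, it can help to validate the configuration up front. A sketch with our own helper (the accepted modes mirror the Research Modes table above):

```python
# Modes accepted by the API, per the Research Modes table
VALID_MODES = {"deep_research", "chat", "follow_up", "rerun_section"}

def validate_config(config):
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    if not config.get("conversation_id"):
        problems.append("conversation_id is required")
    if not config.get("api_key"):
        problems.append("api_key is required (set API_KEY in your .env file)")
    if config.get("research_mode") not in VALID_MODES:
        problems.append(f"research_mode must be one of {sorted(VALID_MODES)}")
    return problems

print(validate_config({
    "conversation_id": "your_conversation_id_here",
    "api_key": "key",
    "research_mode": "chat",
}))  # → []
```

Calling this at the top of main() turns a confusing mid-stream failure into an immediate, readable error.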
Running the Code
- Python Setup
- TypeScript Setup
1. Save the code to a file named join_conversation.py
2. Set up your environment:
# Create .env file
echo "API_KEY=your_actual_api_key_here" > .env
echo "API_BASE_URL=https://app-be.thesis.io" >> .env
3. Update the conversation ID: replace '4b03707134ee42b4abf613353f746b6c' in the code with your actual conversation ID from a previous Thesis.io session.
4. Run the client:
python join_conversation.py
1. Save the code to a file named join-conversation.ts
2. Create a package.json file:
{
  "name": "join-conversation-demo",
  "version": "1.0.0",
  "description": "Simple TypeScript client for Thesis Space API",
  "main": "dist/index.js",
  "scripts": {
    "build": "tsc",
    "start": "node dist/index.js",
    "dev": "vite-node src/index.ts"
  },
  "dependencies": {
    "dotenv": "^16.4.5"
  },
  "devDependencies": {
    "@types/node": "^20.0.0",
    "typescript": "^5.0.0",
    "vite-node": "^3.2.4"
  }
}
3. Set up your project:
# Install only TypeScript tooling (no HTTP client needed)
npm install
4. Create a tsconfig.json:
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "noImplicitAny": false,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "dist"]
}
This TypeScript client uses the Node.js built-in fetch (Node.js 18+) and URL, so no additional HTTP dependencies are required.
5. Set up your environment:
# Create .env file
echo "API_KEY=your_actual_api_key_here" > .env
echo "API_BASE_URL=http://localhost:3000" >> .env
6. Update the conversation ID: replace '4b03707134ee42b4abf613353f746b6c' in the code with your actual conversation ID from a previous session.
7. Ensure Node.js 18+:
# Check Node.js version (must be 18+ for native fetch support)
node --version
8. Run the client:
# Run directly with vite-node
npm run dev
# Or compile and run
npm run build
npm start
Understanding the Output
The streaming client will display different types of events:
- 🔗 Connection events: when the client connects or disconnects
- 📨 Socket events: main conversation data and responses
- 🏁 Completion: when the conversation finishes or requires user input
- ❌ Errors: any issues during the streaming process
Both Python and TypeScript clients automatically handle JSON chunking and provide detailed logging. The TypeScript version uses native Node.js APIs for a lightweight, dependency-free approach.
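If you want the conversation text rather than a log, the same event stream can be folded into a transcript. A sketch with our own helper (not part of the client), keeping only the content field of oh_event payloads:

```python
def collect_content(events):
    """Join the content of oh_event payloads into a single transcript string."""
    parts = []
    for event in events:
        if event.get("type") != "oh_event":
            continue  # skip connection/completion/error events
        data = event.get("data", {})
        if "content" in data:
            parts.append(data["content"])
    return "\n".join(parts)

# A miniature event stream like the one the client receives
sample = [
    {"type": "connection", "status": "connected"},
    {"type": "oh_event", "data": {"content": "Yield farming lets users earn..."}},
    {"type": "oh_event", "data": {"message": "status only, no content"}},
    {"type": "completion", "status": "finished"},
]
print(collect_content(sample))  # → Yield farming lets users earn...
```

In the real client this would live inside _handle_event, appending to a list instead of printing.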
TypeScript Implementation Benefits
The TypeScript implementation provides several advantages:
Native Node.js APIs
- Built-in fetch: uses the Node.js 18+ native fetch API
- No dependencies: no external HTTP libraries required
- URL class: built-in URL construction and validation
- TextDecoder: native streaming text decoding
Modern JavaScript Features
- ReadableStream: Native stream processing for optimal performance
- Promise-based: Clean async/await patterns
- Error handling: Comprehensive FetchError and AbortError handling
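AbortError handling typically pairs with a client-side timeout. The sketch below shows one common pattern, assuming you wrap the fetch call yourself (these helper names are illustrative, not part of the client API). Note that Node's fetch rejects with a DOMException named "AbortError", which is why the check inspects the name rather than the error class:

```typescript
// Illustrative helpers, not part of the Thesis.io client API.
function isAbortError(err: unknown): boolean {
  // Node's fetch rejects with a DOMException named "AbortError" on abort
  return typeof err === "object" && err !== null &&
    (err as { name?: string }).name === "AbortError";
}

async function withTimeout<T>(
  work: (signal: AbortSignal) => Promise<T>,
  ms: number
): Promise<T> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  try {
    return await work(controller.signal);
  } finally {
    clearTimeout(timer); // don't leave the timer pending on success
  }
}
```

Usage would look like `await withTimeout((signal) => fetch(url, { signal }), 300_000)` to mirror the 5-minute timeout used by the Python client.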
Lightweight Design
Copy
// Simple configuration
const config: ConversationConfig = {
  conversationId: "your-conversation-id",
  apiKey: process.env.API_KEY || "",
  systemPrompt: "Your system prompt",
  userPrompt: "Your question",
  researchMode: "deep_research",
};

// Direct method call
await client.streamConversation(config);
Common Use Cases
- Python Examples
- TypeScript Examples
DeFi Protocol Research
Copy
config = {
'conversation_id': 'your_conversation_id',
'system_prompt': 'You are a DeFi protocol analyst focusing on yield optimization strategies.',
'user_prompt': 'Analyze the recent changes in Compound v3 and their impact on lending rates.',
'research_mode': 'deep_research',
}
Market Analysis Follow-up
Copy
config = {
'conversation_id': 'your_conversation_id',
'system_prompt': 'You are a crypto market analyst specializing in DeFi trends.',
'user_prompt': 'Based on our previous discussion, what are the risks of the new staking derivatives?',
'research_mode': 'chat',
}
DeFi Protocol Research
Copy
const config: ConversationConfig = {
conversationId: 'your_conversation_id',
systemPrompt: 'You are a DeFi protocol analyst focusing on yield optimization strategies.',
userPrompt: 'Analyze the recent changes in Compound v3 and their impact on lending rates.',
researchMode: 'deep_research',
apiKey: process.env.API_KEY!
};
Market Analysis Follow-up
Copy
const config: ConversationConfig = {
conversationId: 'your_conversation_id',
systemPrompt: 'You are a crypto market analyst specializing in DeFi trends.',
userPrompt: 'Based on our previous discussion, what are the risks of the new staking derivatives?',
researchMode: 'chat',
apiKey: process.env.API_KEY!
};
Yield Farming Strategy Analysis
Copy
const yieldAnalysisConfig: ConversationConfig = {
conversationId: 'your_conversation_id',
systemPrompt: 'You are a yield farming specialist with expertise in cross-chain protocols.',
userPrompt: 'Compare the current APY opportunities across Ethereum, Arbitrum, and Polygon for stablecoin farming.',
researchMode: 'deep_research',
apiKey: process.env.API_KEY!
};
const client = new StreamingClient();
await client.streamConversation(yieldAnalysisConfig);
Risk Assessment Follow-up
Copy
const riskConfig: ConversationConfig = {
conversationId: 'your_conversation_id',
systemPrompt: 'You are a DeFi risk analyst focused on smart contract security and liquidity risks.',
userPrompt: 'Given the previous protocol analysis, what are the key risk indicators I should monitor?',
researchMode: 'follow_up',
apiKey: process.env.API_KEY!
};
Error Handling
Both implementations include comprehensive error handling:
Common Error Types
- Connection failures: Network issues or invalid endpoints
- Authentication errors: Invalid API keys (401 status codes)
- Timeout errors: Long-running requests that exceed the 5-minute limit
- JSON parsing errors: Malformed responses from the server
- Stream interruptions: Graceful handling of interrupted connections
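Several of these error types (connection failures, timeouts) are transient, so a retry wrapper with exponential backoff can be layered on top. This is an illustrative sketch, not part of the client; it retries only the error codes listed and rethrows everything else (such as 401s) immediately:

```typescript
// Illustrative retry helper with exponential backoff; not part of the client.
async function withRetry<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      const code = (err as { code?: string }).code;
      // Only retry transient network errors; rethrow auth/404 errors immediately
      if (code !== "ECONNREFUSED" && code !== "ETIMEDOUT" && code !== "ENOTFOUND") {
        throw err;
      }
      if (i < attempts - 1) {
        // 500ms, 1s, 2s, ... between attempts
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** i));
      }
    }
  }
  throw lastError;
}
```

Usage: `await withRetry(() => client.streamConversation(config))`. Be cautious retrying a stream that may have partially completed server-side.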
TypeScript-Specific Error Handling
The TypeScript client provides enhanced error detection:
Copy
try {
await client.streamConversation(config);
} catch (error: any) {
if (error.code === 'ECONNREFUSED') {
console.log('❌ Connection refused - check if the API endpoint is running');
} else if (error.code === 'ENOTFOUND') {
console.log('❌ DNS resolution failed - check your internet connection');
} else if (error.code === 'ETIMEDOUT') {
console.log('⏰ Request timed out - the server may be overloaded');
} else if (error.response?.status === 401) {
console.log('🔐 Authentication failed - check your API key');
} else if (error.response?.status === 404) {
console.log('❓ Conversation not found - verify your conversation ID');
} else {
console.log(`❌ Unexpected error: ${error.message}`);
}
}
Signal Handling
The TypeScript implementation includes graceful shutdown on process signals:Copy
process.on('SIGINT', () => {
console.log('\n⏹️ Client stopped by user');
process.exit(0);
});
process.on('SIGTERM', () => {
console.log('\n⏹️ Client terminated');
process.exit(0);
});
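If you want the shutdown to cancel the in-flight stream rather than exit abruptly, the handlers can be wired to an AbortController. This is a hedged sketch that assumes the underlying fetch call is given the controller's signal (the client does not necessarily expose this hook):

```typescript
// Sketch: aborting the active stream before exit.
// Assumes the controller's signal is passed to fetch, e.g.
// fetch(url, { signal: controller.signal }) inside the client.
const controller = new AbortController();

function shutdown(label: string): void {
  console.log(`\n⏹️ ${label} - aborting active stream`);
  controller.abort(); // cancels the pending fetch / stream read
  process.exit(0);
}

process.on("SIGINT", () => shutdown("Client stopped by user"));
process.on("SIGTERM", () => shutdown("Client terminated"));
```

Aborting before exit lets the server see the connection close cleanly instead of the socket being dropped by process termination.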
Next Steps
- Explore API Reference for more endpoint options
- Learn about Research Modes for different analysis types
- Check out Live Demos for interactive examples
- Read about DeFi Data Sources available through Thesis.io
Remember to keep your API key secure and never commit it to version control. Always use environment variables for sensitive configuration.