This tutorial walks you through building streaming clients in Python and TypeScript that connect to existing Thesis.io conversations, so you can continue DeFi research and ask follow-up questions in real time.
This tutorial assumes you have an existing conversation ID from a previous Thesis.io session. If you need to create a new conversation first, check out our Quickstart guide.

Prerequisites

Before starting, make sure you have:
  • For Python: Python 3.7+ installed and familiarity with async programming
  • For TypeScript: Node.js 18+ (required for the native fetch API) and TypeScript experience
  • A valid Thesis.io Space API key
  • An existing conversation ID from a previous session
  • Understanding of streaming and event-driven programming

Get your Thesis.io API key

Required Dependencies

Install the required Python packages (asyncio ships with the standard library, so it does not need installing):
pip install httpx python-dotenv

Environment Setup

Create a .env file in your project root:
API_KEY=your_thesis_api_key_here
API_BASE_URL=https://app-be.thesis.io
The Python client defaults to https://app-be.thesis.io, while the TypeScript client defaults to http://localhost:3000 for local development. In either case, you can override the default by setting the API_BASE_URL environment variable or modifying the base URL in your code.

Building the Streaming Client

Let’s break down the streaming client into digestible components for both Python and TypeScript:

1. Core Client Class

First, we’ll create the main streaming client class that handles HTTP connections:
import asyncio
import json
import os
import httpx

class StreamingClient:
    def __init__(self, base_url: str = 'https://app-be.thesis.io'):
        self.base_url = base_url
        self.client = None

    async def __aenter__(self):
        self.client = httpx.AsyncClient(timeout=300.0)  # 5 minute timeout
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.aclose()
The Python version uses an async context manager for deterministic cleanup of the HTTP client, while the TypeScript version relies on Node.js's garbage collection to release HTTP connections.

2. Event Handler System

The streaming API returns various types of events. Here’s how to handle them:
async def _handle_event(self, event: dict):
    """Handle different types of events from the stream"""
    event_type = event.get('type', 'unknown')

    if event_type == 'connection':
        status = event.get('status', '')
        message = event.get('message', '')
        if status == 'connected':
            print(f'🔗 {message}')
        elif status == 'disconnected':
            print(f'🔌 {message}')

    elif event_type == 'oh_event':
        # Main socket.io event data
        data = event.get('data', {})
        print('\n📨 Socket Event:')
        print(f"   Type: {data.get('type', 'N/A')}")
        print(f"   Source: {data.get('source', 'N/A')}")
        
        # Display content based on what's available
        if 'content' in data:
            print(f"   Content: {data['content']}")
        if 'message' in data:
            print(f"   Message: {data['message']}")
        if 'observation' in data:
            print(f"   Observation: {data['observation']}")
            
        # Check for agent state changes
        if 'extras' in data and data['extras']:
            extras = data['extras']
            if 'agent_state' in extras:
                print(f"   Agent State: {extras['agent_state']}")
                if extras['agent_state'] == 'awaiting_user_input':
                    print('   🏁 Agent is awaiting user input - this research turn is complete!')
        
        print(f'   Full Data: {json.dumps(data, indent=2)}')
        print('-' * 30)

    elif event_type == 'error':
        error_type = event.get('error', 'Unknown')
        message = event.get('message', '')
        print(f'\n❌ Error ({error_type}): {message}')

    elif event_type == 'completion':
        reason = event.get('reason', 'unknown')
        status = event.get('status', 'finished')
        print(f'\n🏁 Completion: {status} (reason: {reason})')

    else:
        print(f'\n❓ Unknown event type: {event_type}')
        print(f'   Full Event: {json.dumps(event, indent=2)}')
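The key signal in this handler is the agent_state value nested under extras: once it reaches awaiting_user_input, the agent has finished its turn. A small helper makes that check reusable (is_turn_complete and the payloads below are illustrative, using only field names that appear in the handler above):

```python
def is_turn_complete(event: dict) -> bool:
    """True when an oh_event signals the agent is awaiting user input."""
    if event.get('type') != 'oh_event':
        return False
    extras = event.get('data', {}).get('extras') or {}
    return extras.get('agent_state') == 'awaiting_user_input'

# Illustrative payloads - your stream's exact fields may differ
in_progress = {'type': 'oh_event',
               'data': {'source': 'agent', 'content': 'Researching...',
                        'extras': {'agent_state': 'running'}}}
finished = {'type': 'oh_event',
            'data': {'source': 'agent',
                     'extras': {'agent_state': 'awaiting_user_input'}}}

print(is_turn_complete(in_progress))  # → False
print(is_turn_complete(finished))     # → True
```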

3. Streaming Connection Logic

The core streaming functionality handles chunked JSON responses. The TypeScript implementation uses Node.js’s built-in fetch API:
async def stream_conversation(
    self,
    conversation_id: str,
    api_key: str,
    system_prompt: str = '',
    user_prompt: str = '',
    research_mode: str = 'deep_research',
):
    """Stream conversation responses from the API endpoint"""
    params = {
        'conversation_id': conversation_id,
        'system_prompt': system_prompt,
        'user_prompt': user_prompt,
        'research_mode': research_mode,
    }

    endpoint = f'{self.base_url}/api/v1/integration/conversations/join-conversation'

    print(f'🔗 Connecting to: {endpoint}')
    print(f'📋 Parameters: {params}')
    print('=' * 50)

    try:
        async with self.client.stream(
            'POST',
            endpoint,
            json=params,
            headers={'Authorization': f'Bearer {api_key}'},
        ) as response:
            print(f'✅ Response Status: {response.status_code}')

            if response.status_code != 200:
                error_text = await response.aread()
                print(f'❌ Error: {error_text.decode()}')
                return

            print('🔄 Streaming events:')
            print('-' * 50)

            # Buffer to handle chunked JSON
            buffer = ''
            decoder = json.JSONDecoder()

            async for chunk in response.aiter_text():
                buffer += chunk

                # Process complete JSON objects from buffer
                while buffer:
                    try:
                        event, idx = decoder.raw_decode(buffer)

                        await self._handle_event(event)

                        # Remove processed JSON from buffer
                        buffer = buffer[idx:].lstrip()

                        # Check for completion or error
                        if event.get('type') in ['completion', 'error']:
                            return

                    except json.JSONDecodeError:
                        # Incomplete JSON, wait for more data
                        break

            # Handle any remaining buffer content
            if buffer.strip():
                print(f'⚠️  Unparsed buffer content: {buffer}')

    except httpx.ConnectError:
        print(f'❌ Failed to connect to {self.base_url}')
        print('Make sure your API endpoint is accessible!')
    except httpx.TimeoutException:
        print('⏰ Request timed out')
    except Exception as e:
        print(f'❌ Unexpected error: {e}')
The JSON buffer handling matters because streaming responses arrive in arbitrary chunks, so a single JSON object can be split across several of them. The TypeScript version assumes one JSON object per accumulated buffer for simplicity.
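The buffering technique is easy to see in isolation. This standalone sketch (not part of the client) feeds json.JSONDecoder.raw_decode a stream whose chunks split a JSON object in half:

```python
import json

def drain_buffer(buffer: str):
    """Pull every complete JSON object off the front of the buffer;
    return the parsed events plus whatever incomplete tail remains."""
    decoder = json.JSONDecoder()
    events = []
    while buffer:
        try:
            event, idx = decoder.raw_decode(buffer)
        except json.JSONDecodeError:
            break  # incomplete JSON - wait for the next chunk
        events.append(event)
        buffer = buffer[idx:].lstrip()
    return events, buffer

# Simulate a stream whose second object is split across two chunks
chunks = ['{"type": "connection", "status": "connected"}{"type": "compl',
          'etion"}']
all_events, buffer = [], ''
for chunk in chunks:
    buffer += chunk
    events, buffer = drain_buffer(buffer)
    all_events.extend(events)

print(all_events)
# → [{'type': 'connection', 'status': 'connected'}, {'type': 'completion'}]
```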

4. Main Application Logic

Here’s how to tie everything together:
async def main():
    """Main function to run the streaming client"""
    # Load environment variables
    from dotenv import load_dotenv
    load_dotenv()
    
    # Configuration - modify these values for your use case
    config = {
        'conversation_id': '4b03707134ee42b4abf613353f746b6c',  # Replace with your conversation ID
        'api_key': os.getenv('API_KEY'),
        'system_prompt': 'You are a helpful AI assistant specialized in DeFi research and analysis.',
        'user_prompt': 'What are the latest developments in yield farming protocols?',
        'research_mode': 'deep_research',
    }

    if not config['api_key']:
        print('❌ Error: API_KEY not found in environment variables')
        return

    print('🚀 Starting Thesis.io Streaming Client')

    async with StreamingClient(os.getenv('API_BASE_URL', 'https://app-be.thesis.io')) as client:
        await client.stream_conversation(**config)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('\n⏹️  Client stopped by user')
    except Exception as e:
        print(f'\n💥 Client error: {e}')

Complete Implementation

Here’s the complete implementation you can copy and use:
#!/usr/bin/env python3
"""
Thesis.io Streaming Client - Join existing conversations for DeFi research
"""

import asyncio
import json
import os
import sys
import httpx
from dotenv import load_dotenv



class StreamingClient:
    def __init__(self, base_url: str = 'https://app-be.thesis.io'):
        self.base_url = base_url
        self.client = None

    async def __aenter__(self):
        self.client = httpx.AsyncClient(timeout=300.0)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.aclose()

    async def _handle_event(self, event: dict):
        """Handle different types of events from the stream"""
        event_type = event.get('type', 'unknown')

        if event_type == 'connection':
            status = event.get('status', '')
            message = event.get('message', '')
            if status == 'connected':
                print(f'🔗 {message}')
            elif status == 'disconnected':
                print(f'🔌 {message}')

        elif event_type == 'oh_event':
            data = event.get('data', {})
            print('\n📨 Socket Event:')
            print(f"   Type: {data.get('type', 'N/A')}")
            print(f"   Source: {data.get('source', 'N/A')}")
            if 'content' in data:
                print(f"   Content: {data['content']}")
            if 'message' in data:
                print(f"   Message: {data['message']}")
            if 'observation' in data:
                print(f"   Observation: {data['observation']}")
            if 'extras' in data and data['extras']:
                extras = data['extras']
                if 'agent_state' in extras:
                    print(f"   Agent State: {extras['agent_state']}")
                    if extras['agent_state'] == 'awaiting_user_input':
                        print('   🏁 Agent is awaiting user input - this research turn is complete!')
            print(f'   Full Data: {json.dumps(data, indent=2)}')
            print('-' * 30)

        elif event_type == 'error':
            error_type = event.get('error', 'Unknown')
            message = event.get('message', '')
            print(f'\n❌ Error ({error_type}): {message}')

        elif event_type == 'completion':
            reason = event.get('reason', 'unknown')
            status = event.get('status', 'finished')
            print(f'\n🏁 Completion: {status} (reason: {reason})')

        else:
            print(f'\n❓ Unknown event type: {event_type}')
            print(f'   Full Event: {json.dumps(event, indent=2)}')

    async def stream_conversation(
        self,
        conversation_id: str,
        api_key: str,
        system_prompt: str = '',
        user_prompt: str = '',
        research_mode: str = 'deep_research',
    ):
        """Stream conversation responses from the API endpoint"""
        params = {
            'conversation_id': conversation_id,
            'system_prompt': system_prompt,
            'user_prompt': user_prompt,
            'research_mode': research_mode,
        }

        endpoint = f'{self.base_url}/api/v1/integration/conversations/join-conversation'

        print(f'🔗 Connecting to: {endpoint}')
        print(f'📋 Parameters: {params}')
        print('=' * 50)

        try:
            async with self.client.stream(
                'POST',
                endpoint,
                json=params,
                headers={'Authorization': f'Bearer {api_key}'},
            ) as response:
                print(f'✅ Response Status: {response.status_code}')

                if response.status_code != 200:
                    error_text = await response.aread()
                    print(f'❌ Error: {error_text.decode()}')
                    return

                print('🔄 Streaming events:')
                print('-' * 50)

                buffer = ''
                decoder = json.JSONDecoder()

                async for chunk in response.aiter_text():
                    buffer += chunk

                    while buffer:
                        try:
                            event, idx = decoder.raw_decode(buffer)

                            await self._handle_event(event)
                            buffer = buffer[idx:].lstrip()

                            if event.get('type') == 'completion':
                                status = event.get('status', 'finished')
                                if status == 'cancelled':
                                    print(f"\n🚫 Stream cancelled: {event.get('message', 'Stream was cancelled')}")
                                elif status == 'finished':
                                    print(f"\n✅ Stream completed successfully with message: {event.get('message', 'Unknown message')}")
                                else:
                                    print(f"\n🏁 Stream ended with status '{status}': {event.get('message', 'No message')}")
                                return
                            elif event.get('type') == 'error':
                                print(f"\n❌ Stream ended with error: {event.get('message', 'Unknown error')}")
                                return

                        except json.JSONDecodeError:
                            break

                if buffer.strip():
                    print(f'⚠️  Unparsed buffer content: {buffer}')

        except httpx.ConnectError:
            print(f'❌ Failed to connect to {self.base_url}')
            print('Make sure your API endpoint is accessible!')
        except httpx.TimeoutException:
            print('⏰ Request timed out')
        except Exception as e:
            print(f'❌ Unexpected error: {e}')

async def main():
    """Main function to run the streaming client"""
    load_dotenv()
    
    # Example configuration - modify these values
    config = {
        'conversation_id': '4b03707134ee42b4abf613353f746b6c',  # Replace with your conversation ID
        'api_key': os.getenv('API_KEY'),
        'system_prompt': 'You are a helpful AI assistant specialized in DeFi research and analysis.',
        'user_prompt': 'What are the latest developments in yield farming protocols?',
        'research_mode': 'deep_research',
    }

    if not config['api_key']:
        print('❌ Error: API_KEY not found in environment variables')
        return

    print('🚀 Starting Thesis.io Streaming Client')

    async with StreamingClient(os.getenv('API_BASE_URL', 'https://app-be.thesis.io')) as client:
        await client.stream_conversation(**config)

if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] in ['-h', '--help', 'help']:
        print("""
Thesis.io Streaming Client

This client connects to existing Thesis.io conversations for continued DeFi research.

Before running:
1. Set your API_KEY in the .env file
2. Set your API_BASE_URL (optional, defaults to https://app-be.thesis.io)
3. Update the conversation_id in the script with your actual conversation ID

Usage:
    python join_conversation.py
        """)
    else:
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('\n⏹️  Client stopped by user')
        except Exception as e:
            print(f'\n💥 Client error: {e}')

Running the Code

  1. Save the code to a file named join_conversation.py
  2. Set up your environment:
    # Create .env file
    echo "API_KEY=your_actual_api_key_here" > .env
    echo "API_BASE_URL=https://app-be.thesis.io" >> .env
    
  3. Update the conversation ID: Replace '4b03707134ee42b4abf613353f746b6c' in the code with your actual conversation ID from a previous Thesis.io session.
  4. Run the client:
    python join_conversation.py
    

Understanding the Output

The streaming client will display different types of events:
  • 🔗 Connection events: When the client connects/disconnects
  • 📨 Socket events: Main conversation data and responses
  • 🏁 Completion: When the conversation finishes or requires user input
  • ❌ Errors: Any issues during the streaming process
Both clients stream events with detailed logging. The Python client buffers and reassembles JSON objects that arrive split across chunks, while the TypeScript version uses native Node.js APIs for a lightweight, dependency-free approach.

TypeScript Implementation Benefits

The TypeScript implementation provides several advantages:

Native Node.js APIs

  • Built-in fetch: Uses Node.js 18+ native fetch API
  • No dependencies: No external HTTP libraries required
  • URL class: Built-in URL construction and validation
  • TextDecoder: Native streaming text decoding

Modern JavaScript Features

  • ReadableStream: Native stream processing with backpressure support
  • Promise-based: Clean async/await patterns
  • Error handling: Covers AbortError and the network failures surfaced by the native fetch API

Lightweight Design

// Simple configuration
const config = {
  conversation_id: "your-conversation-id",
  api_key: process.env.API_KEY || "",
  system_prompt: "Your system prompt",
  user_prompt: "Your question",
  research_mode: "deep_research",
};

// Direct method call
await client.stream_conversation(
  config.conversation_id,
  config.api_key,
  config.system_prompt,
  config.user_prompt,
  config.research_mode
);

Common Use Cases

DeFi Protocol Research

config = {
    'conversation_id': 'your_conversation_id',
    'system_prompt': 'You are a DeFi protocol analyst focusing on yield optimization strategies.',
    'user_prompt': 'Analyze the recent changes in Compound v3 and their impact on lending rates.',
    'research_mode': 'deep_research',
}

Market Analysis Follow-up

config = {
    'conversation_id': 'your_conversation_id',
    'system_prompt': 'You are a crypto market analyst specializing in DeFi trends.',
    'user_prompt': 'Based on our previous discussion, what are the risks of the new staking derivatives?',
    'research_mode': 'chat',
}

Error Handling

Both implementations include comprehensive error handling:

Common Error Types

  • Connection failures: Network issues or invalid endpoints
  • Authentication errors: Invalid API keys (401 status codes)
  • Timeout errors: Long-running requests that exceed the 5-minute limit
  • JSON parsing errors: Malformed responses from the server
  • Stream interruptions: Graceful handling of interrupted connections
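For transient failures such as dropped connections, a common pattern is to wrap the streaming call in a retry loop with exponential backoff. This is a sketch, not part of the client above; the stream_with_retry helper and the generic exception types are assumptions you would adapt (in the real client you would catch httpx.ConnectError and httpx.TimeoutException):

```python
import asyncio

async def stream_with_retry(stream_fn, max_attempts: int = 3,
                            base_delay: float = 1.0):
    """Retry a streaming coroutine with exponential backoff.

    `stream_fn` is any zero-argument coroutine function, e.g.
    lambda: client.stream_conversation(**config).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await stream_fn()
        except (ConnectionError, asyncio.TimeoutError) as exc:
            if attempt == max_attempts:
                raise  # out of attempts - surface the error
            delay = base_delay * 2 ** (attempt - 1)  # 1s, 2s, 4s, ...
            print(f'⚠️  Attempt {attempt} failed ({exc}); retrying in {delay:.2f}s')
            await asyncio.sleep(delay)
```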

TypeScript-Specific Error Handling

The TypeScript client provides enhanced error detection:
try {
  await client.streamConversation(config);
} catch (error: any) {
  // Node's native fetch wraps socket errors, so the code may live on error.cause
  const code = error.code ?? error.cause?.code;
  if (code === 'ECONNREFUSED') {
    console.log('❌ Connection refused - check if the API endpoint is running');
  } else if (code === 'ENOTFOUND') {
    console.log('❌ DNS resolution failed - check your internet connection');
  } else if (code === 'ETIMEDOUT') {
    console.log('⏰ Request timed out - the server may be overloaded');
  } else if (error.response?.status === 401) {
    console.log('🔐 Authentication failed - check your API key');
  } else if (error.response?.status === 404) {
    console.log('❓ Conversation not found - verify your conversation ID');
  } else {
    console.log(`❌ Unexpected error: ${error.message}`);
  }
}

Signal Handling

The TypeScript implementation includes graceful shutdown on process signals:
process.on('SIGINT', () => {
  console.log('\n⏹️  Client stopped by user');
  process.exit(0);
});

process.on('SIGTERM', () => {
  console.log('\n⏹️  Client terminated');
  process.exit(0);
});
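On the Python side, the client above catches KeyboardInterrupt around asyncio.run. If you prefer explicit handlers like the TypeScript version, the standard signal module offers an equivalent sketch:

```python
import signal
import sys

def shutdown(signum, frame):
    """Print a farewell and exit cleanly on Ctrl+C or SIGTERM."""
    print('\n⏹️  Client stopped')
    sys.exit(0)

# Register the same handler for interrupt and terminate signals
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
```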

Next Steps

Remember to keep your API key secure and never commit it to version control. Always use environment variables for sensitive configuration.