Skip to main content
Learn how to listen for and process real-time conversation events using the thesis-py SDK.

Setup

Install the required dependencies:
pip install thesis_py python-dotenv
Create a .env file:
THESIS_API_KEY=your_api_key_here
THESIS_BASE_URL=https://app-be.thesis.io

Create and set up your API key

Get your Thesis.io API key


Initialize the client:
import os
import asyncio
from dotenv import load_dotenv
from thesis_py import Thesis
from thesis_py.api_schema import JoinConversationIntegrationRequest, ResearchMode

# Load the variables defined in the .env file into the process environment.
load_dotenv()

# Shared SDK client used by every example below; the base URL falls back to
# the default endpoint when THESIS_BASE_URL is not set.
thesis = Thesis(
    api_key=os.environ.get("THESIS_API_KEY"),
    base_url=os.environ.get("THESIS_BASE_URL", "https://app-be.thesis.io"),
)

Basic Event Listener

async def listen_to_conversation(conversation_id: str, user_prompt: str):
    """Join a conversation in chat mode and print each event as it arrives.

    Exceptions raised while streaming are caught and printed to stdout
    instead of being propagated to the caller.
    """
    join_request = JoinConversationIntegrationRequest(
        conversation_id=conversation_id,
        user_prompt=user_prompt,
        research_mode=ResearchMode.CHAT,
    )

    try:
        event_stream = thesis.join_conversation(join_request)
        async for incoming in event_stream:
            # Show the concrete event class first, then its full payload.
            event_kind = type(incoming).__name__
            print(f"Event: {event_kind}")
            print(f"Data: {incoming}")
            print("---")
    except Exception as exc:
        print(f"Error: {exc}")

# Usage
async def main():
    """Run the basic listener against a sample conversation."""
    sample_conversation = "conv_abc123def456"
    sample_prompt = "What are the latest DeFi yield opportunities?"
    await listen_to_conversation(sample_conversation, sample_prompt)


if __name__ == "__main__":
    # Start the event loop only when this file is executed as a script.
    asyncio.run(main())

Processing Event Data

Use the SDK utilities to extract structured data from events:
from thesis_py.research.events.utils import get_pairs_from_events

async def process_events(conversation_id: str, user_prompt: str):
    """Stream a deep-research conversation and print extracted Q&A pairs.

    Each incoming event is passed on its own to ``get_pairs_from_events``.
    Every pair that comes back is printed as a question, an answer preview
    of at most 200 characters, and a citation count when citations exist.

    Exceptions raised while streaming are caught and printed, not re-raised.
    """
    preview_limit = 200  # Maximum number of answer characters to print.

    request = JoinConversationIntegrationRequest(
        conversation_id=conversation_id,
        user_prompt=user_prompt,
        research_mode=ResearchMode.DEEP_RESEARCH
    )

    try:
        async for event in thesis.join_conversation(request):
            # NOTE(review): pairs are extracted from a single event at a time;
            # confirm the helper does not need the accumulated event history.
            pairs = get_pairs_from_events([event])

            for pair in pairs or ():
                print(f"Question: {pair.get('user_message', 'N/A')}")

                # dict.get only applies its default when the key is absent,
                # so a present-but-None answer must be handled explicitly
                # before slicing.
                answer = pair.get('ai_response')
                if answer is None:
                    answer = 'N/A'

                # Only mark the preview as truncated when text was cut off.
                preview = answer[:preview_limit]
                if len(answer) > preview_limit:
                    preview += "..."
                print(f"Answer: {preview}")

                citations = pair.get('citations', [])
                if citations:
                    print(f"Sources: {len(citations)} citations")
                print("=" * 50)

    except Exception as e:
        print(f"Error: {e}")

Error Handling

Add robust error handling with retries:
import logging

async def handle_with_retry(conversation_id: str, user_prompt: str, max_retries: int = 3):
    """Stream a deep-research conversation, retrying the whole stream on failure.

    Args:
        conversation_id: Conversation to join.
        user_prompt: Prompt sent with the join request.
        max_retries: Total number of attempts (not additional retries);
            must be at least 1.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The last failure is re-raised once every attempt is used.
    """
    if max_retries < 1:
        # Without this check the loop below would never run and the call
        # would return as if it had succeeded, without joining anything.
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            request = JoinConversationIntegrationRequest(
                conversation_id=conversation_id,
                user_prompt=user_prompt,
                research_mode=ResearchMode.DEEP_RESEARCH
            )

            # NOTE(review): each attempt re-joins from scratch and restarts
            # the counter, so events handled in a failed attempt may be seen
            # again - confirm this is acceptable for the caller.
            event_count = 0
            async for event in thesis.join_conversation(request):
                event_count += 1

                # Process event
                pairs = get_pairs_from_events([event])
                if pairs:
                    print(f"Processed event {event_count}")

            print(f"Successfully processed {event_count} events")
            return

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ... between attempts.
                await asyncio.sleep(2 ** attempt)
            else:
                # Out of attempts: surface the last error to the caller.
                raise

Complete Example

import asyncio
import os
from dotenv import load_dotenv
from thesis_py import Thesis
from thesis_py.api_schema import JoinConversationIntegrationRequest, ResearchMode
from thesis_py.research.events.utils import get_pairs_from_events

# Load the variables defined in the .env file into the process environment.
load_dotenv()

# SDK client used by the monitor below; the base URL falls back to the
# default endpoint when THESIS_BASE_URL is not set.
thesis = Thesis(
    api_key=os.environ.get("THESIS_API_KEY"),
    base_url=os.environ.get("THESIS_BASE_URL", "https://app-be.thesis.io"),
)

async def monitor_conversation(conversation_id: str, user_prompt: str):
    """Stream a deep-research conversation and tag responses with insights.

    For every pair extracted from an incoming event, the AI response is
    scanned case-insensitively for the keywords "yield" and "risk"; each hit
    appends a tag to the insight list (duplicates are kept). A summary of the
    event count and collected tags is printed at the end, even when streaming
    stopped because of an error.

    Exceptions raised while streaming are caught and printed, not re-raised.
    """
    request = JoinConversationIntegrationRequest(
        conversation_id=conversation_id,
        user_prompt=user_prompt,
        research_mode=ResearchMode.DEEP_RESEARCH
    )

    event_count = 0
    insights = []

    try:
        async for event in thesis.join_conversation(request):
            event_count += 1

            # Extract conversation pairs
            pairs = get_pairs_from_events([event])

            # Tolerate an empty/None result, matching the other examples.
            for pair in pairs or ():
                # dict.get only applies its default when the key is absent;
                # "or ''" also covers a present-but-None response so that
                # .lower() cannot abort the whole stream.
                ai_response = pair.get('ai_response') or ''
                lowered = ai_response.lower()

                # Simple insight extraction
                if 'yield' in lowered:
                    insights.append('yield_opportunity')
                if 'risk' in lowered:
                    insights.append('risk_analysis')

                print(f"Event {event_count}: {len(ai_response)} chars")

    except Exception as e:
        print(f"Error: {e}")

    print(f"Processed {event_count} events")
    print(f"Insights: {insights}")

# Run the monitor
async def main():
    """Run the monitor against a sample conversation."""
    sample_conversation = "conv_abc123def456"
    sample_prompt = "Analyze DeFi protocols for yield opportunities"
    await monitor_conversation(sample_conversation, sample_prompt)


if __name__ == "__main__":
    # Start the event loop only when this file is executed as a script.
    asyncio.run(main())