Python Examples

Python

Complete Python examples for integrating with the Magnetite API using requests and asyncio

Installation

Install the required dependencies:

pip install requests python-dotenv asyncio aiohttp

Basic Client Class

Create a reusable client class for interacting with the Magnetite API:

# magnetite_client.py
import os
import requests
import time
from typing import Dict, Any, Optional
from dotenv import load_dotenv

load_dotenv()

class MagnetiteClient:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv('MAGNETITE_API_KEY')
        self.base_url = 'https://magnetite.ai/api'
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        })
        
        if not self.api_key:
            raise ValueError("API key is required. Set MAGNETITE_API_KEY environment variable or pass it directly.")
    
    def generate_lead_magnet(self, project_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a new lead magnet for a prospect."""
        try:
            response = self.session.post(f'{self.base_url}/projects/{project_id}/generate', json=data)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self._handle_error(e)
    
    def check_status(self, job_id: str) -> Dict[str, Any]:
        """Check the status of a generation job."""
        try:
            response = self.session.get(f'{self.base_url}/generation/{job_id}/status')
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self._handle_error(e)
    
    def check_lead_status(self, lead_id: str) -> Dict[str, Any]:
        """Check status using lead ID instead of job ID."""
        try:
            response = self.session.get(f'{self.base_url}/leads/{lead_id}/job-status')
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self._handle_error(e)
    
    def _handle_error(self, error):
        """Handle and format API errors."""
        if hasattr(error, 'response') and error.response is not None:
            try:
                error_data = error.response.json()
                error_msg = error_data.get('error', {}).get('message', 'Unknown API error')
                raise Exception(f"API Error ({error.response.status_code}): {error_msg}")
            except ValueError:
                raise Exception(f"HTTP Error ({error.response.status_code}): {error.response.text}")
        else:
            raise Exception(f"Request Error: {str(error)}")
    
    def poll_until_complete(self, job_id: str, max_attempts: int = 30, initial_delay: float = 2.0) -> Dict[str, Any]:
        """Poll the API until generation is complete."""
        for attempt in range(1, max_attempts + 1):
            try:
                status = self.check_status(job_id)
                
                print(f"Attempt {attempt}: {status.get('status', 'unknown')} ({status.get('progress', 0)}%)")
                
                if status.get('status') == 'completed':
                    return status.get('result', status)
                
                if status.get('status') == 'failed':
                    error_msg = status.get('error', {}).get('message', 'Generation failed')
                    raise Exception(f"Generation failed: {error_msg}")
                
                # Exponential backoff with jitter
                delay = min(initial_delay * (1.5 ** (attempt - 1)), 30.0)
                print(f"Waiting {delay:.1f}s before next check...")
                time.sleep(delay)
                
            except Exception as e:
                print(f"Polling error on attempt {attempt}: {str(e)}")
                if attempt == max_attempts:
                    raise
                time.sleep(5)
        
        raise Exception("Generation did not complete within expected time")

Environment Configuration

Create a .env file for your credentials:

# .env
MAGNETITE_API_KEY=sk_your_api_key_here
MAGNETITE_PROJECT_ID=your_project_id_here

Complete Examples

Basic Lead Magnet Generation
Simple example of generating a lead magnet
# basic_example.py
import os
from magnetite_client import MagnetiteClient

def basic_example():
    client = MagnetiteClient()
    
    try:
        result = client.generate_lead_magnet(
            os.getenv('MAGNETITE_PROJECT_ID'), 
            {
                'email': 'john.smith@example.com',
                'fullName': 'John Smith',
                'company': 'Acme Corp',
                'domain': 'acmecorp.com',
                'title': 'VP of Sales',
                'industry': 'SaaS'
            }
        )
        
        print("Generation started:")
        print(f"Lead ID: {result['leadId']}")
        print(f"Job ID: {result['jobId']}")
        print(f"Lead Magnet URL: {result['leadMagnetUrl']}")
        
        return result
        
    except Exception as e:
        print(f"Error generating lead magnet: {e}")
        raise

if __name__ == "__main__":
    basic_example()
    print("Done!")
Generate and Wait for Completion
Poll the API until generation is complete
# wait_for_completion.py
import os
from magnetite_client import MagnetiteClient

def generate_and_wait():
    client = MagnetiteClient()
    
    try:
        # Start generation
        generation = client.generate_lead_magnet(
            os.getenv('MAGNETITE_PROJECT_ID'),
            {
                'email': 'jane.doe@techcorp.com',
                'fullName': 'Jane Doe',
                'company': 'TechCorp Inc',
                'domain': 'techcorp.com',
                'title': 'Marketing Director',
                'industry': 'Technology',
                'companySize': '51-200'
            }
        )
        
        print(f"Generation started with job ID: {generation['jobId']}")
        print(f"Lead magnet will be available at: {generation['leadMagnetUrl']}")
        
        # Wait for completion
        result = client.poll_until_complete(generation['jobId'])
        
        print("Generation completed!")
        print(f"Final URL: {result.get('leadMagnetUrl', 'N/A')}")
        print(f"Processing time: {result.get('processingTime', 'N/A')}s")
        
        return result
        
    except Exception as e:
        print(f"Error: {e}")
        raise

if __name__ == "__main__":
    generate_and_wait()
    print("All done!")
Bulk Processing with Threading
Process multiple leads concurrently with rate limiting
# bulk_processing.py
import os
import time
import concurrent.futures
from typing import List, Dict, Any
from magnetite_client import MagnetiteClient

def process_single_lead(client: MagnetiteClient, lead: Dict[str, Any]) -> Dict[str, Any]:
    """Process a single lead with error handling."""
    try:
        result = client.generate_lead_magnet(
            os.getenv('MAGNETITE_PROJECT_ID'),
            lead
        )
        
        print(f"Started generation for {lead['email']} (Job: {result['jobId']})")
        
        return {
            'lead': lead,
            'success': True,
            'result': result
        }
        
    except Exception as e:
        print(f"Failed to start generation for {lead['email']}: {e}")
        return {
            'lead': lead,
            'success': False,
            'error': str(e)
        }

def bulk_processing():
    client = MagnetiteClient()
    
    leads = [
        {
            'email': 'ceo@startup.com',
            'fullName': 'Sarah Johnson',
            'company': 'StartupCo',
            'domain': 'startup.com',
            'title': 'CEO'
        },
        {
            'email': 'director@bigcorp.com',
            'fullName': 'Michael Chen',
            'company': 'BigCorp Industries',
            'domain': 'bigcorp.com',
            'title': 'Sales Director'
        },
        {
            'email': 'manager@midsize.com',
            'fullName': 'Emily Rodriguez',
            'company': 'MidSize Solutions',
            'domain': 'midsize.com',
            'title': 'Marketing Manager'
        }
    ]
    
    max_workers = 3  # Respect rate limits
    results = []
    
    print(f"Processing {len(leads)} leads with max {max_workers} concurrent requests")
    
    # Process leads in batches to respect rate limits
    batch_size = 5
    for i in range(0, len(leads), batch_size):
        batch = leads[i:i + batch_size]
        
        print(f"Processing batch {i // batch_size + 1} ({len(batch)} leads)")
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks in the batch
            future_to_lead = {
                executor.submit(process_single_lead, client, lead): lead 
                for lead in batch
            }
            
            # Collect results as they complete
            batch_results = []
            for future in concurrent.futures.as_completed(future_to_lead):
                result = future.result()
                batch_results.append(result)
            
        results.extend(batch_results)
        
        # Wait between batches to respect rate limits
        if i + batch_size < len(leads):
            print("Waiting 10s before next batch...")
            time.sleep(10)
    
    # Summary
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]
    
    print("
=== SUMMARY ===")
    print(f"Total processed: {len(results)}")
    print(f"Successful: {len(successful)}")
    print(f"Failed: {len(failed)}")
    
    if failed:
        print("
Failed leads:")
        for result in failed:
            print(f"  - {result['lead']['email']}: {result['error']}")
    
    return results

if __name__ == "__main__":
    bulk_processing()
    print("Bulk processing complete!")
Async Implementation
Asynchronous client using aiohttp for better performance
# async_magnetite_client.py
import os
import asyncio
import aiohttp
from typing import Dict, Any, Optional
from dotenv import load_dotenv

load_dotenv()

class AsyncMagnetiteClient:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv('MAGNETITE_API_KEY')
        self.base_url = 'https://magnetite.ai/api'
        
        if not self.api_key:
            raise ValueError("API key is required")
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers={
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            }
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def generate_lead_magnet(self, project_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a new lead magnet for a prospect."""
        try:
            async with self.session.post(f'{self.base_url}/projects/{project_id}/generate', json=data) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError as e:
            await self._handle_error(e, response if 'response' in locals() else None)
    
    async def check_status(self, job_id: str) -> Dict[str, Any]:
        """Check the status of a generation job."""
        try:
            async with self.session.get(f'{self.base_url}/generation/{job_id}/status') as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError as e:
            await self._handle_error(e, response if 'response' in locals() else None)
    
    async def _handle_error(self, error, response=None):
        """Handle and format API errors."""
        if response is not None:
            try:
                error_data = await response.json()
                error_msg = error_data.get('error', {}).get('message', 'Unknown API error')
                raise Exception(f"API Error ({response.status}): {error_msg}")
            except:
                text = await response.text()
                raise Exception(f"HTTP Error ({response.status}): {text}")
        else:
            raise Exception(f"Request Error: {str(error)}")
    
    async def poll_until_complete(self, job_id: str, max_attempts: int = 30) -> Dict[str, Any]:
        """Poll the API until generation is complete."""
        for attempt in range(1, max_attempts + 1):
            try:
                status = await self.check_status(job_id)
                
                print(f"Attempt {attempt}: {status.get('status', 'unknown')} ({status.get('progress', 0)}%)")
                
                if status.get('status') == 'completed':
                    return status.get('result', status)
                
                if status.get('status') == 'failed':
                    error_msg = status.get('error', {}).get('message', 'Generation failed')
                    raise Exception(f"Generation failed: {error_msg}")
                
                # Exponential backoff
                delay = min(2.0 * (1.5 ** (attempt - 1)), 30.0)
                print(f"Waiting {delay:.1f}s before next check...")
                await asyncio.sleep(delay)
                
            except Exception as e:
                print(f"Polling error on attempt {attempt}: {str(e)}")
                if attempt == max_attempts:
                    raise
                await asyncio.sleep(5)
        
        raise Exception("Generation did not complete within expected time")

# Example usage
async def async_example():
    async with AsyncMagnetiteClient() as client:
        try:
            # Start multiple generations concurrently
            tasks = []
            
            leads = [
                {'email': 'user1@example.com', 'fullName': 'User One', 'company': 'Company A'},
                {'email': 'user2@example.com', 'fullName': 'User Two', 'company': 'Company B'},
            ]
            
            for lead in leads:
                task = client.generate_lead_magnet(
                    os.getenv('MAGNETITE_PROJECT_ID'),
                    lead
                )
                tasks.append(task)
            
            # Wait for all generations to start
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            print("All generations started!")
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    print(f"Lead {i+1} failed: {result}")
                else:
                    print(f"Lead {i+1} job ID: {result['jobId']}")
            
        except Exception as e:
            print(f"Error: {e}")

if __name__ == "__main__":
    asyncio.run(async_example())

Error Handling Example

Comprehensive error handling with retries and logging:

# error_handling_example.py
import logging
import time
from typing import Dict, Any
from magnetite_client import MagnetiteClient

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RobustMagnetiteClient(MagnetiteClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_retries = 3
        self.retry_delay = 1.0
    
    def generate_with_retry(self, project_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate lead magnet with automatic retry logic."""
        last_exception = None
        
        for attempt in range(1, self.max_retries + 1):
            try:
                logger.info(f"Generation attempt {attempt}/{self.max_retries}")
                result = self.generate_lead_magnet(project_id, data)
                logger.info(f"Generation successful on attempt {attempt}")
                return result
                
            except Exception as e:
                last_exception = e
                error_msg = str(e)
                
                # Check if error is retryable
                if "rate limit" in error_msg.lower():
                    logger.warning(f"Rate limit hit on attempt {attempt}, waiting longer...")
                    delay = self.retry_delay * (2 ** attempt)  # Exponential backoff
                    time.sleep(delay)
                elif "server error" in error_msg.lower() or "503" in error_msg:
                    logger.warning(f"Server error on attempt {attempt}, retrying...")
                    time.sleep(self.retry_delay * attempt)
                elif "insufficient credits" in error_msg.lower():
                    logger.error("Insufficient credits - cannot retry")
                    raise
                elif "invalid api key" in error_msg.lower():
                    logger.error("Invalid API key - cannot retry")
                    raise
                else:
                    logger.error(f"Non-retryable error on attempt {attempt}: {error_msg}")
                    if attempt == self.max_retries:
                        raise
                    time.sleep(self.retry_delay)
        
        # If we get here, all retries failed
        logger.error(f"All {self.max_retries} attempts failed")
        raise last_exception

def robust_example():
    client = RobustMagnetiteClient()
    
    leads = [
        {
            'email': 'test1@example.com',
            'fullName': 'Test User 1',
            'company': 'Test Company 1',
            'title': 'Manager'
        },
        {
            'email': 'test2@example.com',
            'fullName': 'Test User 2',
            'company': 'Test Company 2',
            'title': 'Director'
        }
    ]
    
    successful = 0
    failed = 0
    
    for lead in leads:
        try:
            logger.info(f"Processing {lead['email']}")
            
            result = client.generate_with_retry(
                os.getenv('MAGNETITE_PROJECT_ID'),
                lead
            )
            
            logger.info(f"Successfully started generation for {lead['email']}")
            logger.info(f"Job ID: {result['jobId']}")
            logger.info(f"Lead Magnet URL: {result['leadMagnetUrl']}")
            
            successful += 1
            
        except Exception as e:
            logger.error(f"Failed to process {lead['email']}: {e}")
            failed += 1
    
    logger.info(f"Processing complete: {successful} successful, {failed} failed")

if __name__ == "__main__":
    robust_example()

Next Steps

Explore more examples in different programming languages or dive deeper into the API.