Python Examples
Python
Complete Python examples for integrating with the Magnetite API using requests and asyncio
Installation
Install the required dependencies (asyncio is part of the Python standard library and does not need to be installed):
pip install requests python-dotenv aiohttp
Basic Client Class
Create a reusable client class for interacting with the Magnetite API:
# magnetite_client.py
import os
import requests
import time
from typing import Dict, Any, Optional
from dotenv import load_dotenv
load_dotenv()
class MagnetiteClient:
    """Synchronous client for the Magnetite API.

    All requests go through one ``requests.Session`` that carries the bearer
    token and JSON content type, so every call shares the same headers.
    """

    def __init__(self, api_key: Optional[str] = None, timeout: float = 30.0):
        """Create a client.

        Args:
            api_key: API key; when omitted, the MAGNETITE_API_KEY environment
                variable is used instead.
            timeout: Seconds to wait on each HTTP request before giving up.

        Raises:
            ValueError: If no API key was passed or found in the environment.
        """
        self.api_key = api_key or os.getenv('MAGNETITE_API_KEY')
        # Validate first so a session with an "Authorization: Bearer None"
        # header is never built.
        if not self.api_key:
            raise ValueError("API key is required. Set MAGNETITE_API_KEY environment variable or pass it directly.")
        self.base_url = 'https://magnetite.ai/api'
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        })

    def _request(self, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
        """Send one request to ``base_url + path`` and return the decoded JSON body."""
        try:
            response = self.session.request(
                method, f'{self.base_url}{path}', timeout=self.timeout, **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            # _handle_error always raises, so nothing is returned from here.
            self._handle_error(e)

    def generate_lead_magnet(self, project_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a new lead magnet for a prospect.

        Args:
            project_id: Project whose ``/generate`` endpoint is called.
            data: Prospect fields, sent as the JSON request body.

        Returns:
            The decoded JSON response.

        Raises:
            Exception: With a formatted message if the request fails.
        """
        return self._request('POST', f'/projects/{project_id}/generate', json=data)

    def check_status(self, job_id: str) -> Dict[str, Any]:
        """Check the status of a generation job."""
        return self._request('GET', f'/generation/{job_id}/status')

    def check_lead_status(self, lead_id: str) -> Dict[str, Any]:
        """Check status using lead ID instead of job ID."""
        return self._request('GET', f'/leads/{lead_id}/job-status')

    def _handle_error(self, error):
        """Convert a failed request into an Exception with a readable message.

        This method always raises. The original error is attached as the
        cause so the underlying traceback is not lost.
        """
        response = getattr(error, 'response', None)
        # Compare against None explicitly: a requests Response for a 4xx/5xx
        # status is falsy, so a plain truthiness test would skip it.
        if response is None:
            raise Exception(f"Request Error: {str(error)}") from error

        try:
            error_data = response.json()
        except ValueError:
            # Body is not JSON; fall back to the raw text.
            raise Exception(f"HTTP Error ({response.status_code}): {response.text}") from error

        # Tolerate bodies where "error" is missing, a plain string, or the
        # whole payload is not an object.
        detail = error_data.get('error') if isinstance(error_data, dict) else None
        if isinstance(detail, dict):
            error_msg = detail.get('message', 'Unknown API error')
        elif isinstance(detail, str) and detail:
            error_msg = detail
        else:
            error_msg = 'Unknown API error'
        raise Exception(f"API Error ({response.status_code}): {error_msg}") from error

    def poll_until_complete(self, job_id: str, max_attempts: int = 30, initial_delay: float = 2.0) -> Dict[str, Any]:
        """Poll the API until generation is complete.

        Args:
            job_id: Job to poll via ``check_status``.
            max_attempts: Maximum number of status checks.
            initial_delay: Delay in seconds after the first check; it grows
                by a factor of 1.5 per attempt and is capped at 30 seconds.

        Returns:
            The ``result`` field of the completed status, or the whole status
            dict when there is no ``result`` field.

        Raises:
            Exception: If the job reports ``failed``, if the last status
                check itself fails, or if the job is still not complete
                after ``max_attempts`` checks.
        """
        for attempt in range(1, max_attempts + 1):
            is_last_attempt = attempt == max_attempts

            # Only the status request is retried. A job that reports
            # "failed" is terminal and must not be swallowed by this handler.
            try:
                status = self.check_status(job_id)
            except Exception as e:
                print(f"Polling error on attempt {attempt}: {str(e)}")
                if is_last_attempt:
                    raise
                time.sleep(5)
                continue

            print(f"Attempt {attempt}: {status.get('status', 'unknown')} ({status.get('progress', 0)}%)")

            if status.get('status') == 'completed':
                return status.get('result', status)

            if status.get('status') == 'failed':
                error = status.get('error')
                if isinstance(error, dict):
                    error_msg = error.get('message', 'Generation failed')
                else:
                    error_msg = error or 'Generation failed'
                raise Exception(f"Generation failed: {error_msg}")

            # Exponential backoff, capped at 30 seconds. Skipped after the
            # final check because nothing follows it but the timeout error.
            if not is_last_attempt:
                delay = min(initial_delay * (1.5 ** (attempt - 1)), 30.0)
                print(f"Waiting {delay:.1f}s before next check...")
                time.sleep(delay)

        raise Exception("Generation did not complete within expected time")
Environment Configuration
Create a .env file for your credentials:
# .env
MAGNETITE_API_KEY=sk_your_api_key_here
MAGNETITE_PROJECT_ID=your_project_id_here
Complete Examples
Basic Lead Magnet Generation
Simple example of generating a lead magnet
# basic_example.py
import os
from magnetite_client import MagnetiteClient
def basic_example():
    """Start one lead magnet generation for a sample prospect.

    Prints the lead ID, job ID and lead magnet URL from the response and
    returns the response. Any error is printed and then re-raised.
    """
    client = MagnetiteClient()

    try:
        prospect = {
            'email': 'john.smith@example.com',
            'fullName': 'John Smith',
            'company': 'Acme Corp',
            'domain': 'acmecorp.com',
            'title': 'VP of Sales',
            'industry': 'SaaS',
        }
        project_id = os.getenv('MAGNETITE_PROJECT_ID')
        result = client.generate_lead_magnet(project_id, prospect)

        print("Generation started:")
        print(f"Lead ID: {result['leadId']}")
        print(f"Job ID: {result['jobId']}")
        print(f"Lead Magnet URL: {result['leadMagnetUrl']}")
    except Exception as e:
        print(f"Error generating lead magnet: {e}")
        raise
    else:
        return result


if __name__ == "__main__":
    basic_example()
    print("Done!")
Generate and Wait for Completion
Poll the API until generation is complete
# wait_for_completion.py
import os
from magnetite_client import MagnetiteClient
def generate_and_wait():
    """Start a generation for a sample prospect and block until it finishes.

    Returns whatever ``poll_until_complete`` returns for the job. Any error
    is printed and then re-raised.
    """
    client = MagnetiteClient()

    try:
        prospect = {
            'email': 'jane.doe@techcorp.com',
            'fullName': 'Jane Doe',
            'company': 'TechCorp Inc',
            'domain': 'techcorp.com',
            'title': 'Marketing Director',
            'industry': 'Technology',
            'companySize': '51-200',
        }

        # Step 1: kick off the generation job.
        generation = client.generate_lead_magnet(
            os.getenv('MAGNETITE_PROJECT_ID'), prospect
        )
        print(f"Generation started with job ID: {generation['jobId']}")
        print(f"Lead magnet will be available at: {generation['leadMagnetUrl']}")

        # Step 2: poll the job until it reaches a final state.
        result = client.poll_until_complete(generation['jobId'])
        print("Generation completed!")
        print(f"Final URL: {result.get('leadMagnetUrl', 'N/A')}")
        print(f"Processing time: {result.get('processingTime', 'N/A')}s")
    except Exception as e:
        print(f"Error: {e}")
        raise
    else:
        return result


if __name__ == "__main__":
    generate_and_wait()
    print("All done!")
Bulk Processing with Threading
Process multiple leads concurrently with rate limiting
# bulk_processing.py
import os
import time
import concurrent.futures
from typing import List, Dict, Any
from magnetite_client import MagnetiteClient
def process_single_lead(client: MagnetiteClient, lead: Dict[str, Any]) -> Dict[str, Any]:
    """Start generation for one lead and report the outcome without raising.

    Returns a dict holding the original ``lead``, a ``success`` flag, and
    either the API ``result`` (on success) or the ``error`` text (on failure).
    """
    outcome = {'lead': lead}
    try:
        result = client.generate_lead_magnet(os.getenv('MAGNETITE_PROJECT_ID'), lead)
        print(f"Started generation for {lead['email']} (Job: {result['jobId']})")
    except Exception as e:
        # Failures are captured in the outcome so a batch can keep going.
        print(f"Failed to start generation for {lead['email']}: {e}")
        outcome.update(success=False, error=str(e))
    else:
        outcome.update(success=True, result=result)
    return outcome
def bulk_processing():
    """Start generations for a fixed list of sample leads, batch by batch.

    Leads are submitted to a thread pool in batches of ``batch_size``, with a
    10 second pause between batches. A summary is printed at the end.

    Returns:
        One outcome dict per lead, as returned by ``process_single_lead``.
    """
    # NOTE(review): this one client (and its requests.Session) is shared by
    # all worker threads - confirm that is acceptable for your workload.
    client = MagnetiteClient()

    leads = [
        {
            'email': 'ceo@startup.com',
            'fullName': 'Sarah Johnson',
            'company': 'StartupCo',
            'domain': 'startup.com',
            'title': 'CEO'
        },
        {
            'email': 'director@bigcorp.com',
            'fullName': 'Michael Chen',
            'company': 'BigCorp Industries',
            'domain': 'bigcorp.com',
            'title': 'Sales Director'
        },
        {
            'email': 'manager@midsize.com',
            'fullName': 'Emily Rodriguez',
            'company': 'MidSize Solutions',
            'domain': 'midsize.com',
            'title': 'Marketing Manager'
        }
    ]

    max_workers = 3  # Respect rate limits
    results = []

    print(f"Processing {len(leads)} leads with max {max_workers} concurrent requests")

    # Process leads in batches to respect rate limits
    batch_size = 5
    for i in range(0, len(leads), batch_size):
        batch = leads[i:i + batch_size]
        print(f"Processing batch {i // batch_size + 1} ({len(batch)} leads)")

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks in the batch
            future_to_lead = {
                executor.submit(process_single_lead, client, lead): lead
                for lead in batch
            }

            # Collect results as they complete (completion order, not
            # submission order).
            results.extend(
                future.result()
                for future in concurrent.futures.as_completed(future_to_lead)
            )

        # Wait between batches to respect rate limits; no wait after the last one.
        if i + batch_size < len(leads):
            print("Waiting 10s before next batch...")
            time.sleep(10)

    # Summary
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]

    # The leading "\n" must be an escape sequence: a raw line break inside a
    # single-quoted string literal is a syntax error.
    print("\n=== SUMMARY ===")
    print(f"Total processed: {len(results)}")
    print(f"Successful: {len(successful)}")
    print(f"Failed: {len(failed)}")

    if failed:
        print("\nFailed leads:")
        for result in failed:
            print(f" - {result['lead']['email']}: {result['error']}")

    return results


if __name__ == "__main__":
    bulk_processing()
    print("Bulk processing complete!")
Async Implementation
Asynchronous client using aiohttp for better performance
# async_magnetite_client.py
import os
import asyncio
import aiohttp
from typing import Dict, Any, Optional
from dotenv import load_dotenv
load_dotenv()
class AsyncMagnetiteClient:
    """Asynchronous client for the Magnetite API built on aiohttp.

    Use it as an async context manager; the HTTP session is opened on entry
    and closed on exit::

        async with AsyncMagnetiteClient() as client:
            await client.generate_lead_magnet(project_id, data)
    """

    def __init__(self, api_key: Optional[str] = None):
        """Store the API key.

        Args:
            api_key: API key; when omitted, the MAGNETITE_API_KEY environment
                variable is used instead.

        Raises:
            ValueError: If no API key was passed or found in the environment.
        """
        self.api_key = api_key or os.getenv('MAGNETITE_API_KEY')
        self.base_url = 'https://magnetite.ai/api'
        # Created in __aenter__; None means "not inside `async with` yet".
        self.session = None

        if not self.api_key:
            raise ValueError("API key is required")

    async def __aenter__(self):
        """Open the aiohttp session carrying the auth and content-type headers."""
        self.session = aiohttp.ClientSession(
            headers={
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session if one was opened."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    def _require_session(self):
        """Return the open session, or fail clearly when used outside ``async with``."""
        if self.session is None:
            raise RuntimeError(
                "AsyncMagnetiteClient must be used inside 'async with' before making requests"
            )
        return self.session

    async def _read_json(self, response) -> Dict[str, Any]:
        """Return the JSON body of ``response``, raising for 4xx/5xx statuses.

        Must be awaited inside the ``async with`` block that produced the
        response, so the error body is read while the response is still open.
        """
        if response.status >= 400:
            await self._handle_error(None, response)
        return await response.json()

    async def generate_lead_magnet(self, project_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a new lead magnet for a prospect.

        Args:
            project_id: Project whose ``/generate`` endpoint is called.
            data: Prospect fields, sent as the JSON request body.

        Returns:
            The decoded JSON response.

        Raises:
            Exception: With a formatted message if the request fails.
        """
        session = self._require_session()
        try:
            async with session.post(f'{self.base_url}/projects/{project_id}/generate', json=data) as response:
                return await self._read_json(response)
        except aiohttp.ClientError as e:
            # Transport-level failure (no usable response).
            await self._handle_error(e)

    async def check_status(self, job_id: str) -> Dict[str, Any]:
        """Check the status of a generation job."""
        session = self._require_session()
        try:
            async with session.get(f'{self.base_url}/generation/{job_id}/status') as response:
                return await self._read_json(response)
        except aiohttp.ClientError as e:
            await self._handle_error(e)

    async def _handle_error(self, error, response=None):
        """Convert a failure into an Exception with a readable message.

        This method always raises. With a ``response`` the message is taken
        from its JSON body (falling back to the raw text); without one the
        message is taken from ``error``.
        """
        if response is None:
            raise Exception(f"Request Error: {str(error)}") from error

        # Only the body parsing is guarded. The formatted exception is raised
        # outside the try so it cannot be caught and replaced by the fallback.
        try:
            error_data = await response.json()
        except (aiohttp.ClientError, ValueError):
            error_data = None

        if error_data is None:
            text = await response.text()
            raise Exception(f"HTTP Error ({response.status}): {text}") from error

        # Tolerate bodies where "error" is missing, a plain string, or the
        # whole payload is not an object.
        detail = error_data.get('error') if isinstance(error_data, dict) else None
        if isinstance(detail, dict):
            error_msg = detail.get('message', 'Unknown API error')
        elif isinstance(detail, str) and detail:
            error_msg = detail
        else:
            error_msg = 'Unknown API error'
        raise Exception(f"API Error ({response.status}): {error_msg}") from error

    async def poll_until_complete(self, job_id: str, max_attempts: int = 30, initial_delay: float = 2.0) -> Dict[str, Any]:
        """Poll the API until generation is complete.

        Args:
            job_id: Job to poll via ``check_status``.
            max_attempts: Maximum number of status checks.
            initial_delay: Delay in seconds after the first check; it grows
                by a factor of 1.5 per attempt and is capped at 30 seconds.

        Returns:
            The ``result`` field of the completed status, or the whole status
            dict when there is no ``result`` field.

        Raises:
            Exception: If the job reports ``failed``, if the last status
                check itself fails, or if the job is still not complete
                after ``max_attempts`` checks.
        """
        for attempt in range(1, max_attempts + 1):
            is_last_attempt = attempt == max_attempts

            # Only the status request is retried. A job that reports
            # "failed" is terminal and must not be swallowed by this handler.
            try:
                status = await self.check_status(job_id)
            except Exception as e:
                print(f"Polling error on attempt {attempt}: {str(e)}")
                if is_last_attempt:
                    raise
                await asyncio.sleep(5)
                continue

            print(f"Attempt {attempt}: {status.get('status', 'unknown')} ({status.get('progress', 0)}%)")

            if status.get('status') == 'completed':
                return status.get('result', status)

            if status.get('status') == 'failed':
                error = status.get('error')
                if isinstance(error, dict):
                    error_msg = error.get('message', 'Generation failed')
                else:
                    error_msg = error or 'Generation failed'
                raise Exception(f"Generation failed: {error_msg}")

            # Exponential backoff, capped at 30 seconds. Skipped after the
            # final check because nothing follows it but the timeout error.
            if not is_last_attempt:
                delay = min(initial_delay * (1.5 ** (attempt - 1)), 30.0)
                print(f"Waiting {delay:.1f}s before next check...")
                await asyncio.sleep(delay)

        raise Exception("Generation did not complete within expected time")
# Example usage
async def async_example():
    """Start generations for two sample leads concurrently and print each outcome.

    Per-lead failures are reported individually; any other error is printed
    and swallowed.
    """
    async with AsyncMagnetiteClient() as client:
        try:
            leads = [
                {'email': 'user1@example.com', 'fullName': 'User One', 'company': 'Company A'},
                {'email': 'user2@example.com', 'fullName': 'User Two', 'company': 'Company B'},
            ]

            # One un-awaited coroutine per lead; gather() below runs them together.
            tasks = [
                client.generate_lead_magnet(os.getenv('MAGNETITE_PROJECT_ID'), lead)
                for lead in leads
            ]

            # return_exceptions=True delivers failures as values instead of raising.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            print("All generations started!")

            for i, result in enumerate(results):
                if not isinstance(result, Exception):
                    print(f"Lead {i+1} job ID: {result['jobId']}")
                    continue
                print(f"Lead {i+1} failed: {result}")
        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(async_example())
Error Handling Example
Comprehensive error handling with retries and logging:
# error_handling_example.py
import logging
import time
from typing import Dict, Any
from magnetite_client import MagnetiteClient
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustMagnetiteClient(MagnetiteClient):
    """MagnetiteClient with retry logic around lead magnet generation.

    Errors are classified by matching text in the exception message, because
    the base client raises plain ``Exception`` objects.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to MagnetiteClient and set retry defaults."""
        super().__init__(*args, **kwargs)
        self.max_retries = 3      # total attempts, including the first
        self.retry_delay = 1.0    # base delay in seconds between attempts

    def generate_with_retry(self, project_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate lead magnet with automatic retry logic.

        Args:
            project_id: Project to generate the lead magnet in.
            data: Prospect fields passed through to ``generate_lead_magnet``.

        Returns:
            The result of the first successful ``generate_lead_magnet`` call.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            Exception: Immediately for insufficient-credit and invalid-API-key
                errors; otherwise the last error once all attempts are used.
        """
        if self.max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        last_exception = None

        for attempt in range(1, self.max_retries + 1):
            try:
                logger.info(f"Generation attempt {attempt}/{self.max_retries}")
                result = self.generate_lead_magnet(project_id, data)
                logger.info(f"Generation successful on attempt {attempt}")
                return result
            except Exception as e:
                last_exception = e
                error_msg = str(e)
                lowered = error_msg.lower()

                # Errors that retrying cannot fix: fail right away.
                if "insufficient credits" in lowered:
                    logger.error("Insufficient credits - cannot retry")
                    raise
                if "invalid api key" in lowered:
                    logger.error("Invalid API key - cannot retry")
                    raise

                # Everything else is retried; only the wait time differs.
                if "rate limit" in lowered:
                    logger.warning(f"Rate limit hit on attempt {attempt}, waiting longer...")
                    delay = self.retry_delay * (2 ** attempt)  # Exponential backoff
                elif "server error" in lowered or "503" in error_msg:
                    logger.warning(f"Server error on attempt {attempt}, retrying...")
                    delay = self.retry_delay * attempt
                else:
                    # This branch retries, so the log must not say "non-retryable".
                    logger.warning(f"Unrecognized error on attempt {attempt}, retrying: {error_msg}")
                    delay = self.retry_delay

                # No point sleeping after the final attempt.
                if attempt == self.max_retries:
                    break
                time.sleep(delay)

        # If we get here, all retries failed
        logger.error(f"All {self.max_retries} attempts failed")
        raise last_exception
def robust_example():
    """Process two sample leads with RobustMagnetiteClient and log a tally.

    Each lead is handled independently: a failure is logged and counted, and
    processing continues with the next lead.
    """
    # This example file does not import os at module level, so import it
    # here; without it the os.getenv call below raises NameError.
    import os

    client = RobustMagnetiteClient()

    leads = [
        {
            'email': 'test1@example.com',
            'fullName': 'Test User 1',
            'company': 'Test Company 1',
            'title': 'Manager'
        },
        {
            'email': 'test2@example.com',
            'fullName': 'Test User 2',
            'company': 'Test Company 2',
            'title': 'Director'
        }
    ]

    successful = 0
    failed = 0

    for lead in leads:
        try:
            logger.info(f"Processing {lead['email']}")
            result = client.generate_with_retry(
                os.getenv('MAGNETITE_PROJECT_ID'),
                lead
            )
            logger.info(f"Successfully started generation for {lead['email']}")
            logger.info(f"Job ID: {result['jobId']}")
            logger.info(f"Lead Magnet URL: {result['leadMagnetUrl']}")
            successful += 1
        except Exception as e:
            logger.error(f"Failed to process {lead['email']}: {e}")
            failed += 1

    logger.info(f"Processing complete: {successful} successful, {failed} failed")


if __name__ == "__main__":
    robust_example()
Next Steps
Explore more examples in different programming languages or dive deeper into the API.