Production-ready Python SDK with comprehensive API coverage, type hints, and async support.
# Install via pip
pip install sparkmailr
# Or with async support
pip install sparkmailr[async]
import sparkmailr
# Initialize the client
client = sparkmailr.SparkMailrClient(api_key="your-api-key")
# Send a simple email
response = client.emails.send({
"to": ["user@example.com"],
"from": "noreply@yourapp.com",
"subject": "Welcome to SparkMailr!",
"html": "<h1>Hello World</h1>",
"text": "Hello World"
})
print(f"Email sent successfully! ID: {response.id}")
import sparkmailr
# Basic configuration
client = sparkmailr.SparkMailrClient(
api_key="your-api-key",
base_url="https://api.sparkmailr.com", # Optional
timeout=30, # Request timeout in seconds
max_retries=3, # Max retry attempts
retry_delay=1.0 # Delay between retries in seconds
)
# Environment-based configuration
import os
client = sparkmailr.SparkMailrClient(
api_key=os.getenv("SPARKMAILR_API_KEY"),
debug=os.getenv("DEBUG", "false").lower() == "true"
)
# Send email with template
response = client.emails.send({
"to": ["user@example.com"],
"from": "noreply@yourapp.com",
"template_id": "welcome-template",
"template_data": {
"name": "John Doe",
"activation_link": "https://app.example.com/activate/123"
}
})
print(f"Template email sent: {response.id}")
# Send bulk emails
recipients = [
{"to": "user1@example.com", "template_data": {"name": "User 1"}},
{"to": "user2@example.com", "template_data": {"name": "User 2"}},
{"to": "user3@example.com", "template_data": {"name": "User 3"}}
]
response = client.emails.send_bulk({
"from": "noreply@yourapp.com",
"template_id": "newsletter-template",
"recipients": recipients
})
print(f"Bulk email batch: {response.batch_id}")
# Track email status
email_id = "em_1234567890"
status = client.emails.get_status(email_id)
print(f"Email status: {status.status}")
print(f"Delivered at: {status.delivered_at}")
# Get email analytics
analytics = client.analytics.get_email_stats(
start_date="2025-01-01",
end_date="2025-01-31"
)
print(f"Total sent: {analytics.sent}")
print(f"Delivery rate: {analytics.delivery_rate}%")
print(f"Open rate: {analytics.open_rate}%")
print(f"Click rate: {analytics.click_rate}%")
import sparkmailr
from sparkmailr.exceptions import (
SparkMailrError,
AuthenticationError,
RateLimitError,
ValidationError
)
client = sparkmailr.SparkMailrClient(api_key="your-api-key")
try:
response = client.emails.send({
"to": ["invalid-email"],
"from": "noreply@yourapp.com",
"subject": "Test Email"
})
except AuthenticationError:
print("Invalid API key")
except RateLimitError as e:
print(f"Rate limited. Retry after: {e.retry_after} seconds")
except ValidationError as e:
print(f"Validation failed: {e.details}")
for field, errors in e.validation_errors.items():
print(f" {field}: {', '.join(errors)}")
except SparkMailrError as e:
print(f"SparkMailr error: {e.message}")
except Exception as e:
print(f"Unexpected error: {str(e)}")
import asyncio
import sparkmailr
async def send_emails():
# Initialize async client
client = sparkmailr.AsyncSparkMailrClient(api_key="your-api-key")
try:
# Send multiple emails concurrently
tasks = []
for i in range(10):
task = client.emails.send({
"to": [f"user{i}@example.com"],
"from": "noreply@yourapp.com",
"subject": f"Email {i}",
"html": f"<h1>Hello User {i}</h1>"
})
tasks.append(task)
# Wait for all emails to be sent
responses = await asyncio.gather(*tasks)
for response in responses:
print(f"Email sent: {response.id}")
finally:
await client.close()
# Run async function
asyncio.run(send_emails())
client.emails.send(data)Send a single email.
Parameters:to (list): Recipient email addressesfrom (str): Sender email addresssubject (str): Email subjecthtml (str, optional): HTML contenttext (str, optional): Plain text contenttemplate_id (str, optional): Template IDtemplate_data (dict, optional): Template variablesclient.emails.get_status(email_id)Get email delivery status.
Returns: EmailStatus object with status, timestamps, and delivery info.client.analytics.get_email_stats(start_date, end_date)Get email analytics for date range.
Returns: Analytics object with sent count, delivery rate, open rate, etc.import unittest
from unittest.mock import patch, Mock
import sparkmailr
class TestSparkMailrIntegration(unittest.TestCase):
def setUp(self):
self.client = sparkmailr.SparkMailrClient(api_key="test-key")
@patch('sparkmailr.client.requests.post')
def test_send_email_success(self, mock_post):
# Mock successful response
mock_response = Mock()
mock_response.json.return_value = {
"id": "em_123456",
"status": "queued"
}
mock_response.status_code = 200
mock_post.return_value = mock_response
# Send email
response = self.client.emails.send({
"to": ["test@example.com"],
"from": "noreply@test.com",
"subject": "Test Email",
"html": "<p>Test content</p>"
})
# Assertions
self.assertEqual(response.id, "em_123456")
self.assertEqual(response.status, "queued")
mock_post.assert_called_once()
if __name__ == '__main__':
unittest.main()
# settings.py
SPARKMAILR_API_KEY = 'your-api-key'
SPARKMAILR_BASE_URL = 'https://api.sparkmailr.com'
# utils/email.py
import sparkmailr
from django.conf import settings
client = sparkmailr.SparkMailrClient(
api_key=settings.SPARKMAILR_API_KEY,
base_url=settings.SPARKMAILR_BASE_URL
)
def send_welcome_email(user):
"""Send welcome email to new user"""
return client.emails.send({
"to": [user.email],
"from": "noreply@yourapp.com",
"template_id": "welcome-template",
"template_data": {
"name": user.get_full_name(),
"activation_url": f"https://yourapp.com/activate/{user.activation_token}"
}
})
# views.py
from django.shortcuts import render
from .utils.email import send_welcome_email
def register_user(request):
if request.method == 'POST':
# ... user creation logic ...
# Send welcome email
try:
email_response = send_welcome_email(user)
print(f"Welcome email sent: {email_response.id}")
except Exception as e:
print(f"Failed to send welcome email: {e}")
return redirect('login')