This is the complete reference for the official Inbox Check Python SDK. Every constructor keyword, every method signature, every exception class, every retry knob. If you want the 60-second quickstart, read inbox-placement-api-python-sdk-quickstart instead. If you want to know what tests.stream() actually yields and how timeouts interact with retries, keep reading.
Package name: inbox-check. Import root: inbox_check. Minimum Python: 3.9. Wheels for CPython 3.9–3.13, PyPy 3.10. Pure Python, no C extensions. The only hard dependency is httpx; pydantic v2 is optional and auto-detected.
Install
pip install inbox-check
# or with uv
uv pip install inbox-check
# with pydantic models (recommended)
pip install 'inbox-check[pydantic]'
The [pydantic] extra installs pydantic>=2.5. Without it, responses are returned as TypedDict dictionaries: still typed for static checkers, just without runtime validation.
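How the SDK auto-detects pydantic is not documented here, but optional-dependency detection in Python usually looks like the sketch below. The helper names has_module and pick_response_backend are hypothetical and are not part of inbox_check; the sketch also ignores the pydantic>=2.5 version check.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if `name` can be imported, without importing it."""
    return importlib.util.find_spec(name) is not None


def pick_response_backend() -> str:
    # Hypothetical helper: prefer validated models when pydantic is
    # installed, otherwise fall back to plain typed dictionaries.
    return "pydantic" if has_module("pydantic") else "typeddict"
```

Running pick_response_backend() in your own environment is a quick way to confirm which response type to expect before you write isinstance checks against model classes.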
Client: the sync client
The sync client is a thin wrapper around httpx.Client. Use it in CLI tools, cron scripts, and Celery workers. For FastAPI or aiohttp, reach for AsyncClient instead.
Constructor signature
from inbox_check import Client
client = Client(
    api_key: str,  # required, or env INBOX_CHECK_API_KEY
    base_url: str = "https://check.live-direct-marketing.online",
    timeout: float | httpx.Timeout = 30.0,
    max_retries: int = 3,
    retry_backoff: float = 0.5,  # seconds, exponential
    retry_on_status: tuple[int, ...] = (429, 500, 502, 503, 504),
    user_agent: str = "inbox-check-python/1.x.x",
    http_client: httpx.Client | None = None,  # inject your own
    on_request: Callable | None = None,  # logging hook
    on_response: Callable | None = None,
)
Every keyword has a sensible default. The only one you must supply is api_key, and even that can come from INBOX_CHECK_API_KEY. The client is safe to share across threads but not across processes: after a fork, create a fresh instance in each child.
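One way to honour the fresh-instance-per-child rule is to cache the client per process ID and rebuild it when the PID changes. The sketch below is generic: PerProcess is a hypothetical helper, not something the SDK ships, and it takes any zero-argument factory.

```python
import os
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PerProcess(Generic[T]):
    """Lazily build one instance per process, rebuilding after a fork."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._pid: Optional[int] = None
        self._instance: Optional[T] = None

    def get(self) -> T:
        pid = os.getpid()
        if self._instance is None or self._pid != pid:
            # First use in this process, or we are in a forked child:
            # build a fresh instance instead of reusing the parent's.
            self._instance = self._factory()
            self._pid = pid
        return self._instance
```

In a prefork worker you would wrap the constructor, for example PerProcess(lambda: Client(api_key=...)), and call .get() wherever you need the client. The helper itself takes no lock, so two threads racing on the very first call could each build an instance; add a threading.Lock if that matters to you.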
AsyncClient: the async client
Same surface, async methods. Built on httpx.AsyncClient.
import asyncio
from inbox_check import AsyncClient
async def main():
    async with AsyncClient(api_key="ic_live_...") as client:
        test = await client.tests.create(
            sender_domain="news.yourbrand.com",
            subject="Weekly digest",
            html="<p>Hello</p>",
        )
        async for event in client.tests.stream(test.id):
            if event.type == "complete":
                print(event.data.summary)
                break
asyncio.run(main())
The async with block closes the underlying HTTP pool. If you manage the lifetime manually, call await client.aclose().
tests.create(): full signature
client.tests.create(
    sender_domain: str,  # required
    subject: str,  # required
    html: str | None = None,
    text: str | None = None,
    headers: dict[str, str] | None = None,
    from_name: str | None = None,
    providers: list[str] | None = None,  # subset of seed list
    reply_to: str | None = None,
    tags: list[str] | None = None,  # arbitrary metadata
    webhook_url: str | None = None,  # override account default
    idempotency_key: str | None = None,
    timeout: float | None = None,  # overrides client default
) -> Test
At least one of html or text is required. If both are present, the API sends a multipart message. The tags field is echoed back on every webhook event, which is useful for correlating a test with a specific campaign row in your database.
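If you adopt a key:value convention for tags, as in "campaign:2026-08-05", a small parser turns the echoed tags back into lookup keys on the webhook side. The convention is yours, not the API's, and parse_tags is a hypothetical helper.

```python
def parse_tags(tags: list[str]) -> tuple[dict[str, str], list[str]]:
    """Split tags into key:value pairs and bare labels."""
    pairs: dict[str, str] = {}
    labels: list[str] = []
    for tag in tags:
        key, sep, value = tag.partition(":")
        if sep and key and value:
            pairs[key] = value
        else:
            labels.append(tag)
    return pairs, labels


pairs, labels = parse_tags(["digest", "campaign:2026-08-05"])
```

Here pairs["campaign"] gives you the value to look up in your own campaign table, while labels keeps the free-form tags.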
Example: tagged campaign test
test = client.tests.create(
    sender_domain="news.yourbrand.com",
    subject="Weekly digest",
    html=render_template("digest.html", user_count=42_000),
    tags=["digest", "campaign:2026-08-05"],
    idempotency_key="digest-2026-08-05",
)
print(test.id)  # "t_01J9..."
print(test.status)  # "pending"
print(test.created_at)  # datetime object
tests.get(): fetch a single test
client.tests.get(test_id: str) -> Test
# Poll until the test reaches a terminal state
import time
while True:
    t = client.tests.get(test.id)
    if t.status in ("complete", "failed"):  # "failed" would otherwise loop forever
        break
    time.sleep(5)
if t.summary is not None:
    print(t.summary.inbox_count, "/", t.summary.total)
Polling is the right choice in CI and short-lived workers where an SSE connection is overkill. For interactive UX, use the stream instead.
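In CI you usually also want an upper bound on how long the loop may run. The sketch below is a generic polling helper with a deadline; poll_until is a hypothetical name, it takes any fetch callable, and it raises Python's built-in TimeoutError (not the SDK's class of the same name).

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    interval: float = 5.0,
    deadline: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch() until is_done(result) is true or `deadline` seconds pass."""
    give_up_at = time.monotonic() + deadline
    while True:
        result = fetch()
        if is_done(result):
            return result
        if time.monotonic() + interval > give_up_at:
            # The next sleep would overshoot the deadline, so stop here.
            raise TimeoutError(f"not finished after {deadline:.0f}s")
        sleep(interval)
```

With the SDK this would be called as poll_until(lambda: client.tests.get(test.id), lambda t: t.status in ("complete", "failed")). The sleep parameter exists only so tests can inject a fake.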
tests.stream(): async generator
stream() returns an async iterator of TestEvent objects. Each event has a .type (one of landing, auth, progress, complete, error) and a typed .data payload.
import asyncio
from inbox_check import AsyncClient
async def run():
    async with AsyncClient() as client:  # key read from INBOX_CHECK_API_KEY
        t = await client.tests.create(
            sender_domain="news.yourbrand.com",
            subject="Stream test",
            html="<p>ok</p>",
        )
        async for ev in client.tests.stream(t.id):
            if ev.type == "landing":
                print(ev.data.provider, "->", ev.data.folder)
            elif ev.type == "complete":
                s = ev.data.summary
                print(f"done: {s.inbox_count}/{s.total} inbox")
                break
asyncio.run(run())
The sync Client also exposes tests.stream() as a regular generator. Under the hood it uses httpx's streaming responses and parses SSE frames on a background thread.
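To make "parses SSE frames" concrete, here is a minimal stdlib-only sketch of Server-Sent Events parsing. It is not the SDK's parser: the JSON payload shape and the "gmail" provider value in the usage lines are illustrative assumptions, and parse_sse is a hypothetical name.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, object]]:
    """Yield (event_type, decoded_json) for each complete SSE frame."""
    event_type = "message"  # SSE default when no "event:" field is sent
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends a frame; dispatch it if it carried data.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space after the colon is dropped
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)


frames = [
    "event: landing",
    'data: {"provider": "gmail", "folder": "inbox"}',
    "",
    ": keep-alive",
    "event: complete",
    'data: {"summary": {"inbox_count": 18, "total": 20}}',
    "",
]
events = list(parse_sse(frames))
```

Each yielded pair corresponds to what the SDK surfaces as ev.type and ev.data, minus the typed wrapper.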
tests.list(): pagination
client.tests.list(
    limit: int = 50,
    cursor: str | None = None,
    status: str | None = None,  # "pending" | "running" | "complete"
    tag: str | None = None,
    created_after: datetime | None = None,
    created_before: datetime | None = None,
) -> TestPage
The SDK exposes a convenience iterator:
for test in client.tests.iter_all(tag="digest"):
    print(test.id, test.summary.inbox_rate)
iter_all transparently follows the next_cursor field, so you can iterate over millions of historical tests without writing pagination logic yourself.
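The cursor-following behaviour can be pictured as the generator below. It is a sketch under assumptions: iter_pages and FAKE_PAGES are hypothetical, the "items" key is an assumed page shape, and only next_cursor comes from the description above.

```python
from typing import Callable, Iterator, Optional


def iter_pages(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every item, following next_cursor until none is returned."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield from page["items"]
        cursor = page.get("next_cursor")
        if not cursor:
            return


# Stand-in for the HTTP call: three fake pages keyed by cursor.
FAKE_PAGES = {
    None: {"items": [{"id": "t_1"}, {"id": "t_2"}], "next_cursor": "c1"},
    "c1": {"items": [{"id": "t_3"}], "next_cursor": "c2"},
    "c2": {"items": [], "next_cursor": None},
}

ids = [item["id"] for item in iter_pages(FAKE_PAGES.__getitem__)]
```

Because it is a generator, only one page is held in memory at a time, which is what makes long histories practical to walk.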
me.get(): check quota and plan
me = client.me.get()
print(me.plan) # "standard"
print(me.quota.monthly_limit) # 5000
print(me.quota.used_this_month) # 412
print(me.rate_limit.per_minute)  # 5
Webhook helpers
When you subscribe to webhook callbacks, the SDK can verify the HMAC signature for you:
from inbox_check.webhooks import verify, SignatureError  # assumes SignatureError is exported alongside verify
@app.post("/webhooks/inbox-check")
def handler(request):
    sig = request.headers["X-InboxCheck-Signature"]
    body = request.body  # raw bytes, not parsed JSON
    try:
        event = verify(body, sig, secret=WEBHOOK_SECRET)
    except SignatureError:
        return 401
    if event.type == "test.complete":
        process(event.data)
    return 200
Error classes
Every exception inherits from InboxCheckError:
APIError: generic 4xx/5xx with a message and status code.
AuthError: 401/403. Bad or missing key, revoked key, wrong environment.
RateLimitError: 429. Exposes .retry_after in seconds.
NotFoundError: 404. Wrong test id, already-deleted resource.
ValidationError: 422. Bad payload; contains a .fields list of per-field messages.
ServerError: 5xx. Automatically retried up to max_retries unless you set it to zero.
TimeoutError: subclass of httpx.TimeoutException.
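The status-to-class mapping in the list above can be summarised as a lookup. This is only a reading aid: error_name_for_status is a hypothetical function, it returns names rather than the SDK's classes, and the SDK's real dispatch logic is not shown in this reference.

```python
def error_name_for_status(status: int) -> str:
    """Map an HTTP error status to the exception name listed above."""
    if status in (401, 403):
        return "AuthError"
    if status == 404:
        return "NotFoundError"
    if status == 422:
        return "ValidationError"
    if status == 429:
        return "RateLimitError"
    if 500 <= status <= 599:
        return "ServerError"
    return "APIError"  # any other 4xx falls back to the generic class
```

In practice you catch the specific classes first and InboxCheckError last, so that a generic handler does not swallow a RateLimitError you meant to back off on.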
Retry strategy
Defaults cover most cases. The strategy is: retry on (429, 500, 502, 503, 504) and on network-level errors, up to max_retries, with exponential backoff starting at retry_backoff seconds and jitter.
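To get a feel for how long a request can spend retrying, the sketch below computes a backoff schedule. The exact formula is an assumption (delay doubles per attempt, with up to 10% positive jitter); the SDK only documents "exponential backoff starting at retry_backoff seconds and jitter". backoff_delays and its jitter parameter are hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    retry_backoff: float = 0.5,
    jitter: float = 0.1,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the assumed sleep before each retry, in seconds."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        base = retry_backoff * (2 ** attempt)  # 0.5, 1.0, 2.0, ...
        # Stretch each delay by a random factor in [1, 1 + jitter].
        delays.append(base * (1.0 + rng.uniform(0.0, jitter)))
    return delays
```

Under this assumed doubling, max_retries=6 with retry_backoff=1.0 adds up to 63 seconds of sleep before jitter, on top of the per-attempt timeout. Keep that total in mind when you size job deadlines.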
# Aggressive retries in a nightly job
client = Client(max_retries=6, retry_backoff=1.0)
# No retries at all (e.g. in a CI step where you want to see failures immediately)
client = Client(max_retries=0)
Passing idempotency_key on tests.create makes retries safe: the server deduplicates by key for 24 hours. Without a key, retried POSTs after a partial failure will create duplicate tests.
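A key only deduplicates if the same logical test always produces the same key. A readable string such as "digest-2026-08-05" works; when the defining values are longer or contain awkward characters, hashing them is one option. The helper below is a hypothetical sketch, and the API's accepted key length and character set are not stated here, so check them before relying on it.

```python
import hashlib


def idempotency_key(*parts: str, prefix: str = "ic") -> str:
    """Build a stable key from the values that define 'the same test'."""
    # The unit separator keeps ("ab", "c") distinct from ("a", "bc").
    joined = "\x1f".join(parts)
    digest = hashlib.sha256(joined.encode("utf-8")).hexdigest()[:32]
    return f"{prefix}-{digest}"


key = idempotency_key("news.yourbrand.com", "digest", "2026-08-05")
```

Avoid putting timestamps with sub-day precision or random values into the parts, or every retry will look like a new test.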
Logging hooks
on_request and on_response receive the underlying httpx request/response objects. Attach them to a structured logger:
import logging, time
log = logging.getLogger("inbox_check")
def on_req(req):
    req.extensions["started_at"] = time.monotonic()
    log.info("ic.request", extra={"method": req.method, "url": str(req.url)})
def on_resp(resp):
    dur = time.monotonic() - resp.request.extensions["started_at"]
    log.info("ic.response", extra={
        "status": resp.status_code,
        "duration_ms": int(dur * 1000),
    })
client = Client(on_request=on_req, on_response=on_resp)
Typing: pydantic models
With the [pydantic] extra, all responses are typed models. The main shapes:
class Test(BaseModel):
    id: str
    status: Literal["pending", "running", "complete", "failed"]
    created_at: datetime
    completed_at: datetime | None
    summary: Summary | None
    auth: Auth | None
    providers: list[ProviderResult]
    tags: list[str]
class Summary(BaseModel):
    inbox_count: int
    spam_count: int
    missing_count: int
    total: int
    @property
    def inbox_rate(self) -> float: ...
class Auth(BaseModel):
    spf: Literal["pass", "fail", "softfail", "none"]
    dkim: Literal["pass", "fail", "none"]
    dmarc: Literal["pass", "fail", "none"]
    aligned: bool
Full example: a daily monitor script
# daily_monitor.py
import os, asyncio, logging
from inbox_check import AsyncClient, RateLimitError
logging.basicConfig(level=logging.INFO)
SENDERS = ["news.yourbrand.com", "tx.yourbrand.com"]
THRESHOLD = 0.80
async def check_one(client, domain):
    t = await client.tests.create(
        sender_domain=domain,
        subject="Daily monitor",
        html="<p>monitor ping</p>",
        tags=["monitor", f"domain:{domain}"],
    )
    async for ev in client.tests.stream(t.id):
        if ev.type == "complete":
            s = ev.data.summary
            rate = s.inbox_count / s.total
            logging.info("%s: %.1f%% inbox", domain, rate * 100)
            if rate < THRESHOLD:
                await post_slack(domain, rate, s)
            return
async def post_slack(domain, rate, summary):
    # ... your slack helper
    pass
async def main():
    async with AsyncClient(api_key=os.environ["INBOX_CHECK_API_KEY"]) as c:
        for d in SENDERS:
            try:
                await check_one(c, d)
            except RateLimitError as e:
                # Wait as instructed, then retry this domain once.
                await asyncio.sleep(e.retry_after)
                await check_one(c, d)
if __name__ == "__main__":
    asyncio.run(main())
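The script divides inbox_count by total directly, which raises ZeroDivisionError if a summary ever reports zero mailboxes. Whether the API can return such a summary is not stated here, so the guard below is a defensive sketch; inbox_rate and needs_alert are hypothetical helpers, not SDK functions.

```python
def inbox_rate(inbox_count: int, total: int) -> float:
    """Fraction of mailboxes that landed in the inbox; 0.0 if none reported."""
    return inbox_count / total if total else 0.0


def needs_alert(inbox_count: int, total: int, threshold: float = 0.80) -> bool:
    """True when the inbox rate falls strictly below the threshold."""
    return inbox_rate(inbox_count, total) < threshold
```

Treating an empty summary as a 0.0 rate means it triggers an alert, which is usually what you want from a monitor: a test that reported nothing deserves a look.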