
Python SDK — full reference

Every method, every kwarg, every return type. The complete Python SDK reference — Client, AsyncClient, models, errors, retry strategy, hooks.

This is the complete reference for the official Inbox Check Python SDK. Every constructor keyword, every method signature, every exception class, every retry knob. If you want the 60-second quickstart, read inbox-placement-api-python-sdk-quickstart instead. If you want to know what tests.stream() actually yields and how timeouts interact with retries, keep reading.

Package details

Package name: inbox-check. Import root: inbox_check. Minimum Python: 3.9. Wheels for CPython 3.9–3.13, PyPy 3.10. Pure Python, no C extensions. The only hard dependency is httpx; pydantic v2 is optional and auto-detected.

Install

pip install inbox-check
# or with uv
uv pip install inbox-check
# with pydantic models (recommended)
pip install 'inbox-check[pydantic]'

The [pydantic] extra installs pydantic>=2.5. Without it, responses are returned as TypedDict-annotated dicts: still typed, just without runtime validation.

Client: the sync client

The sync client is a thin wrapper around httpx.Client. Use it in CLI tools, cron scripts, and Celery workers. For FastAPI or aiohttp, reach for AsyncClient instead.

Constructor signature

from inbox_check import Client

client = Client(
    api_key: str,                         # required, or env INBOX_CHECK_API_KEY
    base_url: str = "https://check.live-direct-marketing.online",
    timeout: float | httpx.Timeout = 30.0,
    max_retries: int = 3,
    retry_backoff: float = 0.5,           # seconds, exponential
    retry_on_status: tuple[int, ...] = (429, 500, 502, 503, 504),
    user_agent: str = "inbox-check-python/1.x.x",
    http_client: httpx.Client | None = None,  # inject your own
    on_request: Callable | None = None,   # logging hook
    on_response: Callable | None = None,
)

Every keyword has a sensible default. The only one you must supply is api_key, and even that can come from INBOX_CHECK_API_KEY. The client is safe to share across threads but not across processes — fork-safe behaviour requires a fresh instance per child.
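
One way to honour the "fresh instance per child" rule is to cache clients by process id, so a forked worker never reuses the parent's connection pool. The helper below is a minimal sketch, not part of the SDK: get_client and the factory argument are hypothetical names, and the cache is not locked, so add a threading.Lock if several threads can race on the first call.

```python
import os
from typing import Any, Callable, Dict

# Hypothetical helper (not an SDK API): one cached client per process id.
_clients: Dict[int, Any] = {}


def get_client(factory: Callable[[], Any]) -> Any:
    """Return this process's client, building it on first use.

    After a fork the child has a different pid, so it builds its own
    instance instead of inheriting the parent's.
    """
    pid = os.getpid()
    if pid not in _clients:
        _clients[pid] = factory()
    return _clients[pid]
```

With the SDK installed, a call such as get_client(lambda: Client()) would read the key from INBOX_CHECK_API_KEY and build at most one client per process.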

AsyncClient: the async client

Same surface, async methods. Built on httpx.AsyncClient.

from inbox_check import AsyncClient

async with AsyncClient(api_key="ic_live_...") as client:
    test = await client.tests.create(
        sender_domain="news.yourbrand.com",
        subject="Weekly digest",
        html="<p>Hello</p>",
    )
    async for event in client.tests.stream(test.id):
        if event.type == "complete":
            print(event.data.summary)
            break

The async with block closes the underlying HTTP pool. If you manage the lifetime manually, call await client.aclose().

tests.create() — full signature

client.tests.create(
    sender_domain: str,                    # required
    subject: str,                          # required
    html: str | None = None,
    text: str | None = None,
    headers: dict[str, str] | None = None,
    from_name: str | None = None,
    providers: list[str] | None = None,    # subset of seed list
    reply_to: str | None = None,
    tags: list[str] | None = None,         # arbitrary metadata
    webhook_url: str | None = None,        # override account default
    idempotency_key: str | None = None,
    timeout: float | None = None,          # overrides client default
) -> Test

At least one of html or text is required. If both are present, the API sends a multipart message. The tags field echoes back on every webhook event — useful for correlating a test with a specific campaign row in your database.

Example: tagged campaign test

test = client.tests.create(
    sender_domain="news.yourbrand.com",
    subject="Weekly digest",
    html=render_template("digest.html", user_count=42_000),
    tags=["digest", "campaign:2026-08-05"],
    idempotency_key="digest-2026-08-05",
)
print(test.id)            # "t_01J9..."
print(test.status)        # "pending"
print(test.created_at)    # datetime object
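
Because tags are echoed back on webhook events, a "key:value" naming convention like the campaign:2026-08-05 tag above makes them easy to map back to a database row. The following is a small sketch of that idea; parse_tags is a hypothetical helper, and the colon convention is only a habit of these examples, not something the API is stated to enforce.

```python
from typing import Dict, Iterable, List, Tuple


def parse_tags(tags: Iterable[str]) -> Tuple[Dict[str, str], List[str]]:
    """Split 'key:value' tags into a dict; tags without a colon stay in a list."""
    pairs: Dict[str, str] = {}
    bare: List[str] = []
    for tag in tags:
        key, sep, value = tag.partition(":")  # split on the first colon only
        if sep:
            pairs[key] = value
        else:
            bare.append(tag)
    return pairs, bare
```

For the tags in the example above, this returns ({"campaign": "2026-08-05"}, ["digest"]).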

tests.get() — fetch a single test

client.tests.get(test_id: str) -> Test

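# Poll until the test reaches a terminal state
import time
while True:
    t = client.tests.get(test.id)
    if t.status in ("complete", "failed"):  # "failed" is terminal too; don't loop forever
        break
    time.sleep(5)

if t.summary is not None:  # summary is optional on the Test model
    print(t.summary.inbox_count, "/", t.summary.total)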

Polling is the right choice in CI and short-lived workers where an SSE connection is overkill. For interactive UX, use the stream instead.
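
In CI it usually helps to bound the polling loop with a deadline so a stuck test cannot hang the job. Here is a minimal sketch of such a helper. poll_until and its parameters are hypothetical names rather than SDK features, and it raises Python's built-in TimeoutError, not the SDK's class of the same name.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    timeout: float = 300.0,
    interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch() until is_done(result) is true or the deadline would pass."""
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if is_done(result):
            return result
        # Give up if the next sleep would take us past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"not done after {timeout} seconds")
        sleep(interval)
```

With a client in scope, the call would look like poll_until(lambda: client.tests.get(test.id), lambda t: t.status in ("complete", "failed")).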

tests.stream() — async generator

stream() returns an async iterator of TestEvent objects. Each event has a .type (one of landing, auth, progress, complete, error) and a typed .data payload.

import asyncio
from inbox_check import AsyncClient

async def run():
    async with AsyncClient() as client:
        t = await client.tests.create(
            sender_domain="news.yourbrand.com",
            subject="Stream test",
            html="<p>ok</p>",
        )
        async for ev in client.tests.stream(t.id):
            if ev.type == "landing":
                print(ev.data.provider, "->", ev.data.folder)
            elif ev.type == "complete":
                s = ev.data.summary
                print(f"done: {s.inbox_count}/{s.total} inbox")
                break

asyncio.run(run())

The sync Client also exposes tests.stream() as a regular generator. Under the hood it reads the response through httpx's streaming API (Client.stream()) and parses SSE frames on a background thread.
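
To make "parses SSE frames" concrete, here is a rough sketch of a Server-Sent Events parser built on the standard library only. It is not the SDK's implementation. It assumes each frame carries an event: line plus one or more data: lines containing JSON, and the sample payloads in the usage note are invented for illustration.

```python
import json
from typing import Iterable, Iterator, List, Tuple


def _value(line: str, field: str) -> str:
    """Return the text after 'field:', dropping one optional leading space."""
    value = line[len(field) + 1:]
    return value[1:] if value.startswith(" ") else value


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_type, data) pairs from an iterable of SSE text lines."""
    event_type = "message"  # the SSE default when no event: line is sent
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current frame.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("event:"):
            event_type = _value(line, "event")
        elif line.startswith("data:"):
            data_lines.append(_value(line, "data"))
```

Feeding it the lines "event: landing", 'data: {"provider": "gmail", "folder": "inbox"}' and an empty line yields a single ("landing", {...}) pair.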

tests.list() — pagination

client.tests.list(
    limit: int = 50,
    cursor: str | None = None,
    status: str | None = None,       # "pending" | "running" | "complete"
    tag: str | None = None,
    created_after: datetime | None = None,
    created_before: datetime | None = None,
) -> TestPage

The SDK exposes a convenience iterator:

for test in client.tests.iter_all(tag="digest"):
    if test.summary is not None:  # summary is optional, e.g. on unfinished tests
        print(test.id, test.summary.inbox_rate)

iter_all transparently follows the next_cursor field, making it safe to iterate over millions of historical tests without manual pagination logic.
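
The cursor-following loop behind a helper like iter_all can be sketched as below. This is an illustration under assumptions, not the SDK's code: fetch_page is a hypothetical callable standing in for tests.list, and the "items" key is assumed, since the shape of TestPage is not spelled out here beyond next_cursor.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_pages(fetch_page: Callable[[Optional[str]], Dict[str, Any]]) -> Iterator[Any]:
    """Yield every item, following next_cursor until it is missing or empty."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield from page["items"]  # "items" is an assumed field name
        cursor = page.get("next_cursor")
        if not cursor:
            return
```

Because it is a generator, only one page is held in memory at a time, which is what makes iterating over a long history practical.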

me.get() — check quota and plan

me = client.me.get()
print(me.plan)                     # "standard"
print(me.quota.monthly_limit)      # 5000
print(me.quota.used_this_month)    # 412
print(me.rate_limit.per_minute)    # 5

Webhook helpers

When you subscribe to webhook callbacks, the SDK can verify the HMAC signature for you:

# Assumption: SignatureError is exported from the same module as verify
from inbox_check.webhooks import SignatureError, verify

@app.post("/webhooks/inbox-check")
def handler(request):
    sig = request.headers["X-InboxCheck-Signature"]
    body = request.body  # raw bytes, not parsed JSON
    try:
        event = verify(body, sig, secret=WEBHOOK_SECRET)
    except SignatureError:
        return 401
    if event.type == "test.complete":
        process(event.data)
    return 200
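
For readers curious what such a check involves, HMAC verification generally looks like the sketch below. The scheme shown (HMAC-SHA256 over the raw body, hex-encoded) is an assumption for illustration; the signature format that verify actually expects is not documented in this section, and verify_signature is a hypothetical name.

```python
import hashlib
import hmac


def verify_signature(body: bytes, signature: str, secret: str) -> bool:
    """Check a hex HMAC-SHA256 signature over the raw request body."""
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters of the signature matched.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

This is also why the handler must read the raw bytes: re-serialising parsed JSON can reorder keys or change whitespace, which changes the digest.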

Error classes

Every exception inherits from InboxCheckError:

  • APIError — generic 4xx/5xx with a message and status code.
  • AuthError — 401/403. Bad or missing key, revoked key, wrong environment.
  • RateLimitError — 429. Exposes .retry_after in seconds.
  • NotFoundError — 404. Wrong test id, already-deleted resource.
  • ValidationError — 422. Bad payload; contains a .fields list of per-field messages.
  • ServerError — 5xx. Automatically retried up to max_retries unless you set it to zero.
  • TimeoutError — subclass of httpx.TimeoutException.
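
Read as a lookup table, the list above maps HTTP status codes to exception classes roughly as follows. This is only a restatement of the list in code, returning class names as strings so it runs without the SDK; error_name_for_status is a hypothetical helper, and the SDK's real dispatch logic may differ.

```python
def error_name_for_status(status: int) -> str:
    """Map an HTTP error status to the exception class name listed above."""
    if status < 400:
        raise ValueError(f"{status} is not an error status")
    specific = {
        401: "AuthError",
        403: "AuthError",
        404: "NotFoundError",
        422: "ValidationError",
        429: "RateLimitError",
    }
    if status in specific:
        return specific[status]
    if 500 <= status <= 599:
        return "ServerError"
    return "APIError"  # any other 4xx falls back to the generic class
```

Since everything inherits from InboxCheckError, catching that base class last gives a single fallback for cases the specific handlers miss.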

Retry strategy

Defaults cover most cases. The strategy is: retry on (429, 500, 502, 503, 504) and on network-level errors, up to max_retries, with exponential backoff starting at retry_backoff seconds and jitter.
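
To get a feel for how long a request can spend waiting between attempts, the schedule can be approximated as below. This is a sketch under assumptions: the text does not state the growth factor or the jitter formula, so a doubling factor and additive jitter of up to 10% are guesses, and backoff_delays is a hypothetical helper.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    retry_backoff: float = 0.5,
    jitter: float = 0.1,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the sleep before each retry: the base doubles, plus random jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        base = retry_backoff * (2 ** attempt)  # assumed doubling factor
        delays.append(base + rng.uniform(0, jitter * base))
    return delays
```

Under these assumptions and with jitter disabled, the defaults wait 0.5, 1.0 and 2.0 seconds, so three retries add about 3.5 seconds on top of the per-attempt timeouts.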

# Aggressive retries in a nightly job
client = Client(max_retries=6, retry_backoff=1.0)

# No retries at all (e.g. in a CI step where you want to see failures immediately)
client = Client(max_retries=0)

Idempotency

Passing idempotency_key on tests.create makes retries safe — the server deduplicates by key for 24 hours. Without a key, retried POSTs after a partial failure will create duplicate tests.
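
A key only deduplicates if every retry of the same logical request produces the same value, so it is worth deriving it from stable inputs rather than generating a random one per attempt. The sketch below shows one way to do that. make_idempotency_key is a hypothetical helper, and the "idem-" prefix and 32-character length are arbitrary choices, since the accepted key format is not specified here.

```python
import hashlib


def make_idempotency_key(*parts: str) -> str:
    """Derive a stable key from the values that identify one logical request."""
    # Join with a separator that is unlikely to appear in the parts, so
    # ("ab", "c") and ("a", "bc") do not collide.
    joined = "\x1f".join(parts)
    digest = hashlib.sha256(joined.encode("utf-8")).hexdigest()
    return f"idem-{digest[:32]}"
```

For the digest example earlier, make_idempotency_key("digest", "2026-08-05") would give the same key on every run of that day's job.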

Logging hooks

on_request and on_response receive the underlying httpx request/response objects. Attach them to a structured logger:

import logging, time
log = logging.getLogger("inbox_check")

def on_req(req):
    req.extensions["started_at"] = time.monotonic()
    log.info("ic.request", extra={"method": req.method, "url": str(req.url)})

def on_resp(resp):
    dur = time.monotonic() - resp.request.extensions["started_at"]
    log.info("ic.response", extra={
        "status": resp.status_code,
        "duration_ms": int(dur * 1000),
    })

client = Client(on_request=on_req, on_response=on_resp)

Typing — pydantic models

With the [pydantic] extra, all responses are typed models. The main shapes:

class Test(BaseModel):
    id: str
    status: Literal["pending", "running", "complete", "failed"]
    created_at: datetime
    completed_at: datetime | None
    summary: Summary | None
    auth: Auth | None
    providers: list[ProviderResult]
    tags: list[str]

class Summary(BaseModel):
    inbox_count: int
    spam_count: int
    missing_count: int
    total: int
    @property
    def inbox_rate(self) -> float: ...

class Auth(BaseModel):
    spf: Literal["pass", "fail", "softfail", "none"]
    dkim: Literal["pass", "fail", "none"]
    dmarc: Literal["pass", "fail", "none"]
    aligned: bool
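
The body of inbox_rate is elided above. A plausible reading is inbox_count divided by total, sketched here with a standard-library dataclass so it runs without pydantic. SummarySketch is a stand-in name, and both the formula and the zero-total behaviour are assumptions rather than the SDK's documented semantics.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SummarySketch:
    inbox_count: int
    spam_count: int
    missing_count: int
    total: int

    @property
    def inbox_rate(self) -> float:
        # Assumed definition: share of seed mailboxes that landed in the inbox.
        # Returns 0.0 instead of raising when total is 0.
        return self.inbox_count / self.total if self.total else 0.0
```

With invented counts of 18 inbox, 1 spam and 1 missing out of 20, inbox_rate evaluates to 0.9.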

Full example: a daily monitor script

# daily_monitor.py
import os, asyncio, logging
from inbox_check import AsyncClient, RateLimitError

logging.basicConfig(level=logging.INFO)
SENDERS = ["news.yourbrand.com", "tx.yourbrand.com"]
THRESHOLD = 0.80

async def check_one(client, domain):
    t = await client.tests.create(
        sender_domain=domain,
        subject="Daily monitor",
        html="<p>monitor ping</p>",
        tags=["monitor", f"domain:{domain}"],
    )
    async for ev in client.tests.stream(t.id):
        if ev.type == "complete":
            s = ev.data.summary
            rate = s.inbox_count / s.total if s.total else 0.0  # avoid ZeroDivisionError when total is 0
            logging.info("%s: %.1f%% inbox", domain, rate * 100)
            if rate < THRESHOLD:
                await post_slack(domain, rate, s)
            return

async def post_slack(domain, rate, summary):
    # ... your slack helper
    pass

async def main():
    async with AsyncClient(api_key=os.environ["INBOX_CHECK_API_KEY"]) as c:
        for d in SENDERS:
            try:
                await check_one(c, d)
            except RateLimitError as e:
                await asyncio.sleep(e.retry_after)
                await check_one(c, d)

if __name__ == "__main__":
    asyncio.run(main())

Frequently asked questions

Is the SDK thread-safe?

Yes. The sync Client wraps httpx.Client, which is thread-safe for concurrent requests. Share one instance across threads; do not share across forks.

Can I use my own httpx.Client?

Yes — pass it via the http_client kwarg. The SDK will use your instance and will not close it on exit. Handy when you need custom proxies, mTLS, or a shared connection pool with other code.

Does the async client work under Trio or AnyIO?

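The AsyncClient is built on httpx.AsyncClient, which supports both asyncio and Trio. httpx detects the running async library automatically, so there is no backend to select; under Trio, make sure the trio package is installed. If you need a custom transport, build an httpx.AsyncClient with transport=httpx.AsyncHTTPTransport(...) and pass it through http_client.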

How do I mock the SDK in tests?

Inject an httpx.Client (sync) or httpx.AsyncClient (async) built with httpx.MockTransport. The SDK makes no network calls except through the transport you provide, so your tests stay hermetic.