Python SDK — Usage

Construction

from onepin import OnePinClient

# Explicit key
client = OnePinClient(api_key="op_live_...")

# From environment (reads ONEPIN_API_KEY automatically)
client = OnePinClient.from_environment()

# Custom base URL (e.g. local dev proxy)
client = OnePinClient(
    api_key="op_live_...",
    base_url="https://api.onepin.ai",
)

Context manager

The client manages an underlying httpx connection pool. Use it as a context manager to ensure clean shutdown:

with OnePinClient(api_key="op_live_...") as client:
    page = client.workflows.list()
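
To illustrate what "clean shutdown" means here, the following is a minimal sketch of the context manager protocol using a hypothetical stand-in class. `FakeClient` and its `close()` method are assumptions made for illustration and are not part of the SDK; the point is only that the cleanup step runs when the `with` block exits, even if the body raises.

```python
class FakeClient:
    """Hypothetical stand-in for a client that owns a connection pool."""

    def __init__(self):
        self.closed = False

    def close(self):
        # A real client would release its pooled connections here.
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow exceptions raised in the body


def demo():
    """Return the closed flag as seen inside and after the with block."""
    with FakeClient() as client:
        inside = client.closed
    return inside, client.closed
```

Because `__exit__` returns `False`, any exception raised inside the block still propagates after the cleanup has run.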

Workflows

# List (single page)
page = client.workflows.list(limit=50, offset=0)
for wf in page.items:
    print(wf.id, wf.name, wf.status)

# Get one
wf = client.workflows.get("3fa85f64-5717-4562-b3fc-2c963f66afa6")
print(wf.definition)

# Start a run
run = client.workflows.runs.start("3fa85f64-5717-4562-b3fc-2c963f66afa6")
print(run.id, run.status)

# List run history
runs_page = client.workflows.runs.list("3fa85f64-5717-4562-b3fc-2c963f66afa6", limit=20)
for r in runs_page.items:
    print(r.id, r.status, r.created_at)

Voices

page = client.voices.list(limit=50)
for v in page.items:
    print(v.id, v.display_name, v.locale)

Templates

# List gallery templates
page = client.templates.list(limit=50)
for t in page.items:
    print(t.id, t.name)

# Get one
tmpl = client.templates.get("3fa85f64-5717-4562-b3fc-2c963f66afa6")

# Clone into a workflow in your workspace (requires workflows:write scope)
new_wf = client.templates.clone(
    template_id="3fa85f64-5717-4562-b3fc-2c963f66afa6",
    name="My production pipeline",  # optional; defaults to "<template name> (copy)"
)
print(new_wf.id, new_wf.name)

# Convenience: clone then immediately start a run
run = client.templates.run("3fa85f64-5717-4562-b3fc-2c963f66afa6")
print(run.workflow_id, run.id)

Uploads

Uploads use a three-step presigned URL flow: create → PUT bytes to S3 → confirm.

import httpx

# Step 1: create the upload record and get a presigned URL
result = client.uploads.create(
    filename="my-script.txt",
    category="script",  # "script" (.txt/.docx) or "dictionary" (.csv)
)
upload_id = result.upload.id
upload_url = result.upload_url

# Step 2: PUT the file bytes directly to S3 (raw HTTP, not via the SDK)
with open("my-script.txt", "rb") as f:
    response = httpx.put(upload_url, content=f.read())
response.raise_for_status()  # S3 must return 200 before you confirm

# Step 3: confirm the upload and bind it to a workflow
upload = client.uploads.confirm(
    upload_id=upload_id,
    workflow_id="3fa85f64-5717-4562-b3fc-2c963f66afa6",
)
print(upload.id, upload.status)

# Delete an upload
client.uploads.delete(upload_id)

Categories: category must be "script" (.txt, .docx) or "dictionary" (.csv). Audio file upload uses a separate endpoint not available via API key.
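
Since the category rules are fixed, it can be convenient to check a filename locally before creating the upload record. The helper below is a sketch, not part of the SDK: `ALLOWED_EXTENSIONS` and `check_upload` are hypothetical names, and the case-insensitive extension match is an assumption about how the server behaves.

```python
from pathlib import Path

# Extension rules copied from the category note above.
ALLOWED_EXTENSIONS = {
    "script": {".txt", ".docx"},
    "dictionary": {".csv"},
}


def check_upload(filename: str, category: str) -> None:
    """Raise ValueError if the filename does not fit the category."""
    if category not in ALLOWED_EXTENSIONS:
        raise ValueError(f"unknown category: {category!r}")
    # Assumption: extensions are compared case-insensitively.
    suffix = Path(filename).suffix.lower()
    if suffix not in ALLOWED_EXTENSIONS[category]:
        allowed = ", ".join(sorted(ALLOWED_EXTENSIONS[category]))
        raise ValueError(
            f"{filename!r} is not valid for {category!r}; expected one of: {allowed}"
        )
```

Calling `check_upload("my-script.txt", "script")` before `client.uploads.create(...)` would surface a mismatched file type without a round trip to the API.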


Pagination

Every list() call returns a PaginatedList[T]:

Field                   Type          Description
items                   list[T]       Items on this page
pagination.total        int           Total count across all pages
pagination.limit        int           Page size requested
pagination.has_next     bool          More pages available
pagination.next_offset  int | None    Offset to pass on the next call
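
As a mental model, the fields above could be represented roughly as follows. This is a sketch only: the field names come from the table, but the `Pagination` class name and the use of dataclasses are assumptions, and the SDK's real definitions may differ.

```python
from dataclasses import dataclass
from typing import Generic, List, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Pagination:
    """Hypothetical container for the pagination.* fields."""
    total: int                  # total count across all pages
    limit: int                  # page size requested
    has_next: bool              # more pages available
    next_offset: Optional[int]  # offset to pass on the next call, or None


@dataclass
class PaginatedList(Generic[T]):
    """Sketch of the shape returned by list(); not the SDK's actual class."""
    items: List[T]
    pagination: Pagination


# Example: first page of five items, fetched two at a time.
page = PaginatedList(
    items=["a", "b"],
    pagination=Pagination(total=5, limit=2, has_next=True, next_offset=2),
)
```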

Manual pagination

offset = 0
while True:
    page = client.workflows.list(offset=offset, limit=50)
    for wf in page.items:
        process(wf)
    if not page.pagination.has_next:
        break
    offset = page.pagination.next_offset

Lazy iteration with iter_all()

iter_all() is available on workflows, voices, and templates. It yields one item at a time and fetches the next page only when the current one is exhausted. Breaking out of the loop early stops further fetching:

for wf in client.workflows.iter_all():
    if should_stop(wf):
        break  # no further HTTP calls after this
    process(wf)

Async:

# must run inside an async function
async for wf in client.workflows.iter_all():
    process(wf)
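
The lazy behaviour described above can be reproduced with a plain Python generator. The sketch below is not the SDK's implementation: `iter_all` here is a standalone function, `fetch_page` and `make_fake_fetch` are hypothetical helpers, and a page is modelled as a simple `(items, next_offset)` tuple. It records which offsets were requested so the effect of an early `break` is visible.

```python
from typing import Callable, Iterator, List, Optional, Tuple

# A page is modelled as (items, next_offset); next_offset is None on the last page.
Page = Tuple[List[int], Optional[int]]


def iter_all(fetch_page: Callable[[int, int], Page], limit: int = 50) -> Iterator[int]:
    """Yield items one at a time, fetching a page only when it is needed."""
    offset: Optional[int] = 0
    while offset is not None:
        items, offset = fetch_page(offset, limit)
        yield from items


def make_fake_fetch(data: List[int]):
    """Build an in-memory page fetcher that records each requested offset."""
    calls: List[int] = []

    def fetch_page(offset: int, limit: int) -> Page:
        calls.append(offset)
        chunk = data[offset:offset + limit]
        next_offset = offset + limit if offset + limit < len(data) else None
        return chunk, next_offset

    return fetch_page, calls


fetch, calls = make_fake_fetch(list(range(10)))
seen = []
for item in iter_all(fetch, limit=3):
    if item == 4:
        break  # the generator is never resumed, so no further pages are fetched
    seen.append(item)
# Only the pages at offsets 0 and 3 were requested: calls == [0, 3]
```

The generator is suspended at the `yield` when the loop breaks, so the pages at offsets 6 and 9 are never requested.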

Error handling

from onepin import OnePinClient
from onepin.errors import OnePinAPIError, OnePinNotFoundError

client = OnePinClient(api_key="op_live_...")

try:
    wf = client.workflows.get("nonexistent-id")
except OnePinNotFoundError:
    print("workflow not found")
except OnePinAPIError as e:
    print(f"[{e.error_code}] {e.message} request_id={e.request_id}")
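
The order of the except clauses matters if, as seems likely, the not-found error is a subclass of the general API error: Python uses the first matching handler, so the more specific class has to come first. The sketch below demonstrates this with hypothetical stand-in classes. `APIError` and `NotFoundError` are not the SDK's classes, the subclass relationship is an assumption, and the error codes and request IDs are made up for illustration.

```python
class APIError(Exception):
    """Hypothetical stand-in for OnePinAPIError."""

    def __init__(self, error_code: str, message: str, request_id: str):
        super().__init__(message)
        self.error_code = error_code
        self.message = message
        self.request_id = request_id


class NotFoundError(APIError):
    """Hypothetical stand-in for OnePinNotFoundError (assumed subclass)."""


def describe(exc: Exception) -> str:
    """Route an exception through the same handler order as the example above."""
    try:
        raise exc
    except NotFoundError:
        # Specific handler first; otherwise the APIError clause would catch it.
        return "workflow not found"
    except APIError as e:
        return f"[{e.error_code}] {e.message} request_id={e.request_id}"
```

If the two clauses were swapped, the `APIError` handler would catch every `NotFoundError` and the specific branch would never run.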

See Errors for the full exception hierarchy and retry patterns.