The Pipeline API allows you to execute pre-configured multi-step workflows that combine parsing, extraction, splitting, and other operations in a single request.
Basic Usage
# Synchronous client
from reducto import Reducto

client = Reducto()

response = client.pipeline.run(
    input="https://example.com/document.pdf",
    pipeline_id="your-pipeline-id"
)
print(response)

# Asynchronous client
import asyncio
from reducto import AsyncReducto

client = AsyncReducto()

async def main():
    response = await client.pipeline.run(
        input="https://example.com/document.pdf",
        pipeline_id="your-pipeline-id"
    )
    print(response)

asyncio.run(main())
Method Signature
client.pipeline.run(
    input: str,
    pipeline_id: str,
    settings: dict | None = None
) -> PipelineResponse
Parameters
input: str
The URL of the document to process. You can provide:
- A publicly available URL
- A presigned S3 URL
- A reducto:// prefixed URL from the /upload endpoint
- A jobid:// prefixed URL from a previous parse invocation
- A list of URLs (for multi-document pipelines, V3 API only)

pipeline_id: str
The ID of the pipeline to use for processing the document. Pipelines are pre-configured workflows created in your Reducto dashboard.

settings: dict | None (default: None)
Settings for pipeline execution that override pipeline defaults. The available keys depend on your specific pipeline configuration.
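The input forms listed above can be told apart by their URL scheme. The following is a minimal sketch of a client-side check, not part of the Reducto SDK; the helper name classify_input and the labels it returns are hypothetical and exist only for illustration.

```python
from urllib.parse import urlparse


def classify_input(ref: str) -> str:
    """Return a rough label for a single pipeline input string.

    Hypothetical helper: the labels are illustrative, not SDK values.
    """
    scheme = urlparse(ref).scheme.lower()
    if scheme == "reducto":
        return "uploaded file"        # reducto:// URL from the /upload endpoint
    if scheme == "jobid":
        return "previous parse job"   # jobid:// URL from an earlier parse call
    if scheme in ("http", "https"):
        return "remote URL"           # public or presigned URL
    raise ValueError(f"Unrecognized input reference: {ref!r}")


labels = [
    classify_input("https://example.com/document.pdf"),
    classify_input("reducto://example-upload"),
    classify_input("jobid://example-job"),
]
```

A check like this can catch a malformed reference (for example a bare file name) before a request is sent.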
Understanding Pipelines
Pipelines combine multiple operations into reusable workflows. A typical pipeline might:
- Parse the document with specific formatting options
- Split it into categorized sections
- Extract structured data from each section
- Apply transformations or filters
Pipelines are configured in the Reducto dashboard and referenced by their unique ID.
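As a mental model, a pipeline can be pictured as an ordered list of steps, each taking the output of the previous one. The sketch below is a toy illustration of that idea in plain Python; it is not how Reducto implements pipelines, and every function name in it is hypothetical.

```python
from typing import Callable, Dict, List

Step = Callable[[Dict], Dict]


def parse(state: Dict) -> Dict:
    # Stand-in for parsing: break the raw text into lines.
    return {**state, "lines": state["raw"].splitlines()}


def split(state: Dict) -> Dict:
    # Stand-in for splitting: group lines by their "section:" prefix.
    sections: Dict[str, List[str]] = {}
    for line in state["lines"]:
        name, _, body = line.partition(":")
        sections.setdefault(name.strip(), []).append(body.strip())
    return {**state, "sections": sections}


def extract(state: Dict) -> Dict:
    # Stand-in for extraction: count the entries found in each section.
    counts = {name: len(items) for name, items in state["sections"].items()}
    return {**state, "data": counts}


def run_steps(steps: List[Step], state: Dict) -> Dict:
    # Feed the output of each step into the next one, in order.
    for step in steps:
        state = step(state)
    return state


result = run_steps(
    [parse, split, extract],
    {"raw": "terms: net 30\npricing: $100\nterms: renewal"},
)
```

Because each step only depends on the state handed to it, the same ordered list can be reused on any document, which is the property that makes a pipeline referable by a single ID.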
Invoice Processing Pipeline
# Synchronous client
from reducto import Reducto

client = Reducto()

# Run an invoice processing pipeline
response = client.pipeline.run(
    input="https://example.com/invoice.pdf",
    pipeline_id="invoice-extraction-v1"
)

# Access structured invoice data
print(f"Invoice #: {response.data['invoice_number']}")
print(f"Total: {response.data['total_amount']}")
print(f"Items: {len(response.data['line_items'])}")

# Asynchronous client
import asyncio
from reducto import AsyncReducto

client = AsyncReducto()

async def main():
    # Run an invoice processing pipeline
    response = await client.pipeline.run(
        input="https://example.com/invoice.pdf",
        pipeline_id="invoice-extraction-v1"
    )

    # Access structured invoice data
    print(f"Invoice #: {response.data['invoice_number']}")
    print(f"Total: {response.data['total_amount']}")
    print(f"Items: {len(response.data['line_items'])}")

asyncio.run(main())
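The fields a pipeline returns depend on how it was configured, so indexing response.data directly can raise an unhelpful KeyError. The sketch below checks for the three fields used in the example above before reading them. It assumes the data behaves like a plain dict; the helper name summarize_invoice and the sample values are hypothetical.

```python
REQUIRED_FIELDS = ("invoice_number", "total_amount", "line_items")


def summarize_invoice(data: dict) -> dict:
    """Validate and summarize invoice data (hypothetical helper)."""
    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise KeyError(f"Invoice data is missing fields: {', '.join(missing)}")
    return {
        "invoice_number": data["invoice_number"],
        "total_amount": data["total_amount"],
        "item_count": len(data["line_items"]),
    }


# Made-up sample standing in for response.data
sample = {
    "invoice_number": "INV-001",
    "total_amount": 125.50,
    "line_items": [{"description": "Widget"}, {"description": "Gadget"}],
}
summary = summarize_invoice(sample)
```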
Contract Analysis Pipeline
Process legal contracts with a multi-step pipeline:
from reducto import Reducto

client = Reducto()

# Pipeline that:
# 1. Parses the contract
# 2. Splits into sections (terms, pricing, signatures)
# 3. Extracts key clauses and dates
# 4. Identifies parties and obligations
response = client.pipeline.run(
    input="https://example.com/contract.pdf",
    pipeline_id="contract-analysis-v2"
)

print("Contract Analysis Results:")
print(f"Parties: {response.data['parties']}")
print(f"Start Date: {response.data['start_date']}")
print(f"End Date: {response.data['end_date']}")
print(f"Key Terms: {response.data['key_terms']}")
print(f"Obligations: {response.data['obligations']}")
Override Pipeline Settings
Customize pipeline execution with runtime settings:
from reducto import Reducto

client = Reducto()

response = client.pipeline.run(
    input="https://example.com/financial-report.pdf",
    pipeline_id="financial-extraction-v1",
    settings={
        "fiscal_year": "2024",
        "extract_footnotes": True,
        "table_format": "json",
        "include_visualizations": False
    }
)
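How runtime settings combine with a pipeline's defaults is decided by the service and is not spelled out here. One plausible reading is a shallow override, where each key you pass replaces the default and every other default is kept. The sketch below illustrates only that assumption; the default values and the helper apply_overrides are hypothetical.

```python
from typing import Optional


def apply_overrides(defaults: dict, overrides: Optional[dict]) -> dict:
    """Shallow merge: keys in overrides win, other defaults are kept.

    Illustrative assumption only, not the documented server behavior.
    """
    return {**defaults, **(overrides or {})}


# Hypothetical defaults configured for the pipeline
pipeline_defaults = {
    "table_format": "html",
    "extract_footnotes": False,
    "include_visualizations": True,
}

effective = apply_overrides(
    pipeline_defaults,
    {"fiscal_year": "2024", "extract_footnotes": True, "table_format": "json"},
)
```

Under this reading, include_visualizations keeps its default because the override dict does not mention it.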
Multi-Document Pipelines
Process multiple related documents together:
from reducto import Reducto

client = Reducto()

# Process a set of related documents
documents = [
    "https://example.com/balance-sheet.pdf",
    "https://example.com/income-statement.pdf",
    "https://example.com/cash-flow.pdf"
]

response = client.pipeline.run(
    input=documents,
    pipeline_id="financial-consolidation-v1",
    settings={
        "consolidate_results": True,
        "cross_reference": True
    }
)

print("Consolidated Financial Data:")
print(response.data)
Async Job Processing
For large documents or batch processing, use async pipeline execution:
from reducto import Reducto

client = Reducto()

# Start an async pipeline job
job = client.pipeline.run_job(
    input="https://example.com/large-document.pdf",
    pipeline_id="comprehensive-analysis-v1",
    async_={
        "webhook": {
            "url": "https://example.com/webhook"
        }
    }
)

print(f"Pipeline Job ID: {job.job_id}")

# Check the job status once; in practice, repeat this call until the job completes
result = client.job.get(job.job_id)
if result.status == "completed":
    print("Pipeline completed successfully")
    print(result.data)
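The status check above runs only once, so a job that is still in progress is silently skipped. The sketch below shows one way to poll with a timeout. It is not part of the SDK: wait_for_job is a hypothetical helper that takes any callable returning an object with a status attribute, and FakeJobSource stands in for the API so the example runs on its own. Only the "completed" status comes from the example above; failure statuses are not handled because their names are not given here.

```python
import time


def wait_for_job(get_job, job_id, timeout=300.0, interval=5.0, sleep=time.sleep):
    """Poll get_job(job_id) until its status is "completed" or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        result = get_job(job_id)
        if result.status == "completed":
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} did not complete within {timeout}s")
        sleep(interval)


class FakeResult:
    def __init__(self, status):
        self.status = status


class FakeJobSource:
    """Stand-in for the API: reports "pending" twice, then "completed"."""

    def __init__(self):
        self.calls = 0

    def get(self, job_id):
        self.calls += 1
        return FakeResult("completed" if self.calls >= 3 else "pending")


source = FakeJobSource()
final = wait_for_job(source.get, "example-job", timeout=10, interval=0,
                     sleep=lambda seconds: None)
```

With the real client, the equivalent call would presumably be wait_for_job(client.job.get, job.job_id).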
Batch Processing with Pipelines
Process multiple documents through the same pipeline:
from reducto import Reducto
import time

client = Reducto()

documents = [
    "https://example.com/doc1.pdf",
    "https://example.com/doc2.pdf",
    "https://example.com/doc3.pdf",
    "https://example.com/doc4.pdf",
    "https://example.com/doc5.pdf"
]

# Start one job per document
jobs = []
for doc_url in documents:
    job = client.pipeline.run_job(
        input=doc_url,
        pipeline_id="document-processing-v1"
    )
    jobs.append(job)
    print(f"Started job {job.job_id} for {doc_url}")

print(f"\nProcessing {len(jobs)} documents...")

# Wait for all jobs to complete
# Note: this loop exits only when every job reports "completed";
# add a timeout or a failure check before using it in production.
while True:
    completed = 0
    for job in jobs:
        result = client.job.get(job.job_id)
        if result.status == "completed":
            completed += 1

    print(f"Progress: {completed}/{len(jobs)} completed")

    if completed == len(jobs):
        break

    time.sleep(5)

print("All documents processed!")

# Collect results
for job in jobs:
    result = client.job.get(job.job_id)
    print(f"\nJob {job.job_id}:")
    print(result.data)
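The loop above re-fetches every job on each pass and then fetches each one again to collect results. A possible refinement is to track only the jobs that are still pending and keep each result as soon as it completes. The sketch below assumes the same "completed" status as the example; wait_for_all is a hypothetical helper and FakeBatchSource stands in for the API.

```python
import time


def wait_for_all(get_job, job_ids, timeout=600.0, interval=5.0, sleep=time.sleep):
    """Poll pending jobs until all are "completed"; return {job_id: result}."""
    pending = list(job_ids)
    results = {}
    deadline = time.monotonic() + timeout
    while pending:
        still_pending = []
        for job_id in pending:
            result = get_job(job_id)
            if result.status == "completed":
                results[job_id] = result   # keep it; no second fetch needed
            else:
                still_pending.append(job_id)
        pending = still_pending
        if pending:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Jobs still pending: {pending}")
            sleep(interval)
    return results


class FakeResult:
    def __init__(self, status):
        self.status = status


class FakeBatchSource:
    """Stand-in for the API: each job completes after a set number of polls."""

    def __init__(self, polls_needed):
        self.polls_needed = dict(polls_needed)
        self.fetches = 0

    def get(self, job_id):
        self.fetches += 1
        self.polls_needed[job_id] -= 1
        done = self.polls_needed[job_id] <= 0
        return FakeResult("completed" if done else "pending")


source = FakeBatchSource({"job-a": 1, "job-b": 3})
all_results = wait_for_all(source.get, ["job-a", "job-b"], timeout=10,
                           interval=0, sleep=lambda seconds: None)
```

In this run job-a is fetched once and job-b three times, so four requests are made in total.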
Reusing Parsed Documents
Use a pipeline on a document that was previously parsed:
from reducto import Reducto

client = Reducto()

# First parse the document with custom options
parse_response = client.parse.run(
    input="https://example.com/document.pdf",
    formatting={
        "add_page_markers": True,
        "table_output_format": "json"
    }
)

# Then run a pipeline using the parsed result
pipeline_response = client.pipeline.run(
    input=f"jobid://{parse_response.job_id}",
    pipeline_id="advanced-extraction-v1"
)

print(pipeline_response.data)
Error Handling
Handle pipeline errors gracefully:
import reducto
from reducto import Reducto

client = Reducto()

try:
    response = client.pipeline.run(
        input="https://example.com/document.pdf",
        pipeline_id="my-pipeline-v1"
    )
    print("Pipeline succeeded")
    print(response.data)
except reducto.APIStatusError as e:
    # The API responded with an error status code
    print(f"Pipeline failed with status {e.status_code}")
    print(f"Error: {e.response}")
except reducto.APIConnectionError as e:
    # The request could not reach the API
    print("Failed to connect to API")
    print(e.__cause__)
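Connection failures are often transient, so a caller may want to retry before giving up. The sketch below is a generic retry wrapper with exponential backoff, written without any SDK dependency. with_retries is a hypothetical helper, the built-in ConnectionError stands in for reducto.APIConnectionError, and treating that error as retryable is an assumption rather than documented guidance.

```python
import time


def with_retries(fn, retryable, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on a retryable exception wait and retry, doubling the delay."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))


# Stand-in for a pipeline call that fails twice before succeeding
calls = {"count": 0}


def flaky_call():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network failure")
    return "ok"


delays = []
outcome = with_retries(flaky_call, (ConnectionError,), attempts=4,
                       base_delay=0.5, sleep=delays.append)
```

With the real client, fn would presumably be a lambda wrapping client.pipeline.run(...), and retryable would be the exception classes you consider safe to retry.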