Batch upsert data records from pipeline platforms like Airbyte, Fivetran, or custom connectors.

## Authentication

Requires a Pipeline Connector API key, passed via the `X-API-Key` header.

## Request Body

Array of pipeline records (max 1,000 per request):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `source_system` | string | Yes | Source platform identifier (e.g., "airbyte", "fivetran", "custom") |
| `namespace` | string | Yes | Source application namespace (e.g., "clickup", "jira", "notion") |
| `action` | string | Yes | Action to perform: "upsert" or "delete" |
| `organization_id` | string | Yes | Organization ID (`org_`-prefixed) |
| `id` | string | Yes | Unique identifier from the source system (1-255 chars) |
| `data` | object | Yes | Free-form data from the source system |
| `file` | object | No | Optional file attachment metadata |
| `created_at` | string | Yes | Record creation timestamp (ISO 8601) |
| `updated_at` | string | No | Record last-update timestamp (ISO 8601) |

### File Metadata Object

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `file_url` | string | Yes | Local file path where the file is staged |
| `file_relative_path` | string | Yes | Relative path in the source system |
| `source_file_url` | string | No | Original URL in the source system |

## Constraints

- **Batch Size**: Maximum 1,000 records per request
- **Rate Limit**: 100 requests per minute per organization
- **Organization**: All records must belong to the same organization
- **Data**: Cannot be an empty object

## Returns

Success response with processing statistics.
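Before sending, a client typically enforces the constraints above on its own side: splitting large record sets into batches of at most 1,000 and verifying that every record in a batch shares one `organization_id`. A minimal sketch in Python; `chunk_records` and `assert_single_org` are illustrative helper names, not part of this API:

```python
# Illustrative client-side helpers for the batch constraints above.
# These are hypothetical names, not endpoints or SDK calls.

MAX_BATCH_SIZE = 1000


def assert_single_org(records):
    """Raise if the records span more than one organization_id."""
    org_ids = {r["organization_id"] for r in records}
    if len(org_ids) > 1:
        raise ValueError(f"Records span multiple organizations: {sorted(org_ids)}")


def chunk_records(records, size=MAX_BATCH_SIZE):
    """Yield successive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


# Example: 2,500 records become three requests (1000 + 1000 + 500).
records = [
    {"organization_id": "org_123abc456def789", "id": f"rec_{i}", "data": {"n": i}}
    for i in range(2500)
]
assert_single_org(records)
batches = list(chunk_records(records))
print([len(b) for b in batches])  # [1000, 1000, 500]
```

Each yielded batch can then be posted as one request, as in the examples below.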
### Basic Upsert

```bash cURL
curl -X POST https://api.aitronos.com/v1/connectors/pipeline/upsert \
  -H "X-API-Key: $PIPELINE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "source_system": "airbyte",
      "namespace": "clickup",
      "action": "upsert",
      "organization_id": "org_123abc456def789",
      "id": "task_12345",
      "data": {
        "name": "Implement new feature",
        "status": "in_progress",
        "assignee": "john@example.com",
        "priority": "high"
      },
      "created_at": "2025-01-15T10:30:00Z",
      "updated_at": "2025-01-15T14:20:00Z"
    }
  ]'
```

```python Python
import os

import requests

api_key = os.environ["PIPELINE_API_KEY"]

records = [
    {
        "source_system": "airbyte",
        "namespace": "clickup",
        "action": "upsert",
        "organization_id": "org_123abc456def789",
        "id": "task_12345",
        "data": {
            "name": "Implement new feature",
            "status": "in_progress",
            "assignee": "john@example.com",
            "priority": "high"
        },
        "created_at": "2025-01-15T10:30:00Z",
        "updated_at": "2025-01-15T14:20:00Z"
    }
]

response = requests.post(
    "https://api.aitronos.com/v1/connectors/pipeline/upsert",
    headers={"X-API-Key": api_key},
    json=records
)
print(response.json())
```

```javascript JavaScript
const records = [
  {
    source_system: "airbyte",
    namespace: "clickup",
    action: "upsert",
    organization_id: "org_123abc456def789",
    id: "task_12345",
    data: {
      name: "Implement new feature",
      status: "in_progress",
      assignee: "john@example.com",
      priority: "high"
    },
    created_at: "2025-01-15T10:30:00Z",
    updated_at: "2025-01-15T14:20:00Z"
  }
];

const response = await fetch(
  "https://api.aitronos.com/v1/connectors/pipeline/upsert",
  {
    method: "POST",
    headers: {
      "X-API-Key": process.env.PIPELINE_API_KEY,
      "Content-Type": "application/json"
    },
    body: JSON.stringify(records)
  }
);
const data = await response.json();
console.log(data);
```

### With File Attachment

```bash cURL
curl -X POST https://api.aitronos.com/v1/connectors/pipeline/upsert \
  -H "X-API-Key: $PIPELINE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "source_system":
        "airbyte",
      "namespace": "onedrive",
      "action": "upsert",
      "organization_id": "org_123abc456def789",
      "id": "file_67890",
      "data": {
        "name": "Q4 Report.xlsx",
        "size": 2048576,
        "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
      },
      "file": {
        "file_url": "/tmp/staging/12345/Q4_Report.xlsx",
        "file_relative_path": "Documents/Reports/Q4_Report.xlsx",
        "source_file_url": "https://onedrive.com/files/xyz123"
      },
      "created_at": "2025-01-10T08:00:00Z",
      "updated_at": "2025-01-15T16:45:00Z"
    }
  ]'
```

```python Python
import os

import requests

api_key = os.environ["PIPELINE_API_KEY"]

records = [
    {
        "source_system": "airbyte",
        "namespace": "onedrive",
        "action": "upsert",
        "organization_id": "org_123abc456def789",
        "id": "file_67890",
        "data": {
            "name": "Q4 Report.xlsx",
            "size": 2048576,
            "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        },
        "file": {
            "file_url": "/tmp/staging/12345/Q4_Report.xlsx",
            "file_relative_path": "Documents/Reports/Q4_Report.xlsx",
            "source_file_url": "https://onedrive.com/files/xyz123"
        },
        "created_at": "2025-01-10T08:00:00Z",
        "updated_at": "2025-01-15T16:45:00Z"
    }
]

response = requests.post(
    "https://api.aitronos.com/v1/connectors/pipeline/upsert",
    headers={"X-API-Key": api_key},
    json=records
)
print(response.json())
```

### Batch Processing

```python Python
import os

import requests

api_key = os.environ["PIPELINE_API_KEY"]

# Process multiple records in one batch
records = [
    {
        "source_system": "fivetran",
        "namespace": "jira",
        "action": "upsert",
        "organization_id": "org_123abc456def789",
        "id": "issue_001",
        "data": {"title": "Bug fix", "status": "open"},
        "created_at": "2025-01-15T10:00:00Z"
    },
    {
        "source_system": "fivetran",
        "namespace": "jira",
        "action": "upsert",
        "organization_id": "org_123abc456def789",
        "id": "issue_002",
        "data": {"title": "Feature request", "status": "in_progress"},
        "created_at": "2025-01-15T11:00:00Z"
    },
    {
        "source_system": "fivetran",
        "namespace": "jira",
        "action": "delete",
        "organization_id":
            "org_123abc456def789",
        "id": "issue_003",
        "data": {"deleted": True},
        "created_at": "2025-01-15T12:00:00Z"
    }
]

response = requests.post(
    "https://api.aitronos.com/v1/connectors/pipeline/upsert",
    headers={"X-API-Key": api_key},
    json=records
)
result = response.json()
print(f"Processed: {result['processed_count']}")
print(f"Failed: {result['failed_count']}")
print(f"Summary: {result['summary']}")
```

## Response

```json 200 - Success
{
  "success": true,
  "message": "Batch received successfully",
  "processed_count": 5,
  "failed_count": 0,
  "request_id": "req_abc123def456",
  "processing_time_ms": 145,
  "summary": {
    "namespaces": {
      "clickup": 5
    },
    "source_systems": {
      "airbyte": 5
    },
    "actions": {
      "upsert": 5
    }
  }
}
```

```json 400 - Batch Size Exceeded
{
  "success": false,
  "error": {
    "code": "BATCH_SIZE_EXCEEDED",
    "message": "Batch size exceeds maximum allowed (1000)",
    "system_message": "Batch validation failed",
    "type": "client_error",
    "status": 400,
    "details": {
      "batch_size": 1500,
      "max_batch_size": 1000
    },
    "trace_id": "2fbbf3b6-51a1-4f1b-88e2-c00e8b52fbb8",
    "timestamp": "2025-01-15T10:30:00Z"
  }
}
```

```json 401 - Invalid API Key
{
  "success": false,
  "error": {
    "code": "INVALID_API_KEY",
    "message": "Invalid pipeline connector API key",
    "system_message": "Pipeline connector authentication failed",
    "type": "client_error",
    "status": 401,
    "trace_id": "2fbbf3b6-51a1-4f1b-88e2-c00e8b52fbb8",
    "timestamp": "2025-01-15T10:30:00Z"
  }
}
```

```json 422 - Validation Error
{
  "success": false,
  "error": {
    "code": "INVALID_PIPELINE_PAYLOAD",
    "message": "All records must belong to the same organization",
    "system_message": "Payload validation failed",
    "type": "client_error",
    "status": 422,
    "details": {
      "expected_org_id": "org_123abc456def789",
      "found_org_id": "org_999xyz888abc",
      "record_index": 3
    },
    "trace_id": "2fbbf3b6-51a1-4f1b-88e2-c00e8b52fbb8",
    "timestamp": "2025-01-15T10:30:00Z"
  }
}
```

```json 429 - Rate Limit Exceeded
{
  "success": false,
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "Too many requests. Please try again later.",
    "system_message": "Rate limit exceeded: 100 requests per minute",
    "type": "client_error",
    "status": 429,
    "details": {
      "retry_after": 60
    },
    "trace_id": "2fbbf3b6-51a1-4f1b-88e2-c00e8b52fbb8",
    "timestamp": "2025-01-15T10:30:00Z"
  }
}
```
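When the 429 response above is returned, its `details.retry_after` field (in seconds) can drive a simple retry loop. A sketch assuming the error shape shown; `post_batch` and `retry_delay` are illustrative names, not part of this API:

```python
import time

import requests


def retry_delay(error_body, default=60):
    """Extract retry_after (seconds) from a 429 error body, with a fallback."""
    return error_body.get("error", {}).get("details", {}).get("retry_after", default)


def post_batch(records, api_key, max_attempts=3):
    """POST one batch, sleeping on 429 for the server-suggested interval."""
    url = "https://api.aitronos.com/v1/connectors/pipeline/upsert"
    for attempt in range(1, max_attempts + 1):
        response = requests.post(
            url,
            headers={"X-API-Key": api_key},
            json=records,
        )
        if response.status_code != 429:
            return response.json()
        if attempt < max_attempts:
            time.sleep(retry_delay(response.json()))
    raise RuntimeError(f"Still rate limited after {max_attempts} attempts")
```

Since the limit is 100 requests per minute per organization, a connector that batches up to 1,000 records per request rarely needs more than one retry in practice.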