API Reference

Execute TopFlow workflows via HTTP API. This reference covers request/response formats, authentication, error handling, and deployment patterns for production workflows.

Overview

TopFlow workflows can be executed via HTTP API, making them easy to integrate into your applications. When you export a workflow, you can generate either a standalone async function or a Next.js API route handler that accepts HTTP requests.
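
For orientation, here is a minimal sketch of what an exported Next.js route handler might look like. This is illustrative only: the actual code comes from TopFlow's "Export Code" feature, and the runExportedWorkflow name and signature are assumptions, not the generated names.

// app/api/workflow/route.ts - a minimal sketch, not the exact generated code
import { NextResponse } from "next/server"

// Placeholder for the function your exported code defines; the real name
// and signature come from TopFlow's "Export Code" output.
declare function runExportedWorkflow(input: string): Promise<unknown>

export async function POST(request: Request) {
  const { input } = await request.json()
  const result = await runExportedWorkflow(input)
  return NextResponse.json(result)
}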

Key Concepts
  1. Workflow as API - Each workflow becomes an HTTP endpoint that accepts input and returns results
  2. Streaming Execution - Workflows execute with real-time streaming updates as each node completes
  3. No Platform API Keys - TopFlow uses a BYOK (Bring Your Own Key) model. You provide your own AI provider API keys via environment variables
  4. Export to Code - Workflows export to TypeScript code you own and deploy yourself (no vendor lock-in)

Authentication

BYOK Model: TopFlow itself doesn't require an API key. Instead, you provide your own AI provider API keys (OpenAI, Anthropic, etc.) via environment variables in your deployment.
Required Environment Variables
Set these in your deployment environment (Vercel, etc.)

Your exported workflow code reads AI provider API keys from environment variables:

# .env.local or deployment environment variables

# OpenAI (for GPT models)
OPENAI_API_KEY=sk-...

# Anthropic (for Claude models)
ANTHROPIC_API_KEY=sk-ant-...

# Google (for Gemini models)
GOOGLE_API_KEY=...

# Groq (for fast inference)
GROQ_API_KEY=gsk_...

# Optional: API keys for external services
PAGERDUTY_TOKEN=...
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
VIRUSTOTAL_API_KEY=...
Security Note: Never commit API keys to git repositories. Use environment variables or secret management services. See Security Best Practices.
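
As a defensive measure, your deployment can fail fast at startup when a required key is missing. A minimal sketch in TypeScript (the list of required variables is an assumption; adjust it to the providers your workflow actually uses):

// Sketch: fail fast at startup if a required provider key is missing.
// The list below is illustrative - include only the providers you use.
const requiredKeys = ["OPENAI_API_KEY", "ANTHROPIC_API_KEY"]
for (const name of requiredKeys) {
  if (!process.env[name]) {
    throw new Error(`Missing required environment variable: ${name}`)
  }
}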

Workflow Execution Endpoint

POST /api/execute-workflow
Execute a workflow with streaming updates
Endpoint
POST https://your-domain.vercel.app/api/execute-workflow
Content-Type
application/json
Description

Executes a workflow and returns streaming updates as each node completes. The response is a stream of newline-delimited JSON objects showing node execution progress.

Request Format

Request Body
{
  "nodes": [
    {
      "id": "start",
      "type": "start",
      "data": {
        "input": "Analyze security logs"
      }
    },
    {
      "id": "textModel1",
      "type": "textModel",
      "data": {
        "model": "gpt-4",
        "prompt": "Analyze: $input1",
        "temperature": 0.7,
        "maxTokens": 1000
      }
    },
    {
      "id": "end",
      "type": "end",
      "data": {}
    }
  ],
  "edges": [
    {
      "id": "edge1",
      "source": "start",
      "target": "textModel1"
    },
    {
      "id": "edge2",
      "source": "textModel1",
      "target": "end"
    }
  ],
  "apiKeys": {
    "openai": "sk-...",
    "anthropic": "sk-ant-..."
  }
}
Required Fields:
  • nodes - Array of workflow nodes with id, type, and data
  • edges - Array of connections between nodes
  • apiKeys - Object containing API keys for AI providers used in the workflow
Security: API keys sent in the request body are used only during execution and are not stored. For production, use environment variables instead.
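
For TypeScript clients, the request body can be described with types inferred from the example above. This is a sketch, not an official schema:

// Shapes inferred from the example request (not an official schema)
interface WorkflowNode {
  id: string
  type: string                    // e.g. "start", "textModel", "end"
  data: Record<string, unknown>   // node-specific configuration
}

interface WorkflowEdge {
  id?: string                     // optional; the cURL example below omits it
  source: string                  // id of the upstream node
  target: string                  // id of the downstream node
}

interface ExecuteWorkflowRequest {
  nodes: WorkflowNode[]
  edges: WorkflowEdge[]
  apiKeys: Record<string, string> // e.g. { openai: "sk-..." }
}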

Response Format

Streaming Response
Newline-delimited JSON stream of execution updates

The workflow execution endpoint returns a stream of JSON objects, one per line, showing progress as each node executes:

{"type":"node_start","nodeId":"start","timestamp":"2024-01-15T10:30:00.000Z"}
{"type":"node_complete","nodeId":"start","output":"Analyze security logs","timestamp":"2024-01-15T10:30:00.100Z"}
{"type":"node_start","nodeId":"textModel1","timestamp":"2024-01-15T10:30:00.150Z"}
{"type":"node_complete","nodeId":"textModel1","output":"Analysis: Found 3 critical issues...","timestamp":"2024-01-15T10:30:05.500Z"}
{"type":"node_start","nodeId":"end","timestamp":"2024-01-15T10:30:05.600Z"}
{"type":"complete","output":"Analysis: Found 3 critical issues...","timestamp":"2024-01-15T10:30:05.700Z"}
Event Types:
  • node_start - Node execution began
  • node_complete - Node finished successfully (includes output)
  • node_error - Node failed (includes error message)
  • complete - Entire workflow completed successfully
  • error - Workflow failed (includes error message)
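
In TypeScript these event types map naturally onto a discriminated union. A sketch based on the fields shown in the stream above:

// Discriminated union inferred from the documented event types
type ExecutionEvent =
  | { type: "node_start"; nodeId: string; timestamp: string }
  | { type: "node_complete"; nodeId: string; output: unknown; timestamp: string }
  | { type: "node_error"; nodeId: string; error: string; timestamp: string }
  | { type: "complete"; output: unknown; timestamp: string }
  | { type: "error"; error: string; timestamp: string }
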
Success Response Example
{
  "type": "complete",
  "output": {
    "status": "completed",
    "result": "Security analysis complete: 2 critical vulnerabilities detected",
    "timestamp": "2024-01-15T10:30:05.700Z"
  },
  "executionTime": 5542,
  "nodesExecuted": 5
}

Error Handling

Error Response Format
{
  "type": "error",
  "error": "HTTP Request node failed: 500 Internal Server Error",
  "nodeId": "httpRequest1",
  "timestamp": "2024-01-15T10:30:03.500Z"
}
Common Error Codes

400 Bad Request
Invalid workflow structure, missing required fields, or validation errors

429 Rate Limit Exceeded
Too many requests (10 per minute per IP). Wait and retry after the cooldown period.

500 Internal Server Error
Workflow execution failed due to a node error, timeout, or unexpected issue

503 Service Unavailable
Server overloaded or temporarily unavailable. Implement exponential backoff retry logic.
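
A practical consequence of this table: 429 and 503 are transient and worth retrying, while 400 means the request itself is invalid. A small helper (sketch):

// Sketch: decide whether a failed response is worth retrying
function isRetryable(status: number): boolean {
  // 429 = rate limited, 503 = temporarily overloaded; both are transient.
  // 400 means the workflow definition is invalid - retrying won't help.
  return status === 429 || status === 503
}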

Rate Limits

Default Rate Limit: 10 requests per minute per client IP address. Exceeding this limit returns HTTP 429 status.
Rate Limit Details
Limit Scope:
  • Per IP Address: Rate limits apply per client IP
  • Sliding Window: 1-minute rolling window (not fixed time buckets)
  • All Endpoints: All workflow execution requests count toward the same limit
Rate Limit Response Headers:
X-RateLimit-Limit: 10
X-RateLimit-Remaining: 7
X-RateLimit-Reset: 1705317000
Handling Rate Limits:
async function executeWithRetry(workflow, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch("/api/execute-workflow", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(workflow)
      })

      if (response.status === 429) {
        const resetTime = response.headers.get("X-RateLimit-Reset")
        // Guard against a missing or already-elapsed header: wait at least 1s
        const waitMs = Math.max((parseInt(resetTime, 10) * 1000) - Date.now() || 0, 1000)
        console.log(`Rate limited. Waiting ${waitMs}ms`)
        await new Promise(resolve => setTimeout(resolve, waitMs))
        continue
      }

      return response
    } catch (error) {
      if (i === maxRetries - 1) throw error
      // Exponential backoff: 1s, 2s, 4s between attempts
      await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, i)))
    }
  }
}
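
Usage is the same as a plain fetch; the helper resolves with the Response once a non-429 status comes back, or throws after the last attempt:

// Usage: identical to calling fetch on the endpoint directly
const response = await executeWithRetry(workflow)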

Code Examples

cURL
Command-line workflow execution
curl -X POST https://your-domain.vercel.app/api/execute-workflow \
  -H "Content-Type: application/json" \
  -d '{
    "nodes": [
      {
        "id": "start",
        "type": "start",
        "data": { "input": "Analyze threat intelligence" }
      },
      {
        "id": "httpRequest1",
        "type": "httpRequest",
        "data": {
          "method": "GET",
          "url": "https://api.example.com/threats"
        }
      },
      {
        "id": "textModel1",
        "type": "textModel",
        "data": {
          "model": "gpt-4",
          "prompt": "Summarize: $input1",
          "temperature": 0.3
        }
      },
      {
        "id": "end",
        "type": "end",
        "data": {}
      }
    ],
    "edges": [
      { "source": "start", "target": "httpRequest1" },
      { "source": "httpRequest1", "target": "textModel1" },
      { "source": "textModel1", "target": "end" }
    ],
    "apiKeys": {
      "openai": "sk-..."
    }
  }'
JavaScript / TypeScript
Browser or Node.js execution with streaming
async function executeWorkflow(workflowData) {
  const response = await fetch("/api/execute-workflow", {
    method: "POST",
    headers: {
      "Content-Type": "application/json"
    },
    body: JSON.stringify(workflowData)
  })

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`)
  }

  // Read streaming response
  const reader = response.body?.getReader()
  const decoder = new TextDecoder()

  if (!reader) throw new Error("No response body")

  // Buffer partial data: a JSON line may be split across chunks
  let buffer = ""

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split("\n")
    buffer = lines.pop() ?? "" // keep any incomplete trailing line

    for (const line of lines) {
      if (!line.trim()) continue
      const event = JSON.parse(line)

      switch (event.type) {
        case "node_start":
          console.log(`Node started: ${event.nodeId}`)
          break
        case "node_complete":
          console.log(`Node completed: ${event.nodeId}`, event.output)
          break
        case "node_error":
          console.error(`Node failed: ${event.nodeId}`, event.error)
          break
        case "complete":
          console.log("Workflow completed!", event.output)
          return event.output
        case "error":
          throw new Error(`Workflow failed: ${event.error}`)
      }
    }
  }
}

// Usage
const workflow = {
  nodes: [ /* ... */ ],
  edges: [ /* ... */ ],
  apiKeys: {
    openai: process.env.OPENAI_API_KEY
  }
}

try {
  const result = await executeWorkflow(workflow)
  console.log("Final result:", result)
} catch (error) {
  console.error("Execution failed:", error)
}
Python
Workflow execution with requests library
import requests
import json
import os

def execute_workflow(workflow_data):
    """Execute a TopFlow workflow via API."""
    url = "https://your-domain.vercel.app/api/execute-workflow"
    headers = {"Content-Type": "application/json"}

    response = requests.post(
        url,
        headers=headers,
        json=workflow_data,
        stream=True  # Enable streaming
    )

    if response.status_code == 429:
        reset_time = response.headers.get("X-RateLimit-Reset")
        print(f"Rate limited. Retry after {reset_time}")
        return None

    if not response.ok:
        raise Exception(f"HTTP {response.status_code}: {response.text}")

    # Process streaming response
    for line in response.iter_lines():
        if not line:
            continue

        event = json.loads(line.decode("utf-8"))

        if event["type"] == "node_start":
            print(f"Node started: {event['nodeId']}")
        elif event["type"] == "node_complete":
            print(f"Node completed: {event['nodeId']}")
        elif event["type"] == "node_error":
            print(f"Node failed: {event['nodeId']} - {event['error']}")
        elif event["type"] == "complete":
            print("Workflow completed!")
            return event["output"]
        elif event["type"] == "error":
            raise Exception(f"Workflow failed: {event['error']}")

    return None

# Usage
workflow = {
    "nodes": [
        {
            "id": "start",
            "type": "start",
            "data": {"input": "Analyze security data"}
        },
        {
            "id": "textModel1",
            "type": "textModel",
            "data": {
                "model": "gpt-4",
                "prompt": "Analyze: $input1",
                "temperature": 0.7
            }
        },
        {
            "id": "end",
            "type": "end",
            "data": {}
        }
    ],
    "edges": [
        {"source": "start", "target": "textModel1"},
        {"source": "textModel1", "target": "end"}
    ],
    "apiKeys": {
        "openai": os.environ["OPENAI_API_KEY"]
    }
}

try:
    result = execute_workflow(workflow)
    print(f"Final result: {result}")
except Exception as error:
    print(f"Execution failed: {error}")

Deployment

Deploying to Vercel
Recommended platform for TopFlow workflows
  1. Export your workflow to TypeScript code using the "Export Code" feature in TopFlow
  2. Create a Next.js project (if you don't have one):
    npx create-next-app@latest my-workflow-api
  3. Add the exported code to app/api/workflow/route.ts
  4. Install dependencies:
    npm install ai @ai-sdk/openai @ai-sdk/anthropic
  5. Deploy to Vercel:
    vercel deploy
  6. Set environment variables in Vercel dashboard (Settings → Environment Variables):
    OPENAI_API_KEY=sk-...
    ANTHROPIC_API_KEY=sk-ant-...
Alternative Deployment Options
AWS Lambda

Export as standalone function, package dependencies, deploy via AWS Console or SAM

Cloudflare Workers

Adapt code for edge runtime, use Wrangler CLI for deployment

Docker Container

Package as Next.js app in Docker, deploy to any container platform (ECS, GKE, Azure Container Instances)

Self-Hosted

Run Next.js production server on your own infrastructure with npm run build && npm start

Best Practices

Do
✓ Use environment variables for API keys in production; never hardcode them
✓ Implement retry logic with exponential backoff for transient errors and rate limits
✓ Monitor execution time and set appropriate timeouts (workflows time out after 30s by default)
✓ Log execution events for debugging and audit trails
✓ Validate inputs before sending them to the workflow execution endpoint (see the sketch after these lists)
✓ Use HTTPS in production for all API calls
Don't
✕ Don't commit API keys to git repositories or expose them in client-side code
✕ Don't ignore rate limits - implement proper backoff and queuing
✕ Don't skip error handling - always handle failures gracefully
✕ Don't use HTTP in production (use HTTPS for security)
✕ Don't send large payloads - keep workflow definitions reasonable in size
✕ Don't ignore streaming updates - process events to track execution progress
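
For the "validate inputs" practice above, a minimal client-side check might look like this. A sketch using the ExecuteWorkflowRequest shape from the Request Format section; the endpoint performs its own validation regardless:

// Sketch: basic structural validation before calling the endpoint
function validateWorkflow(wf: ExecuteWorkflowRequest): void {
  if (!wf.nodes?.length) throw new Error("Workflow must contain at least one node")
  if (!Array.isArray(wf.edges)) throw new Error("Workflow must define an edges array")

  // Every edge must reference nodes that actually exist
  const ids = new Set(wf.nodes.map(node => node.id))
  for (const edge of wf.edges) {
    if (!ids.has(edge.source) || !ids.has(edge.target)) {
      throw new Error(`Edge references unknown node: ${edge.source} -> ${edge.target}`)
    }
  }
}
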
Next Steps
Integration Guide

Learn how to integrate workflows with external services (Slack, PagerDuty, Splunk, etc.)

Workflow Patterns

Explore common workflow patterns for error handling, retry logic, and parallel execution

Security Best Practices

Learn how to secure your workflows, protect API keys, and prevent common vulnerabilities