Execute TopFlow workflows via HTTP API. This reference covers request/response formats, API key configuration, error handling, and deployment patterns for production workflows.
TopFlow workflows can be executed via HTTP API, making them easy to integrate into your applications. When you export a workflow, you can generate either a standalone async function or a Next.js API route handler that accepts HTTP requests.
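As a rough sketch, an exported Next.js route handler might look like the following. The generated function name (runWorkflow) and its progress-callback signature are illustrative assumptions, but the response shape matches the newline-delimited JSON stream documented below.

// app/api/execute-workflow/route.js — a sketch of the exported handler
// import { runWorkflow } from "./workflow"  // hypothetical import of the generated function
export async function POST(request) {
  const { nodes, edges, apiKeys } = await request.json()
  const encoder = new TextEncoder()
  const stream = new ReadableStream({
    async start(controller) {
      // Write one JSON object per line to the response stream
      const emit = (event) =>
        controller.enqueue(encoder.encode(JSON.stringify(event) + "\n"))
      try {
        // Assumed interface: runWorkflow reports per-node progress through a
        // callback and resolves with the final output
        const output = await runWorkflow({ nodes, edges, apiKeys }, emit)
        emit({ type: "complete", output, timestamp: new Date().toISOString() })
      } catch (error) {
        emit({ type: "error", error: String(error), timestamp: new Date().toISOString() })
      } finally {
        controller.close()
      }
    }
  })
  return new Response(stream, {
    headers: { "Content-Type": "application/x-ndjson" }
  })
}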
Your exported workflow code reads AI provider API keys from environment variables:
# .env.local or deployment environment variables
# OpenAI (for GPT models)
OPENAI_API_KEY=sk-...
# Anthropic (for Claude models)
ANTHROPIC_API_KEY=sk-ant-...
# Google (for Gemini models)
GOOGLE_API_KEY=...
# Groq (for fast inference)
GROQ_API_KEY=gsk_...
# Optional: API keys for external services
PAGERDUTY_TOKEN=...
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
VIRUSTOTAL_API_KEY=...

POST https://your-domain.vercel.app/api/execute-workflow
Content-Type: application/json

Executes a workflow and returns streaming updates as each node completes. The response is a stream of newline-delimited JSON objects showing node execution progress.

Example request body:
{
"nodes": [
{
"id": "start",
"type": "start",
"data": {
"input": "Analyze security logs"
}
},
{
"id": "textModel1",
"type": "textModel",
"data": {
"model": "gpt-4",
"prompt": "Analyze: $input1",
"temperature": 0.7,
"maxTokens": 1000
}
},
{
"id": "end",
"type": "end",
"data": {}
}
],
"edges": [
{
"id": "edge1",
"source": "start",
"target": "textModel1"
},
{
"id": "edge2",
"source": "textModel1",
"target": "end"
}
],
"apiKeys": {
"openai": "sk-...",
"anthropic": "sk-ant-..."
}
}

Request fields:

nodes - Array of workflow nodes, each with an id, type, and data object
edges - Array of connections between nodes
apiKeys - Object containing API keys for the AI providers used in the workflow

The workflow execution endpoint returns a stream of JSON objects, one per line, showing progress as each node executes:
{"type":"node_start","nodeId":"start","timestamp":"2024-01-15T10:30:00.000Z"}
{"type":"node_complete","nodeId":"start","output":"Analyze security logs","timestamp":"2024-01-15T10:30:00.100Z"}
{"type":"node_start","nodeId":"textModel1","timestamp":"2024-01-15T10:30:00.150Z"}
{"type":"node_complete","nodeId":"textModel1","output":"Analysis: Found 3 critical issues...","timestamp":"2024-01-15T10:30:05.500Z"}
{"type":"node_start","nodeId":"end","timestamp":"2024-01-15T10:30:05.600Z"}
{"type":"complete","output":"Analysis: Found 3 critical issues...","timestamp":"2024-01-15T10:30:05.700Z"}node_start - Node execution begannode_complete - Node finished successfully (includes output)node_error - Node failed (includes error message)complete - Entire workflow completed successfullyerror - Workflow failed (includes error message){
"type": "complete",
"output": {
"status": "completed",
"result": "Security analysis complete: 2 critical vulnerabilities detected",
"timestamp": "2024-01-15T10:30:05.700Z"
},
"executionTime": 5542,
"nodesExecuted": 5
}

A failed run ends with an error event identifying the node that failed:

{
"type": "error",
"error": "HTTP Request node failed: 500 Internal Server Error",
"nodeId": "httpRequest1",
"timestamp": "2024-01-15T10:30:03.500Z"
}

Error status codes:

400 - Invalid workflow structure, missing required fields, or validation errors
429 - Too many requests (10 per minute per IP). Wait and retry after the cooldown period.
500 - Workflow execution failed due to a node error, timeout, or unexpected issue
503 - Server overloaded or temporarily unavailable. Implement exponential backoff retry logic.

Rate limit headers are returned on every response:

X-RateLimit-Limit: 10
X-RateLimit-Remaining: 7
X-RateLimit-Reset: 1705317000

The following helper respects the rate limit headers and retries overloaded requests with exponential backoff:

async function executeWithRetry(workflow, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
const response = await fetch("/api/execute-workflow", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(workflow)
})
      if (response.status === 429) {
        // Wait until the rate limit window resets; clamp to zero in case
        // the reset timestamp is already in the past
        const resetTime = response.headers.get("X-RateLimit-Reset")
        const waitMs = Math.max(0, parseInt(resetTime, 10) * 1000 - Date.now())
        console.log(`Rate limited. Waiting ${waitMs}ms`)
        await new Promise(resolve => setTimeout(resolve, waitMs))
        continue
      }
      if (response.status === 503) {
        // Server overloaded: back off exponentially before retrying
        await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, i)))
        continue
      }
return response
} catch (error) {
if (i === maxRetries - 1) throw error
await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, i)))
}
  }
  // Every attempt was rate limited, rejected, or failed: surface that explicitly
  throw new Error("executeWithRetry: max retries exceeded")
}
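executeWithRetry resolves with the raw Response once a request is accepted, so you still consume the body as a stream. For example (workflow here is a request body shaped like the examples on this page):

const response = await executeWithRetry(workflow)
console.log("Request accepted with status:", response.status)
// ...then read response.body as shown in the JavaScript example below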

Example curl request. The -N (--no-buffer) flag makes curl print stream events as they arrive instead of buffering the output:

curl -N -X POST https://your-domain.vercel.app/api/execute-workflow \
-H "Content-Type: application/json" \
-d '{
"nodes": [
{
"id": "start",
"type": "start",
"data": { "input": "Analyze threat intelligence" }
},
{
"id": "httpRequest1",
"type": "httpRequest",
"data": {
"method": "GET",
"url": "https://api.example.com/threats"
}
},
{
"id": "textModel1",
"type": "textModel",
"data": {
"model": "gpt-4",
"prompt": "Summarize: $input1",
"temperature": 0.3
}
},
{
"id": "end",
"type": "end",
"data": {}
}
],
"edges": [
{ "source": "start", "target": "httpRequest1" },
{ "source": "httpRequest1", "target": "textModel1" },
{ "source": "textModel1", "target": "end" }
],
"apiKeys": {
"openai": "sk-..."
}
}'

JavaScript example:

async function executeWorkflow(workflowData) {
const response = await fetch("/api/execute-workflow", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(workflowData)
})
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
}
  // Read the streaming response. Buffer partial lines: a JSON object can be
  // split across chunks, so only parse lines terminated by a newline.
  const reader = response.body?.getReader()
  const decoder = new TextDecoder()
  if (!reader) throw new Error("No response body")
  let buffer = ""
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split("\n")
    buffer = lines.pop() ?? "" // keep the incomplete trailing line for the next chunk
    for (const line of lines) {
      if (!line.trim()) continue
      const event = JSON.parse(line)
switch (event.type) {
case "node_start":
console.log(`Node started: ${event.nodeId}`)
break
case "node_complete":
console.log(`Node completed: ${event.nodeId}`, event.output)
break
case "node_error":
console.error(`Node failed: ${event.nodeId}`, event.error)
break
case "complete":
console.log("Workflow completed!", event.output)
return event.output
case "error":
throw new Error(`Workflow failed: ${event.error}`)
}
}
}
}
// Usage
const workflow = {
nodes: [ /* ... */ ],
edges: [ /* ... */ ],
apiKeys: {
openai: process.env.OPENAI_API_KEY
}
}
try {
const result = await executeWorkflow(workflow)
console.log("Final result:", result)
} catch (error) {
console.error("Execution failed:", error)
}

Python example:

import requests
import json
import os
def execute_workflow(workflow_data):
"""Execute a TopFlow workflow via API."""
url = "https://your-domain.vercel.app/api/execute-workflow"
headers = {"Content-Type": "application/json"}
response = requests.post(
url,
headers=headers,
json=workflow_data,
stream=True # Enable streaming
)
if response.status_code == 429:
reset_time = response.headers.get("X-RateLimit-Reset")
print(f"Rate limited. Retry after {reset_time}")
return None
if not response.ok:
raise Exception(f"HTTP {response.status_code}: {response.text}")
# Process streaming response
for line in response.iter_lines():
if not line:
continue
event = json.loads(line.decode("utf-8"))
if event["type"] == "node_start":
print(f"Node started: {event['nodeId']}")
elif event["type"] == "node_complete":
print(f"Node completed: {event['nodeId']}")
elif event["type"] == "node_error":
print(f"Node failed: {event['nodeId']} - {event['error']}")
elif event["type"] == "complete":
print("Workflow completed!")
return event["output"]
elif event["type"] == "error":
raise Exception(f"Workflow failed: {event['error']}")
return None
# Usage
workflow = {
"nodes": [
{
"id": "start",
"type": "start",
"data": {"input": "Analyze security data"}
},
{
"id": "textModel1",
"type": "textModel",
"data": {
"model": "gpt-4",
"prompt": "Analyze: $input1",
"temperature": 0.7
}
},
{
"id": "end",
"type": "end",
"data": {}
}
],
"edges": [
{"source": "start", "target": "textModel1"},
{"source": "textModel1", "target": "end"}
],
"apiKeys": {
"openai": os.environ["OPENAI_API_KEY"]
}
}
try:
result = execute_workflow(workflow)
print(f"Final result: {result}")
except Exception as error:
print(f"Execution failed: {error}")npx create-next-app@latest my-workflow-apiapp/api/workflow/route.tsnpm install ai @ai-sdk/openai @ai-sdk/anthropicvercel deployOPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...

Other deployment targets:

AWS Lambda - Export as a standalone function, package dependencies, deploy via the AWS Console or SAM
Cloudflare Workers - Adapt the code for the edge runtime, use the Wrangler CLI for deployment
Docker - Package as a Next.js app in Docker, deploy to any container platform (ECS, GKE, Azure Container Instances); see the Dockerfile sketch after this list
Self-hosted - Run the Next.js production server on your own infrastructure with npm run build && npm start
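A minimal Dockerfile sketch for the container route, assuming the workflow is packaged as a standard Next.js app (adjust the Node version and build commands to your project):

# Build stage
FROM node:20-alpine AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build

# Runtime stage
FROM node:20-alpine
WORKDIR /app
COPY --from=builder /app ./
ENV NODE_ENV=production
EXPOSE 3000
# Inject API keys (OPENAI_API_KEY, etc.) at runtime; never bake them into the image
CMD ["npm", "start"]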
Next steps:

Integrations - Learn how to integrate workflows with external services (Slack, PagerDuty, Splunk, etc.)
Workflow patterns - Explore common workflow patterns for error handling, retry logic, and parallel execution
Security - Learn how to secure your workflows, protect API keys, and prevent common vulnerabilities