
Workflows

Last updated April 7, 2026


Triggering workflow runs from a product via the generic platform.call() escape hatch.

@kb-labs/platform-client doesn't ship a dedicated workflow proxy — the typed proxies cover only the four core adapter services (LLM, cache, vector store, analytics). Triggering workflows uses the generic platform.call() escape hatch against the workflows adapter on the server, same as any other platform service.

This page covers the pattern. For the workflow engine itself and what the returned objects mean, see Workflows → Overview and Workflows → Spec Reference.

The shape of a workflow trigger

Workflows are triggered by posting to a workflow daemon endpoint. The gateway's Unified Platform API can route these through the workflows adapter namespace:

TypeScript
// Fire a workflow run
const run = await platform.call<WorkflowRun>('workflows', 'run', {
  workflowId: 'nightly-cleanup',
  inputs: {
    scope: 'staging',
    dryRun: false,
  },
});
 
console.log(`Started run ${run.id}, status: ${run.status}`);

The first argument to call() is the adapter name ('workflows'); the second is the method name ('run'). Any further arguments are passed through as the positional arguments the server-side method expects.

The type you pass to platform.call<T>(...) is purely for your own code — the client doesn't validate that the server returns a WorkflowRun. Import the real type from your own copy of @kb-labs/workflow-contracts if you want strict typing, or define a local shape:

TypeScript
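// Minimal local shape for the entries in WorkflowRun.jobs
interface JobRun {
  jobName: string;
  status: string;
  durationMs?: number;
  error?: { message: string; code?: string };
}
 
interface WorkflowRun {
  id: string;
  status: 'queued' | 'running' | 'success' | 'failed' | 'cancelled' | 'skipped' | 'dlq';
  createdAt: string;
  trigger: { type: string; actor?: string; payload?: Record<string, unknown> };
  jobs: JobRun[];
}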

See Workflows → Spec Reference → Runtime types for the full server-side shape.
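Because the generic parameter is not checked at runtime, a small type guard can catch a malformed response before it spreads through product code. The following is a minimal sketch, not part of @kb-labs/platform-client: the names WorkflowRunLike, isWorkflowRun, and assertWorkflowRun are hypothetical, and the guard checks only the fields of the local shape shown above.

```typescript
// Local shape, trimmed to the fields this guard checks (an assumption of the
// sketch, not the full server-side type).
type RunStatus = 'queued' | 'running' | 'success' | 'failed' | 'cancelled' | 'skipped' | 'dlq';

interface WorkflowRunLike {
  id: string;
  status: RunStatus;
  createdAt: string;
  jobs: unknown[];
}

const RUN_STATUSES: readonly string[] = [
  'queued', 'running', 'success', 'failed', 'cancelled', 'skipped', 'dlq',
];

// Hypothetical helper: narrows an unknown response to the local run shape.
function isWorkflowRun(value: unknown): value is WorkflowRunLike {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.id === 'string' &&
    typeof v.status === 'string' &&
    RUN_STATUSES.includes(v.status) &&
    typeof v.createdAt === 'string' &&
    Array.isArray(v.jobs)
  );
}

// Hypothetical helper: throws instead of letting a malformed response propagate.
function assertWorkflowRun(value: unknown): WorkflowRunLike {
  if (!isWorkflowRun(value)) {
    throw new TypeError('Response does not look like a WorkflowRun');
  }
  return value;
}
```

With this in place, a call site could request `platform.call<unknown>('workflows', 'run', ...)` and pass the result through assertWorkflowRun, so the static type is backed by an actual check.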

Polling for completion

Workflows are async — platform.call('workflows', 'run', ...) returns as soon as the run is created, not when it finishes. To wait for completion, poll the run status:

TypeScript
async function waitForRun(platform: KBPlatform, runId: string, timeoutMs = 10 * 60_000): Promise<WorkflowRun> {
  const deadline = Date.now() + timeoutMs;
 
  while (Date.now() < deadline) {
    const run = await platform.call<WorkflowRun>('workflows', 'get', runId);
 
    if (run.status === 'success' || run.status === 'failed' || run.status === 'cancelled') {
      return run;
    }
 
    // Sleep before polling again
    await new Promise((r) => setTimeout(r, 2000));
  }
 
  throw new Error(`Workflow ${runId} did not complete within ${timeoutMs}ms`);
}
 
// Usage:
const run = await platform.call<WorkflowRun>('workflows', 'run', {
  workflowId: 'my-workflow',
  inputs: { foo: 'bar' },
});
 
const completed = await waitForRun(platform, run.id);
 
if (completed.status === 'success') {
  console.log('Run succeeded');
} else {
  console.error(`Run ${completed.status}:`, completed.jobs);
}

A few things to notice:

  • No built-in polling helper. The client is intentionally thin — polling logic belongs in your product code.
  • No push updates through the client. @kb-labs/platform-client is HTTP-only with no WebSocket support, so "wait for completion" means "poll until done".
  • Pick a sensible poll interval. 2s is fine for most workflows; shorter intervals hammer the gateway without benefit.
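For long-running workflows, one variation on the fixed 2s interval is to start there and back off. The sketch below is an illustration under stated assumptions, not a client feature: PlatformLike is a hypothetical structural stand-in for KBPlatform, pollRun and backoffDelay are hypothetical names, and the set of terminal statuses is a parameter because whether 'skipped' and 'dlq' are final for a run depends on the engine.

```typescript
// Minimal structural stand-in for the client; only call() is assumed here.
interface PlatformLike {
  call<T>(adapter: string, method: string, ...args: unknown[]): Promise<T>;
}

interface RunLike {
  id: string;
  status: string;
}

// Assumption: these statuses are final, matching the polling loop above.
const DEFAULT_TERMINAL: readonly string[] = ['success', 'failed', 'cancelled'];

interface PollOptions {
  timeoutMs?: number;
  initialDelayMs?: number;
  maxDelayMs?: number;
  terminal?: readonly string[];
  sleep?: (ms: number) => Promise<void>; // injectable for tests
  now?: () => number;
}

// Delay after the nth poll (0-based): doubles each attempt, capped at maxDelayMs.
function backoffDelay(attempt: number, initialDelayMs: number, maxDelayMs: number): number {
  return Math.min(initialDelayMs * 2 ** attempt, maxDelayMs);
}

async function pollRun(
  platform: PlatformLike,
  runId: string,
  opts: PollOptions = {},
): Promise<RunLike> {
  const {
    timeoutMs = 10 * 60_000,
    initialDelayMs = 2_000,
    maxDelayMs = 30_000,
    terminal = DEFAULT_TERMINAL,
    sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
    now = Date.now,
  } = opts;
  const deadline = now() + timeoutMs;

  for (let attempt = 0; now() < deadline; attempt++) {
    const run = await platform.call<RunLike>('workflows', 'get', runId);
    if (terminal.includes(run.status)) return run;
    await sleep(backoffDelay(attempt, initialDelayMs, maxDelayMs));
  }
  throw new Error(`Workflow ${runId} did not complete within ${timeoutMs}ms`);
}
```

Short workflows still get a first answer after about 2s, while a 30-minute run settles at one request every 30s. The injectable sleep and now options exist only so the loop can be tested without real timers.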

If you need real-time updates (without polling), you'd have to go outside @kb-labs/platform-client and open an SSE or WebSocket connection to the gateway's workflow event stream directly — that's not exposed through the client today.

Triggering with idempotency

Workflows support idempotency keys to avoid duplicate runs when clients retry:

TypeScript
const run = await platform.call<WorkflowRun>('workflows', 'run', {
  workflowId: 'nightly-cleanup',
  inputs: { scope: 'staging' },
  idempotencyKey: 'cleanup-2026-04-07',
});

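If you call this twice with the same idempotencyKey within the configured window, the second call returns the existing run instead of creating a new one. Use this in retriable client-side code so that a retried trigger creates at most one run per key. The guarantee holds only within that window, so treat it as deduplication rather than strict exactly-once semantics.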

See Workflows → Spec Reference → Metadata for the idempotencyKey field on RunMetadata.
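The key only helps if every retry of the same logical operation sends the same value. A minimal sketch of that pattern follows, assuming a structural PlatformLike stand-in for the client; dailyKey and triggerWithRetry are hypothetical helpers, and retrying on every error is a simplification.

```typescript
// Minimal structural stand-in for the client; only call() is assumed here.
interface PlatformLike {
  call<T>(adapter: string, method: string, ...args: unknown[]): Promise<T>;
}

interface RunLike {
  id: string;
  status: string;
}

interface TriggerRequest {
  workflowId: string;
  inputs?: Record<string, unknown>;
  idempotencyKey: string;
}

interface RetryOptions {
  maxAttempts?: number;
  retryDelayMs?: number;
  sleep?: (ms: number) => Promise<void>; // injectable for tests
}

// Hypothetical helper: one key per logical operation per UTC day,
// in the form '<prefix>-YYYY-MM-DD'.
function dailyKey(prefix: string, date: Date): string {
  return `${prefix}-${date.toISOString().slice(0, 10)}`;
}

// Hypothetical helper: retries the trigger, sending the SAME request (and
// therefore the same idempotencyKey) on every attempt.
async function triggerWithRetry(
  platform: PlatformLike,
  request: TriggerRequest,
  opts: RetryOptions = {},
): Promise<RunLike> {
  const {
    maxAttempts = 3,
    retryDelayMs = 1_000,
    sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
  } = opts;

  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await platform.call<RunLike>('workflows', 'run', request);
    } catch (err) {
      // Simplification: retries on any error. Real code would likely retry
      // only on timeouts and other transient failures.
      lastError = err;
      if (attempt < maxAttempts) await sleep(retryDelayMs * attempt);
    }
  }
  throw lastError;
}
```

The important design choice is that the key is derived before the first attempt and from the operation itself (here, a prefix and a date), never generated fresh inside the retry loop.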

Cancelling a run

TypeScript
await platform.call<void>('workflows', 'cancel', runId);

Cancellation is cooperative — the engine marks the run as 'cancelled' and stops scheduling new jobs. In-flight jobs finish or abort based on their handler's response to ctx.signal.
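To make "cooperative" concrete, here is a rough sketch of what a handler-side loop might look like. It assumes that ctx.signal is a standard AbortSignal, which this page does not confirm; HandlerContextLike and processItems are hypothetical names, and the real handler API is described in the workflow engine docs rather than here.

```typescript
// Assumption: the handler context exposes a standard AbortSignal as `signal`.
interface HandlerContextLike {
  signal: AbortSignal;
}

interface ProcessResult {
  processed: number;
  aborted: boolean;
}

// Hypothetical job body: handles items one at a time and checks the signal
// between items, so a cancel request takes effect at the next item boundary.
async function processItems<T>(
  ctx: HandlerContextLike,
  items: readonly T[],
  handle: (item: T) => Promise<void>,
): Promise<ProcessResult> {
  let processed = 0;
  for (const item of items) {
    if (ctx.signal.aborted) {
      return { processed, aborted: true };
    }
    await handle(item);
    processed++;
  }
  return { processed, aborted: false };
}
```

A handler that never looks at the signal simply runs to completion, which is why a cancelled run can still show jobs finishing after the cancel call returns.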

Listing runs

TypeScript
interface ListRunsFilter {
  workflowId?: string;
  status?: 'queued' | 'running' | 'success' | 'failed' | 'cancelled' | 'skipped' | 'dlq';
  limit?: number;
  offset?: number;
}
 
const filter: ListRunsFilter = {
  workflowId: 'nightly-cleanup',
  status: 'failed',
  limit: 10,
};
 
const runs = await platform.call<WorkflowRun[]>('workflows', 'list', filter);
 
console.log(`Found ${runs.length} failed runs`);

The server implementation of listRuns is optional (IWorkflowEngine.listRuns?). Some deployments may not expose it; a 404 or "method not found" error tells you it's not available.
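Product code that must work across deployments can treat listing as an optional capability. The sketch below shows one way to do that; the error shape is an assumption (a numeric status property or a "method not found" message), since this page does not specify what the client throws, and isNotAvailable and listRunsIfSupported are hypothetical names.

```typescript
// Minimal structural stand-in for the client; only call() is assumed here.
interface PlatformLike {
  call<T>(adapter: string, method: string, ...args: unknown[]): Promise<T>;
}

interface RunLike {
  id: string;
  status: string;
}

interface ListRunsFilter {
  workflowId?: string;
  status?: string;
  limit?: number;
  offset?: number;
}

// Assumption: errors carry a numeric `status` and/or a descriptive `message`.
// The real error shape thrown by the client may differ.
function isNotAvailable(err: unknown): boolean {
  if (typeof err !== 'object' || err === null) return false;
  const e = err as { status?: unknown; message?: unknown };
  if (e.status === 404) return true;
  return typeof e.message === 'string' && /method not found/i.test(e.message);
}

// Returns null when the deployment does not expose listing; rethrows anything else.
async function listRunsIfSupported(
  platform: PlatformLike,
  filter: ListRunsFilter,
): Promise<RunLike[] | null> {
  try {
    return await platform.call<RunLike[]>('workflows', 'list', filter);
  } catch (err) {
    if (isNotAvailable(err)) return null;
    throw err;
  }
}
```

Returning null keeps "listing is unsupported" distinct from "listing worked and found nothing", which an empty array would blur.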

A complete example

A product backend that kicks off a workflow, waits for it, and reports the result:

TypeScript
import { KBPlatform } from '@kb-labs/platform-client';
 
interface JobRun {
  jobName: string;
  status: string;
  durationMs?: number;
  error?: { message: string; code?: string };
}
 
interface WorkflowRun {
  id: string;
  status: 'queued' | 'running' | 'success' | 'failed' | 'cancelled' | 'skipped' | 'dlq';
  createdAt: string;
  jobs: JobRun[];
}
 
const platform = new KBPlatform({
  endpoint: process.env.KB_GATEWAY_URL!,
  apiKey: process.env.KB_API_KEY!,
  defaultTags: { source: 'release-tool' },
});
 
export async function runRelease(version: string, dryRun: boolean): Promise<void> {
  // Trigger
  const run = await platform.call<WorkflowRun>('workflows', 'run', {
    workflowId: 'release',
    inputs: { version, dryRun },
    idempotencyKey: `release-${version}`,
  });
 
  console.log(`Release started: ${run.id}`);
  platform.telemetry.event('release.started', { version, runId: run.id });
 
  // Poll until completion
  const start = Date.now();
  let completed = run;
 
  while (Date.now() - start < 30 * 60_000) {
    completed = await platform.call<WorkflowRun>('workflows', 'get', run.id);
 
    if (['success', 'failed', 'cancelled'].includes(completed.status)) break;
 
    await new Promise((r) => setTimeout(r, 2000));
  }
 
  // Report
  if (completed.status === 'success') {
    console.log(`Release ${version} completed successfully`);
    platform.telemetry.event('release.completed', {
      version,
      durationMs: Date.now() - start,
    });
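  } else {
    // A run that never reached a terminal status means the polling loop timed out
    const timedOut = !['failed', 'cancelled'].includes(completed.status);
    const failedJob = completed.jobs.find((j) => j.status === 'failed');
    const message = timedOut
      ? `timed out while run was still ${completed.status}`
      : (failedJob?.error?.message ?? 'Unknown failure');
    console.error(`Release ${version} ${completed.status}: ${message}`);
    platform.telemetry.event('release.failed', {
      version,
      status: completed.status,
      reason: message,
    });
    // Flush before throwing so the failure event is not dropped
    await platform.telemetry.flush();
    throw new Error(`Release failed: ${message}`);
  }
 
  await platform.telemetry.flush();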
}

This is the typical product-side pattern: construct the client once, use call() for triggering, poll for completion, emit telemetry around it. No dedicated workflow proxy needed.

What's not supported

  • Real-time events. The client exposes no SSE, WebSocket, or other push channel. Poll through the client, or connect to the gateway's workflow event stream directly.
  • Step-level streaming. The run's jobs array is available via get, but per-step output doesn't stream to the client.
  • Artifact download. The client doesn't wrap artifact downloads. Use raw fetch against the REST API's artifact endpoints.
  • Workflow spec authoring. You can't upload a new workflow spec via the client — specs are part of the plugin/workflow registry, installed separately.

If any of these matter to your use case, drop down to raw HTTP against the gateway's workflow endpoints. The client intentionally covers only the common path.