# Job Queue
The job queue stores background work in the `commerce_jobs` table and processes it with a runner. Jobs are durable: they survive server restarts and can be retried on failure.

If no runner is configured (`jobs.autorun.enabled` not set), jobs accumulate in `pending` status indefinitely. The server logs a warning at boot when autorun is disabled.
## Task definition

A task is a named handler with typed input, output, retry policy, and optional concurrency control:
```ts
import type { TaskDefinition } from "@porulle/core";

const sendEmailTask: TaskDefinition<
  { to: string; template: string; data: Record<string, unknown> },
  { messageId: string }
> = {
  slug: "email:send",
  handler: async ({ input, ctx }) => {
    await ctx.services.email.send({
      template: input.template,
      to: input.to,
      data: input.data,
    });
    return { output: { messageId: "ok" } };
  },
  retries: {
    attempts: 3,
    backoff: { type: "exponential", delay: 5000 },
  },
};
```

## TaskDefinition fields

| Field | Type | Required | Description |
|---|---|---|---|
| `slug` | `string` | Yes | Unique identifier. Convention: `domain:action` (e.g., `email:send`, `webhook:deliver`). |
| `handler` | `(args) => Promise<{ output }>` | Yes | Function that runs when the job is processed. Receives `{ input, ctx }`. |
| `retries` | `{ attempts, backoff? }` | No | Retry policy. `backoff.type` is `"fixed"` or `"exponential"`; `backoff.delay` is in milliseconds. |
| `concurrency` | `{ key, exclusive? }` | No | Concurrency control. `key(input)` returns a string; only one job per key runs at a time. |
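As a sketch of how the `concurrency` field might be used, here is a hypothetical task (the `inventory:sync` slug and its input shape are invented for illustration, and a minimal local type stands in for the `@porulle/core` import so the snippet is self-contained):

```typescript
// Local stand-in for the library's concurrency option shape (illustrative only).
type Concurrency<TInput> = { key: (input: TInput) => string; exclusive?: boolean };

type SyncInput = { orderId: string };

const syncInventoryTask = {
  slug: "inventory:sync",
  handler: async ({ input }: { input: SyncInput }) => {
    // ...sync logic would go here...
    return { output: { synced: true } };
  },
  concurrency: {
    // Jobs that compute the same key never run at the same time.
    key: (input: SyncInput) => `order:${input.orderId}`,
    exclusive: true,
  } as Concurrency<SyncInput>,
};

console.log(syncInventoryTask.concurrency.key({ orderId: "ORD-001" })); // prints order:ORD-001
```

With this key, two jobs for the same order are serialized, while jobs for different orders can run in parallel.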
## Handler context

| Field | Type | Description |
|---|---|---|
| `input` | `TInput` | The job's input payload (typed by the task definition) |
| `ctx.db` | `PluginDb` | Database instance for direct queries |
| `ctx.logger` | `Logger` | Structured logger (Pino) |
| `ctx.services` | `ServiceContainer` | Kernel services (inventory, orders, catalog, etc.) |
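A handler exercising these context fields might look like the sketch below. The `ctx` type is trimmed to only what the example touches (the real one comes from the runner), and the `email.send` call mirrors the task example earlier on this page; `buildEmailPayload` is an illustrative helper, not a library export:

```typescript
// Trimmed context shape for illustration; real handlers receive the full ctx.
type Ctx = {
  logger: { info: (msg: string, meta?: Record<string, unknown>) => void };
  services: {
    email: {
      send: (args: { to: string; template: string; data: Record<string, unknown> }) => Promise<void>;
    };
  };
};

// Pure helper: builds the payload, easy to test without a runner.
function buildEmailPayload(input: { to: string; orderNumber: string }) {
  return {
    to: input.to,
    template: "order-confirmation",
    data: { orderNumber: input.orderNumber },
  };
}

const handler = async ({ input, ctx }: { input: { to: string; orderNumber: string }; ctx: Ctx }) => {
  const payload = buildEmailPayload(input);
  ctx.logger.info("sending confirmation", { to: payload.to });
  await ctx.services.email.send(payload);
  return { output: { sent: true } };
};

// Example invocation with a stubbed ctx:
handler({
  input: { to: "a@example.com", orderNumber: "ORD-9" },
  ctx: { logger: { info: () => {} }, services: { email: { send: async () => {} } } },
}).then((r) => console.log(r.output)); // prints { sent: true }
```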
## Enqueueing jobs

Jobs are enqueued via the jobs adapter on the service container or from `HookContext.jobs`:
```ts
// From a hook handler
await context.jobs.enqueue("email:send", {
  to: "customer@example.com",
  template: "order-confirmation",
  data: { orderNumber: "ORD-001", total: "$99.00" },
});
```
```ts
// With delayed execution
await context.jobs.enqueue("appointment:reminder", {
  bookingId: "abc-123",
}, {
  waitUntil: new Date(Date.now() + 24 * 60 * 60 * 1000),
  queue: "reminders",
});
```

## Enqueue options
| Option | Type | Default | Description |
|---|---|---|---|
| `queue` | `string` | `"default"` | Queue name. Runners can process specific queues. |
| `waitUntil` | `Date` | `null` | Earliest time the job can be picked up |
| `maxAttempts` | `number` | `5` | Maximum retry attempts before the job is marked failed |
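As a sketch of how these defaults combine (assumed semantics based on the table above, not the library's internals; `withDefaults` is an illustrative name):

```typescript
// Assumed defaulting behavior for enqueue options, per the table above.
type EnqueueOptions = { queue?: string; waitUntil?: Date | null; maxAttempts?: number };

function withDefaults(opts: EnqueueOptions = {}) {
  return {
    queue: opts.queue ?? "default",
    waitUntil: opts.waitUntil ?? null,
    maxAttempts: opts.maxAttempts ?? 5,
  };
}
```

So `withDefaults({ queue: "reminders" })` yields queue `"reminders"`, no `waitUntil`, and the default budget of 5 attempts.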
## Job lifecycle

```
pending ──→ processing ──→ succeeded
                │
                ├──→ pending (retry with backoff)
                │
                └──→ failed (max attempts reached)
```

| Status | Meaning |
|---|---|
| `pending` | Waiting to be claimed by a runner. Respects `waitUntil` if set. |
| `processing` | Claimed by a runner; the handler is executing. |
| `succeeded` | Handler returned successfully. Output stored. |
| `failed` | Handler threw and max attempts were reached. Error stored. |
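The transitions out of `processing` can be expressed as a small pure function (a sketch of the semantics above, not the runner's actual code):

```typescript
type Status = "pending" | "processing" | "succeeded" | "failed";

// Status after a handler run finishes: success wins; otherwise the job
// retries until its attempt budget is spent.
function afterRun(ok: boolean, attempts: number, maxAttempts: number): Status {
  if (ok) return "succeeded";
  return attempts < maxAttempts ? "pending" : "failed";
}
```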
## Runner strategies

### 1. Built-in cron endpoint (serverless)

UC exposes `GET /api/jobs/run` automatically. Point your cron service at it:
```json
{
  "crons": [
    { "path": "/api/jobs/run", "schedule": "* * * * *" },
    { "path": "/api/jobs/run?queue=reminders", "schedule": "*/5 * * * *" }
  ]
}
```

Query parameters:
| Param | Default | Description |
|---|---|---|
| `queue` | `"default"` | Which queue to process |
| `limit` | `10` | Max jobs per invocation |
In production, the endpoint requires the `*:*` permission. Pass an API key via the `x-api-key` header.
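For example, a small helper that builds the authenticated request. The base URL and key are placeholders; only the path, query parameters, and header name come from this page, and `buildJobRunRequest` is an illustrative name:

```typescript
// Builds a request for GET /api/jobs/run with the x-api-key header.
// baseUrl and apiKey are deployment-specific placeholders.
function buildJobRunRequest(baseUrl: string, apiKey: string, queue = "default", limit = 10) {
  return {
    url: `${baseUrl}/api/jobs/run?queue=${encodeURIComponent(queue)}&limit=${limit}`,
    headers: { "x-api-key": apiKey },
  };
}

// Usage with fetch:
// const { url, headers } = buildJobRunRequest("https://shop.example.com", process.env.JOBS_API_KEY!);
// await fetch(url, { headers });
```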
### 2. In-process polling (long-running servers)

```ts
export default defineConfig({
  jobs: {
    autorun: {
      enabled: true,
      intervalMs: 10_000,
    },
    tasks: [sendEmailTask, processOrderTask],
  },
});
```

The runner claims jobs with `FOR UPDATE SKIP LOCKED`, so multiple instances can poll the same queue safely without double-processing.
### 3. Custom worker process

```ts
import { runPendingJobs } from "@porulle/core";

async function loop() {
  while (true) {
    const { processed } = await runPendingJobs({
      db: kernel.database.db,
      tasks: taskMap,
      queue: "default",
      limit: 20,
      logger: kernel.logger,
      services: kernel.services,
    });

    if (processed === 0) {
      await new Promise((r) => setTimeout(r, 5000));
    }
  }
}

loop();
```

## Retry and backoff
When a handler throws, the runner checks `attempts < maxAttempts`:

- **Under limit** — the job returns to `pending` with a `waitUntil` computed from the backoff policy
- **At limit** — the job is marked `failed` with the error message stored
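The delay computation can be sketched as a small helper (assumed semantics; `backoffDelayMs` is an illustrative name, not a library export):

```typescript
type Backoff = { type: "fixed" | "exponential"; delay: number };

// Delay before the next attempt, given how many attempts have already failed.
// Exponential doubles from the base delay: delay × 2^(failedAttempts − 1).
function backoffDelayMs(backoff: Backoff, failedAttempts: number): number {
  if (backoff.type === "fixed") return backoff.delay;
  return backoff.delay * 2 ** (failedAttempts - 1);
}

console.log(backoffDelayMs({ type: "exponential", delay: 5000 }, 2)); // prints 10000
```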
```
Attempt 1: immediate
Attempt 2: wait 5s   (exponential: 5000 × 2^0)
Attempt 3: wait 10s  (exponential: 5000 × 2^1)
Attempt 4: wait 20s  (exponential: 5000 × 2^2)
Attempt 5: failed
```

To cancel a job without retrying, throw with `error.cancel = true`:
```ts
handler: async ({ input }) => {
  if (!isValidEmail(input.email)) {
    const err = new Error("Invalid email — cancelling job");
    (err as Record<string, unknown>).cancel = true;
    throw err;
  }
},
```

## Database schema
```sql
CREATE TABLE commerce_jobs (
  id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  organization_id       TEXT NOT NULL,
  queue                 TEXT NOT NULL DEFAULT 'default',
  task_slug             TEXT NOT NULL,
  status                TEXT NOT NULL DEFAULT 'pending',
  input                 JSONB DEFAULT '{}',
  output                JSONB,
  error                 TEXT,
  attempts              INTEGER NOT NULL DEFAULT 0,
  max_attempts          INTEGER NOT NULL DEFAULT 5,
  wait_until            TIMESTAMPTZ,
  concurrency_key       TEXT,
  processing_started_at TIMESTAMPTZ,
  completed_at          TIMESTAMPTZ,
  created_at            TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at            TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```

## Monitoring queries
Section titled “Monitoring queries”-- Pending jobs older than 1 hour (stuck)SELECT id, task_slug, created_at, wait_untilFROM commerce_jobsWHERE status = 'pending' AND created_at < NOW() - INTERVAL '1 hour';
-- Queue depth per queueSELECT queue, COUNT(*) AS pendingFROM commerce_jobs WHERE status = 'pending'GROUP BY queue;
-- Recent failuresSELECT id, task_slug, error, attempts, completed_atFROM commerce_jobs WHERE status = 'failed'ORDER BY completed_at DESC LIMIT 20;
-- Processing time per taskSELECT task_slug, COUNT(*) AS total, AVG(EXTRACT(EPOCH FROM (completed_at - processing_started_at))) AS avg_secondsFROM commerce_jobs WHERE status = 'succeeded'GROUP BY task_slug;Register tasks
All tasks must be registered before jobs referencing them are processed. If a job's `task_slug` has no matching task definition, the runner marks it as failed with "Unknown task slug".
```ts
import { APPOINTMENT_EMAIL_TASKS } from "@porulle/plugin-appointments";

export default defineConfig({
  jobs: {
    tasks: [
      ...APPOINTMENT_EMAIL_TASKS,
      {
        slug: "analytics:daily-aggregate",
        handler: async ({ ctx }) => {
          // aggregate daily analytics
          return { output: { date: new Date().toISOString() } };
        },
      },
    ],
  },
});
```