
Job Queue

The job queue stores background work in the commerce_jobs table and processes it with a runner. Jobs are durable: they survive server restarts and can be retried on failure.

If no runner is configured (jobs.autorun.enabled not set), jobs accumulate in pending status indefinitely. The server logs a warning at boot when autorun is disabled.


A task is a named handler with typed input, output, retry policy, and optional concurrency control:

import type { TaskDefinition } from "@porulle/core";

const sendEmailTask: TaskDefinition<
  { to: string; template: string; data: Record<string, unknown> },
  { messageId: string }
> = {
  slug: "email:send",
  handler: async ({ input, ctx }) => {
    await ctx.services.email.send({
      template: input.template,
      to: input.to,
      data: input.data,
    });
    return { output: { messageId: "ok" } };
  },
  retries: {
    attempts: 3,
    backoff: { type: "exponential", delay: 5000 },
  },
};
  • slug (string, required) — Unique identifier. Convention: domain:action (e.g., email:send, webhook:deliver).
  • handler ((args) => Promise<{ output }>, required) — Function that runs when the job is processed. Receives { input, ctx }.
  • retries ({ attempts, backoff? }, optional) — Retry policy. backoff.type is "fixed" or "exponential"; backoff.delay is in milliseconds.
  • concurrency ({ key, exclusive? }, optional) — Concurrency control. key(input) returns a string; only one job per key runs at a time.
The handler's arguments:

  • input (TInput) — The job's input payload, typed by the task definition.
  • ctx.db (PluginDb) — Database instance for direct queries.
  • ctx.logger (Logger) — Structured logger (Pino).
  • ctx.services (ServiceContainer) — Kernel services (inventory, orders, catalog, etc.).
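As an illustration of the concurrency option, a key callback might serialize all jobs touching the same order. The input shape here is hypothetical, not part of any documented task:

```typescript
// Hypothetical input type for an order-processing task.
type OrderInput = { orderId: string; action: string };

// Derive a concurrency key so at most one job per order runs at a
// time, regardless of which action each job performs.
const orderKey = (input: OrderInput): string => `order:${input.orderId}`;
```

Two jobs enqueued for `ORD-001` would share the key `order:ORD-001` and run one after the other.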

Jobs are enqueued via the jobs adapter on the service container or from HookContext.jobs:

// From a hook handler
await context.jobs.enqueue("email:send", {
  to: "customer@example.com",
  template: "order-confirmation",
  data: { orderNumber: "ORD-001", total: "$99.00" },
});

// With delayed execution
await context.jobs.enqueue("appointment:reminder", {
  bookingId: "abc-123",
}, {
  waitUntil: new Date(Date.now() + 24 * 60 * 60 * 1000),
  queue: "reminders",
});
  • queue (string, default "default") — Queue name. Runners can process specific queues.
  • waitUntil (Date, default null) — Earliest time the job can be picked up.
  • maxAttempts (number, default 5) — Maximum retry attempts before marking as failed.
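Applying those defaults can be modeled as a small helper; the option names come from the table above, but this function itself is illustrative, not a library export:

```typescript
interface EnqueueOptions {
  queue?: string;
  waitUntil?: Date | null;
  maxAttempts?: number;
}

// Merge caller-supplied options with the documented defaults.
function withDefaults(opts: EnqueueOptions = {}): Required<EnqueueOptions> {
  return {
    queue: opts.queue ?? "default",
    waitUntil: opts.waitUntil ?? null,
    maxAttempts: opts.maxAttempts ?? 5,
  };
}
```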

pending ──→ processing ──→ succeeded
                ├──→ pending (retry with backoff)
                └──→ failed (max attempts reached)
  • pending — Waiting to be claimed by a runner. Respects waitUntil if set.
  • processing — Claimed by a runner; the handler is executing.
  • succeeded — Handler returned successfully. Output stored.
  • failed — Handler threw and max attempts were reached. Error stored.
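The transitions above can be sketched as a pure function. This is a simplification of whatever the runner actually does, for reading the diagram, not a documented API:

```typescript
type JobStatus = "pending" | "processing" | "succeeded" | "failed";

// Decide the next status after a handler run, mirroring the diagram:
// success goes to succeeded; failure retries (back to pending) until
// the attempt count reaches maxAttempts, then the job is failed.
function afterRun(threw: boolean, attempts: number, maxAttempts: number): JobStatus {
  if (!threw) return "succeeded";
  return attempts < maxAttempts ? "pending" : "failed";
}
```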

1. Cron endpoint (serverless)

UC exposes GET /api/jobs/run automatically. Point your cron service at it:

vercel.json
{
  "crons": [
    { "path": "/api/jobs/run", "schedule": "* * * * *" },
    { "path": "/api/jobs/run?queue=reminders", "schedule": "*/5 * * * *" }
  ]
}

Query parameters:

  • queue (default "default") — Which queue to process.
  • limit (default 10) — Max jobs per invocation.

In production, the endpoint requires the *:* permission. Pass an API key via the x-api-key header.
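Triggering a run manually from a script might look like this. The deployment URL and environment variable are placeholders; the path, query parameters, and header come from above:

```typescript
// Build the request URL for a manual run of the reminders queue.
const base = "https://example.com"; // placeholder deployment URL
const url = new URL("/api/jobs/run", base);
url.searchParams.set("queue", "reminders");
url.searchParams.set("limit", "25");

// In a real script:
//   await fetch(url, { headers: { "x-api-key": process.env.JOBS_API_KEY! } });
console.log(url.toString());
```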

2. In-process polling (long-running servers)

commerce.config.ts
export default defineConfig({
  jobs: {
    autorun: {
      enabled: true,
      intervalMs: 10_000,
    },
    tasks: [sendEmailTask, processOrderTask],
  },
});

The runner uses SELECT FOR UPDATE SKIP LOCKED, so multiple instances can poll the same queue safely without double-processing.
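The claim step presumably looks something like the following, shown here as a SQL string against the commerce_jobs schema documented below. This is a sketch of the pattern, not the actual query the runner issues:

```typescript
// Sketch of a SKIP LOCKED claim: atomically flip a batch of due
// pending jobs to processing. Rows locked by another runner are
// skipped rather than waited on, so concurrent pollers never
// double-claim a job.
const claimSql = `
  UPDATE commerce_jobs
  SET status = 'processing', processing_started_at = NOW()
  WHERE id IN (
    SELECT id FROM commerce_jobs
    WHERE status = 'pending'
      AND queue = $1
      AND (wait_until IS NULL OR wait_until <= NOW())
    ORDER BY created_at
    LIMIT $2
    FOR UPDATE SKIP LOCKED
  )
  RETURNING id, task_slug, input;
`;
```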

worker.ts
import { runPendingJobs } from "@porulle/core";

async function loop() {
  while (true) {
    const { processed } = await runPendingJobs({
      db: kernel.database.db,
      tasks: taskMap,
      queue: "default",
      limit: 20,
      logger: kernel.logger,
      services: kernel.services,
    });
    // Back off briefly when the queue is empty
    if (processed === 0) {
      await new Promise((r) => setTimeout(r, 5000));
    }
  }
}

loop();

When a handler throws, the runner checks attempts < maxAttempts:

  • Under limit — Job returns to pending with a waitUntil computed from the backoff policy
  • At limit — Job marked failed with the error message stored
Attempt 1: immediate
Attempt 2: wait 5s  (exponential: 5000 × 2^0)
Attempt 3: wait 10s (exponential: 5000 × 2^1)
Attempt 4: wait 20s (exponential: 5000 × 2^2)
Attempt 5: wait 40s (exponential: 5000 × 2^3); if it throws again, the job is marked failed
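The schedule above follows delay × 2^(attempt − 2) for the exponential policy. A sketch of that computation (the runner's actual implementation may differ):

```typescript
type Backoff = { type: "fixed" | "exponential"; delay: number };

// Milliseconds to wait before the given attempt number. Attempt 1
// runs immediately; attempt n ≥ 2 waits according to the policy:
// a fixed delay, or delay doubled on each successive retry.
function backoffMs(attempt: number, backoff: Backoff): number {
  if (attempt <= 1) return 0;
  if (backoff.type === "fixed") return backoff.delay;
  return backoff.delay * 2 ** (attempt - 2);
}
```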

To cancel a job without retrying, throw with error.cancel = true:

handler: async ({ input }) => {
  if (!isValidEmail(input.email)) {
    const err = new Error("Invalid email — cancelling job");
    (err as Record<string, unknown>).cancel = true;
    throw err;
  }
},
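On the runner side, cancellation detection amounts to checking that flag on whatever was thrown. This helper is hypothetical, not a documented export:

```typescript
// True when a thrown value carries the cancel flag shown above.
// Guards against non-object throws (strings, null) before reading
// the property.
function isCancelled(err: unknown): boolean {
  return typeof err === "object" && err !== null &&
    (err as Record<string, unknown>).cancel === true;
}
```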

CREATE TABLE commerce_jobs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  organization_id TEXT NOT NULL,
  queue TEXT NOT NULL DEFAULT 'default',
  task_slug TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  input JSONB DEFAULT '{}',
  output JSONB,
  error TEXT,
  attempts INTEGER NOT NULL DEFAULT 0,
  max_attempts INTEGER NOT NULL DEFAULT 5,
  wait_until TIMESTAMPTZ,
  concurrency_key TEXT,
  processing_started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Pending jobs older than 1 hour (stuck)
SELECT id, task_slug, created_at, wait_until
FROM commerce_jobs
WHERE status = 'pending' AND created_at < NOW() - INTERVAL '1 hour';

-- Queue depth per queue
SELECT queue, COUNT(*) AS pending
FROM commerce_jobs
WHERE status = 'pending'
GROUP BY queue;

-- Recent failures
SELECT id, task_slug, error, attempts, completed_at
FROM commerce_jobs
WHERE status = 'failed'
ORDER BY completed_at DESC
LIMIT 20;

-- Processing time per task
SELECT task_slug,
       COUNT(*) AS total,
       AVG(EXTRACT(EPOCH FROM (completed_at - processing_started_at))) AS avg_seconds
FROM commerce_jobs
WHERE status = 'succeeded'
GROUP BY task_slug;
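Rows from the queue-depth query could feed a simple alert check. The row shape matches that query's output; the threshold and helper are illustrative:

```typescript
// One row per queue, as returned by the queue-depth query above.
type DepthRow = { queue: string; pending: number };

// Return the names of queues whose backlog exceeds the threshold,
// e.g. to page when a runner has silently stopped draining a queue.
function backloggedQueues(rows: DepthRow[], threshold: number): string[] {
  return rows.filter((r) => r.pending > threshold).map((r) => r.queue);
}
```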

All tasks must be registered before jobs referencing them are processed. If a job’s task_slug has no matching task definition, the runner marks it as failed with “Unknown task slug”.

commerce.config.ts
import { APPOINTMENT_EMAIL_TASKS } from "@porulle/plugin-appointments";

export default defineConfig({
  jobs: {
    tasks: [
      ...APPOINTMENT_EMAIL_TASKS,
      {
        slug: "analytics:daily-aggregate",
        handler: async ({ ctx }) => {
          // aggregate daily analytics
          return { output: { date: new Date().toISOString() } };
        },
      },
    ],
  },
});
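Internally, dispatch by slug reduces to a map lookup over the registered tasks; the unknown-slug failure described above falls out of the miss case. A sketch, with simplified types:

```typescript
// Simplified task shape; the real TaskDefinition carries more fields.
type Task = { slug: string; handler: (args: unknown) => Promise<unknown> };

// Index registered tasks by slug, as the runner presumably does at boot.
function buildTaskMap(tasks: Task[]): Map<string, Task> {
  return new Map(tasks.map((t) => [t.slug, t]));
}

// Resolve a job's task; an unregistered slug is a terminal failure
// rather than a retry, since retrying cannot make the task appear.
function resolveTask(
  map: Map<string, Task>,
  slug: string,
): Task | { failed: string } {
  return map.get(slug) ?? { failed: `Unknown task slug: ${slug}` };
}
```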