Add a BullMQ Queue
Canonical example: bet settled queue at apps/api/src/bet/queue/.
1. Define the queue name
Create a constants file in your feature directory that exports the queue name.
Most existing queues use snake_case names: bet_settled_queue (apps/api/src/bet/queue/const.ts:1)
and leaderboard_queue (apps/api/src/leaderboard/const.ts:14).
One exception is the camelCase updateSessionQueue (apps/api/src/auth/session/dto/sessions.dto.ts:50).
Prefer snake_case for new queues.
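A minimal sketch of such a constants file. The file path and the queue name `your_feature_queue` are placeholders chosen to match the `YOUR_FEATURE_QUEUE` import used in the later steps; they are not taken from the codebase.

```typescript
// apps/api/src/your-feature/const.ts (hypothetical path, mirroring the later snippets)

// Queue name shared by the producer, the processor, and the module registration.
// Placeholder value in snake_case, following the bet_settled_queue convention.
export const YOUR_FEATURE_QUEUE = 'your_feature_queue';
```

Keeping the name in one exported constant means the producer, processor, and module cannot drift apart on the queue name.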
2. Create the producer
Inject the BullMQ Queue via @InjectQueue. Follow bet.queue-producer.ts:12-54:
    // apps/api/src/your-feature/your-feature.queue-producer.ts
    import { YOUR_FEATURE_QUEUE } from './const';
    import { InjectQueue } from '@nestjs/bullmq';
    import { Injectable } from '@nestjs/common';
    import { Queue } from 'bullmq';

    // Payload stored with each job; it must be JSON-serializable.
    export interface YourJobData {
      entityId: string;
      action: string;
    }

    @Injectable()
    export class YourFeatureQueueProducer {
      constructor(
        @InjectQueue(YOUR_FEATURE_QUEUE) private readonly queue: Queue,
      ) {}

      async enqueue(data: YourJobData): Promise<void> {
        // First argument is the job name; the options control deduplication and retries.
        await this.queue.add(data.entityId, data, {
          jobId: `your-feature-${data.entityId}`,
          attempts: 10,
          backoff: { type: 'exponential', delay: 500 },
        });
      }
    }
Job options used across the codebase (bet.queue-producer.ts:20-28):
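- jobId is deterministic, which prevents duplicate jobs for the same entity: BullMQ ignores an add() whose ID already exists in the queue.
- attempts: 10 (default from bullmq.const.ts:5) allows up to 10 attempts in total, retried with exponential backoff.
- backoff: { type: 'exponential', delay: 500 } doubles the delay after each failure: 500ms → 1s → 2s → 4s → ...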
For request-context propagation (OTel trace IDs, user context), use the ContextQueue wrapper
instead of raw Queue — see bet.queue-producer.ts:14,17,32-36.
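To make the retry budget of the job options above concrete, the following sketch computes the delay schedule for `attempts: 10` and `delay: 500`. It assumes the delay before retry n is `delay * 2^(n - 1)` with no jitter, which matches the 500ms → 1s → 2s → 4s progression listed above. The function name `exponentialBackoffDelays` is made up for illustration and is not part of BullMQ.

```typescript
// Sketch of the retry delays implied by { type: 'exponential', delay: 500 }.
// Assumption: delay before retry n is baseDelayMs * 2^(n - 1), with no jitter.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt runs immediately, so there are at most attempts - 1 retries.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

const delays = exponentialBackoffDelays(10, 500);
const totalMs = delays.reduce((sum, d) => sum + d, 0);

console.log(delays.slice(0, 4)); // [ 500, 1000, 2000, 4000 ]
console.log(totalMs);            // 255500
```

Under these assumptions a job that keeps failing spends roughly 4 minutes 15 seconds waiting between attempts before it lands in the failed set, not counting processing time.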
3. Create the processor (consumer)
Extend WorkerHost and decorate with @Processor. Follow bet.queue-processor.ts:21-30:
    // apps/api/src/your-feature/your-feature.queue-processor.ts
    import { YOUR_FEATURE_QUEUE } from './const';
    import { YourJobData } from './your-feature.queue-producer';
    // Placeholder path: import the service that implements your business logic.
    import { YourFeatureService } from './your-feature.service';
    import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
    import { Job } from 'bullmq';
    import { EvoLogger } from '@bebkovan/server-core';

    @Processor(YOUR_FEATURE_QUEUE, {
      concurrency: 3,
      stalledInterval: 15000,
      drainDelay: 5,
      removeOnComplete: { age: 300 },
    })
    export class YourFeatureQueueProcessor extends WorkerHost {
      constructor(private readonly yourService: YourFeatureService) {
        super();
      }

      // Called once per job; throwing here marks the attempt as failed and triggers a retry.
      async process(job: Job<YourJobData>): Promise<void> {
        const { entityId, action } = job.data;
        await this.yourService.handleQueuedAction(entityId, action);
      }

      // Worker-level errors (for example, a lost Redis connection).
      @OnWorkerEvent('error')
      onError(error: Error) {
        EvoLogger.error(YourFeatureQueueProcessor, 'Job error', error);
      }

      // BullMQ may emit 'failed' without a job, so treat it as optional.
      @OnWorkerEvent('failed')
      onFailed(job: Job<YourJobData> | undefined, error: unknown) {
        EvoLogger.error(
          YourFeatureQueueProcessor,
          `Job ${job?.id ?? 'unknown'} failed after ${job?.attemptsMade ?? 0} attempts`,
          error,
        );
      }
    }
Processor options from bet.queue-processor.ts:21-29:
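- concurrency: 3 sets the maximum number of parallel jobs per worker instance.
- stalledInterval: 15000 checks for stalled (stuck) jobs every 15s.
- drainDelay: 5 is the number of seconds the worker long-polls for new jobs when the queue is empty.
- removeOnComplete: { age: 300 } auto-purges completed jobs after 5 min (age is in seconds).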
For context-aware processing (inherits HTTP request context from the producer), extend
ContextWorkerHost instead and override processJob() — see bet.queue-processor.ts:30,81.
4. Create the module
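Register the queue with BullModule.registerQueue in the module that provides the producer and the processor.
The canonical example keeps a separate producer module (bet-queue-producer.module.ts:7-13); the example here combines both in one module for brevity: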
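    // apps/api/src/your-feature/your-feature-queue.module.ts
    import { YOUR_FEATURE_QUEUE } from './const';
    import { YourFeatureQueueProcessor } from './your-feature.queue-processor';
    import { YourFeatureQueueProducer } from './your-feature.queue-producer';
    import { BullModule } from '@nestjs/bullmq';
    import { Module } from '@nestjs/common';

    @Module({
      imports: [BullModule.registerQueue({ name: YOUR_FEATURE_QUEUE })],
      // The processor injects YourFeatureService, so this module must also provide or import it.
      providers: [YourFeatureQueueProducer, YourFeatureQueueProcessor],
      exports: [YourFeatureQueueProducer],
    })
    export class YourFeatureQueueModule {}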
Then import YourFeatureQueueModule wherever the producer is needed.
The root BullModule.forRoot() is already configured in apps/api/src/app.module.ts:137-143
pointing at the cache Redis instance (REDIS_HOST:REDIS_PORT). No additional Redis setup needed.
5. Job data serialization
For jobs carrying Decimal fields (Prisma), create a serializer to convert to/from strings.
Follow bet.serializer.ts:
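    // Assumes Prisma's Decimal; adjust the import to match your Prisma version.
    import { Prisma } from '@prisma/client';

    // YourEntity is a placeholder for your model type with a Decimal `amount` field.
    export class YourJobData {
      entityId!: string;
      serializedPayload!: string;

      // Convert Decimal fields to strings before the job is enqueued.
      static serialize(entity: YourEntity): YourJobData {
        return {
          entityId: entity.id,
          serializedPayload: JSON.stringify({
            ...entity,
            amount: entity.amount.toString(),
          }),
        };
      }

      // Rebuild Decimal fields from their string form inside the processor.
      static deserialize(data: YourJobData): YourEntity {
        const parsed = JSON.parse(data.serializedPayload);
        return { ...parsed, amount: new Prisma.Decimal(parsed.amount) };
      }
    }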
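BullMQ serializes job data as JSON, so Decimal objects from Prisma do not arrive in the processor
as Decimal instances, and passing them through a JavaScript number can lose precision.
Converting explicitly to and from strings on both sides avoids this.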
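The precision concern can be illustrated without Prisma or BullMQ. This sketch sends the same decimal value through a JSON round trip twice, once as a string and once as a JavaScript number. The sample value is arbitrary and was chosen only because it has more significant digits than a float64 can represent.

```typescript
// A decimal value with more significant digits than a float64 can hold.
const amount = '12345678901234567890.123456789';

// String form: survives the JSON round trip unchanged.
const viaString: string = JSON.parse(JSON.stringify({ amount })).amount;

// Number form: rounded to the nearest float64 before it is even serialized.
const viaNumber: number = JSON.parse(JSON.stringify({ amount: Number(amount) })).amount;

console.log(viaString === amount);         // true
console.log(String(viaNumber) === amount); // false
```

This is why the serializer keeps the amount as a string in the payload and only rebuilds a Decimal inside the processor.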
6. Inspect jobs via redis-cli
All BullMQ queues use the cache Redis on port 6379:
    redis-cli -a cache
    KEYS "bull:*:id"                           # list all queues
    LLEN bull:your_feature_queue:wait          # waiting jobs (wait is a list)
    ZCARD bull:your_feature_queue:failed       # failed jobs (failed is a sorted set)
    HGETALL bull:your_feature_queue:<jobId>    # inspect one job