Skip to content

Add a BullMQ Queue

Canonical example: bet settled queue at apps/api/src/bet/queue/.

1. Define the queue name

Create a constants file in your feature directory:

// apps/api/src/your-feature/const.ts
export const YOUR_FEATURE_QUEUE = 'your_feature_queue';

Existing queues follow snake_case naming: bet_settled_queue (apps/api/src/bet/queue/const.ts:1), leaderboard_queue (apps/api/src/leaderboard/const.ts:14), updateSessionQueue (apps/api/src/auth/session/dto/sessions.dto.ts:50).

2. Create the producer

Inject the BullMQ Queue via @InjectQueue. Follow bet.queue-producer.ts:12-54:

// apps/api/src/your-feature/your-feature.queue-producer.ts

import { YOUR_FEATURE_QUEUE } from './const';
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue, JobsOptions } from 'bullmq';

export interface YourJobData {
  entityId: string;
  action: string;
}

@Injectable()
export class YourFeatureQueueProducer {
  constructor(
    @InjectQueue(YOUR_FEATURE_QUEUE) private readonly queue: Queue,
  ) {}

  async enqueue(data: YourJobData): Promise<void> {
    await this.queue.add(data.entityId, data, {
      jobId: `your-feature-${data.entityId}`,
      attempts: 10,
      backoff: { type: 'exponential', delay: 500 },
    });
  }
}

Job options used across the codebase (bet.queue-producer.ts:20-28): - jobId — deterministic ID prevents duplicate jobs for the same entity. - attempts: 10 — retries with exponential backoff (default from bullmq.const.ts:5). - backoff: { type: 'exponential', delay: 500 } — 500ms → 1s → 2s → 4s → ...

For request-context propagation (OTel trace IDs, user context), use the ContextQueue wrapper instead of raw Queue — see bet.queue-producer.ts:14,17,32-36.

3. Create the processor (consumer)

Extend WorkerHost and decorate with @Processor. Follow bet.queue-processor.ts:21-30:

// apps/api/src/your-feature/your-feature.queue-processor.ts

import { YOUR_FEATURE_QUEUE } from './const';
import { OnWorkerEvent, Processor } from '@nestjs/bullmq';
import { WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { EvoLogger } from '@bebkovan/server-core';

@Processor(YOUR_FEATURE_QUEUE, {
  concurrency: 3,
  stalledInterval: 15000,
  drainDelay: 5,
  removeOnComplete: { age: 300 },
})
export class YourFeatureQueueProcessor extends WorkerHost {
  constructor(private readonly yourService: YourFeatureService) {
    super();
  }

  async process(job: Job<YourJobData>): Promise<void> {
    const { entityId, action } = job.data;
    await this.yourService.handleQueuedAction(entityId, action);
  }

  @OnWorkerEvent('error')
  onError(error: Error) {
    EvoLogger.error(YourFeatureQueueProcessor, 'Job error', error);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job<YourJobData>, error: unknown) {
    EvoLogger.error(
      YourFeatureQueueProcessor,
      `Job ${job.id} failed after ${job.attemptsMade} attempts`,
      error,
    );
  }
}

Processor options from bet.queue-processor.ts:21-29: - concurrency: 3 — max parallel jobs per worker instance. - stalledInterval: 15000 — detect stuck jobs every 15s. - removeOnComplete: { age: 300 } — auto-purge completed jobs after 5 min.

For context-aware processing (inherits HTTP request context from the producer), extend ContextWorkerHost instead and override processJob() — see bet.queue-processor.ts:30,81.

4. Create the module

Register the queue in a producer module and a processor module. Follow bet-queue-producer.module.ts:7-13:

// apps/api/src/your-feature/your-feature-queue.module.ts

import { YOUR_FEATURE_QUEUE } from './const';
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';

@Module({
  imports: [BullModule.registerQueue({ name: YOUR_FEATURE_QUEUE })],
  providers: [YourFeatureQueueProducer, YourFeatureQueueProcessor],
  exports: [YourFeatureQueueProducer],
})
export class YourFeatureQueueModule {}

Then import YourFeatureQueueModule wherever the producer is needed.

The root BullModule.forRoot() is already configured in apps/api/src/app.module.ts:137-143 pointing at the cache Redis instance (REDIS_HOST:REDIS_PORT). No additional Redis setup needed.

5. Job data serialization

For jobs carrying Decimal fields (Prisma), create a serializer to convert to/from strings. Follow bet.serializer.ts:

export class YourJobData {
  entityId: string;
  serializedPayload: string;

  static serialize(entity: YourEntity): YourJobData {
    return {
      entityId: entity.id,
      serializedPayload: JSON.stringify({
        ...entity,
        amount: entity.amount.toString(),
      }),
    };
  }

  static deserialize(data: YourJobData): YourEntity {
    const parsed = JSON.parse(data.serializedPayload);
    return { ...parsed, amount: new Decimal(parsed.amount) };
  }
}

BullMQ serializes job data as JSON — Decimal objects from Prisma lose precision without explicit string conversion.

6. Inspect jobs via redis-cli

All BullMQ queues use the cache Redis on port 6379:

redis-cli -a cache
KEYS "bull:*:id"                              # list all queues
ZCARD bull:your_feature_queue:wait            # waiting jobs
ZCARD bull:your_feature_queue:failed          # failed jobs
HGETALL bull:your_feature_queue:<jobId>       # inspect one job

You're done — test by...

cd ebit-api
npm run start:dev
# Trigger the code path that calls your producer
# Then inspect Redis:
redis-cli -a cache KEYS "bull:your_feature_queue:*"
redis-cli -a cache ZCARD bull:your_feature_queue:completed