owl1n/nest-queue
Queue manager for NestJS Framework for Redis (via bull package)
Lightweight queue module for NestJS applications with bull and bullmq drivers.
>=20>=11pnpm add nest-queue bull
# or
pnpm add nest-queue bullmq
import { Module } from "@nestjs/common";
import { QueueModule } from "nest-queue";
@Module({
imports: [
QueueModule.forRoot({
connection: {
redis: {
host: "127.0.0.1",
port: 6379
}
}
})
]
})
export class AppModule {}
import { Controller, Post } from "@nestjs/common";
import { Queue } from "bull";
import { QueueInjection } from "nest-queue";
@Controller("jobs")
export class JobsController {
constructor(@QueueInjection() private readonly queue: Queue) {}
@Post("send")
async send() {
await this.queue.add("mail.send", { userId: 1 });
return { status: "queued" };
}
}
import { Injectable } from "@nestjs/common";
import { DoneCallback, Job } from "bull";
import { EventConsumer } from "nest-queue";
@Injectable()
export class MailConsumer {
@EventConsumer("mail.send")
async handle(job: Job, done: DoneCallback) {
// process job.data
done();
}
}
EventConsumer supports policy options with backward compatibility:
@EventConsumer("payments.retry", {
queueName: "payments",
attempts: 5,
backoff: { type: "fixed", delay: 1000 },
concurrency: 3
})
async handlePayment(job: Job, done: DoneCallback) {
done();
}
Supported options:
queueNameconcurrencyattemptsbackoffremoveOnCompleteremoveOnFailQueueModule.forRoot([
{
name: "default",
connection: { redis: { host: "127.0.0.1", port: 6379 } }
},
{
name: "emails",
connection: { redis: { host: "127.0.0.1", port: 6380 } }
}
]);
No download data available
No tracked packages depend on this.
constructor(@QueueInjection("emails") private readonly emailQueue: Queue) {}
@EventConsumer("mail.send", "emails")
handleEmail(job: Job, done: DoneCallback) {
done();
}
QueueModule.forRoot({
name: "events",
driver: "bullmq",
connection: {
host: "127.0.0.1",
port: 6379
}
});
import { Queue } from "bullmq";
constructor(@QueueInjection("events") private readonly queue: Queue) {}
@EventConsumer("mail.send", "events")
async handle(job: { data: unknown }) {
// process BullMQ job
}
QueueModule.forRootAsync({
useFactory: async (config: ConfigService) => ({
connection: {
redis: {
host: config.get("REDIS_HOST"),
port: Number(config.get("REDIS_PORT"))
}
}
}),
inject: [ConfigService]
});
forRootAsyncregisters the default queue token (@QueueInjection()).
QueueModule.forRoot(options | options[])QueueModule.forRootAsync(asyncOptions)QueueInjection(name?)EventConsumer(eventName, queueNameOrOptions?)QueueRegistryService.enqueue(eventName, data, options?)QueueRegistryService.getHealthSnapshot()import { Injectable } from "@nestjs/common";
import { QueueRegistryService } from "nest-queue";
@Injectable()
export class QueueFacade {
constructor(private readonly queues: QueueRegistryService) {}
async publish() {
await this.queues.enqueue("mail.send", { userId: 42 }, { queueName: "events" });
}
async health() {
return this.queues.getHealthSnapshot();
}
}
pnpm install
pnpm run build
pnpm test
Repository includes 3 GitHub Actions workflows:
CI (.github/workflows/ci.yml)
master/feature branches.pnpm lint, pnpm run build, pnpm test.Release Please (.github/workflows/release-please.yml)
master.vX.Y.Z) and GitHub Release.Publish to npm (.github/workflows/publish.yml)
NPM_TOKEN — npm automation token with publish rights for nest-queue.Use conventional commit types so release notes and versioning are meaningful:
feat: for new features (minor bump)fix: for bug fixes (patch bump)feat!: or BREAKING CHANGE: in body for major bumpdocs:, chore:, refactor: for non-feature updatesCurrent package is intentionally minimal. The most requested next steps for queue modules in service ecosystems are:
BullMQ adapter and compatibility mode for migration from bull ✅OpenTelemetry traces, metrics, queue health probes)pause/resume/drain, dead-letter flow, replay failed jobs)