Handling CPU-intensive tasks with queues
Handling CPU-intensive operations with a REST API can be tricky. If our endpoint takes too much time to respond, it might result in a timeout. In this article, we look into queues to help us resolve this issue.

A queue proves to be a very useful part of backend architecture. With it, we can implement asynchronous and distributed processing. A queue is a data structure modeled on a real-world queue. A publisher can post messages to the queue, and a consumer can consume a message and process it. Once a consumer handles a message, no other consumer can process it again.

With NestJS, we have access to the @nestjs/bull package. It wraps the Bull library, which provides queue functionalities based on Redis. Redis is a fast and reliable key-value store that keeps data in its memory. Even if we restart our Node.js application, we don't lose the data saved in Redis.

Setting up Bull and Redis

Since Bull uses Redis to manage queues, we need to set it up. So far, within this series, we've used Docker Compose to help us with our architecture. Thankfully, it is straightforward to set up Redis with Docker.

docker-compose.yml

```yaml
version: "3"
services:
  redis:
    image: "redis:alpine"
    ports:
      - "6379:6379"
  # ...
```

By default, Redis works on port 6379. Connecting to Redis requires us to define two additional environment variables: the host and the port.

app.module.ts

```typescript
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import * as Joi from '@hapi/joi';

@Module({
  imports: [
    ConfigModule.forRoot({
      validationSchema: Joi.object({
        REDIS_HOST: Joi.string().required(),
        REDIS_PORT: Joi.number().required(),
        // ...
      }),
    }),
    // ...
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
```

.env

```
REDIS_HOST=localhost
REDIS_PORT=6379
# ...
```

We also need to install the necessary dependencies.

```
npm install @nestjs/bull @types/bull bull
```

Once we've got all of the above configured, we can establish a connection with Redis.
app.module.ts

```typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        },
      }),
      inject: [ConfigService],
    }),
    // ...
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
```

Thanks to calling BullModule.forRootAsync, we can use Bull across all of our modules. We can pass more options besides the redis object when configuring Bull. For the whole list, check out the documentation.

Managing queues with Bull

Let's create a queue that can help us optimize multiple PNG images. We start by defining a module.

optimize.module.ts

```typescript
import { Module } from '@nestjs/common';
import { OptimizeController } from './optimize.controller';
import { BullModule } from '@nestjs/bull';
import { ImageProcessor } from './image.processor';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'image',
    }),
  ],
  providers: [ImageProcessor],
  exports: [],
  controllers: [OptimizeController],
})
export class OptimizeModule {}
```

Above, we register our queue using BullModule.registerQueue. Thanks to doing so, we can use it in our OptimizeController.
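Before moving on to the controller, the publisher/consumer semantics described earlier can be sketched with a minimal in-memory queue. This is illustrative only, not a Bull API: Bull itself persists jobs in Redis and adds concurrency, retries, and events. `SimpleQueue` below is a hypothetical helper.

```typescript
// Minimal in-memory sketch of queue semantics (illustrative only).
// A publisher posts jobs; a consumer takes each job exactly once.
type Job<T> = { id: number; name: string; data: T };

class SimpleQueue<T> {
  private jobs: Job<T>[] = [];
  private nextId = 1;

  // Publisher side: post a message to the queue.
  add(name: string, data: T): Job<T> {
    const job = { id: this.nextId++, name, data };
    this.jobs.push(job);
    return job;
  }

  // Consumer side: take the oldest job off the queue. Once taken,
  // no other consumer can receive the same job again.
  take(): Job<T> | undefined {
    return this.jobs.shift();
  }
}

const queue = new SimpleQueue<{ file: string }>();
queue.add('optimize', { file: 'a.png' });
queue.add('optimize', { file: 'b.png' });

const first = queue.take();
const second = queue.take();
const third = queue.take();
console.log(first?.data.file); // 'a.png' — jobs come out in FIFO order
console.log(third); // undefined — each job is consumed exactly once
```

The key property to notice is exclusive consumption: once `take()` hands a job to one consumer, the queue no longer holds it, which is the same guarantee Bull gives us across multiple worker processes.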
optimize.controller.ts

```typescript
import {
  Controller,
  Post,
  UploadedFiles,
  UseInterceptors,
} from '@nestjs/common';
import { AnyFilesInterceptor } from '@nestjs/platform-express';
import { Express } from 'express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Controller('optimize')
export class OptimizeController {
  constructor(
    @InjectQueue('image') private readonly imageQueue: Queue,
  ) {}

  @Post('image')
  @UseInterceptors(AnyFilesInterceptor())
  async processImage(@UploadedFiles() files: Express.Multer.File[]) {
    const job = await this.imageQueue.add('optimize', { files });
    return {
      jobId: job.id,
    };
  }
}
```

Above, we follow the NestJS documentation on how to upload multiple files with Multer. To do that, we need the AnyFilesInterceptor and the @UploadedFiles() decorator.

Once we have the files, we add a job to our queue using the add() method. We pass two arguments to it: the name of the job that we later refer to, and the data it needs. In the above endpoint, we respond with the id of the job. This allows the user to ask for the return value of the job later.

Consuming the queue

Now we need to define a consumer. With it, we can process jobs added to the queue. To optimize images, we use the imagemin library. Since we expect the user to upload multiple images, we compress the result into a .zip file using the adm-zip package.

```
npm install imagemin @types/imagemin imagemin-pngquant adm-zip @types/adm-zip
```

image.processor.ts

```typescript
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import * as AdmZip from 'adm-zip';
import { buffer } from 'imagemin';
import imageminPngquant from 'imagemin-pngquant';
import { Express } from 'express';

@Processor('image')
export class ImageProcessor {
  @Process('optimize')
  async handleOptimization(job: Job) {
    const files: Express.Multer.File[] = job.data.files;

    const optimizationPromises: Promise