• Что бы вступить в ряды "Принятый кодер" Вам нужно:
    Написать 10 полезных сообщений или тем и Получить 10 симпатий.
    Для того кто не хочет терять время,может пожертвовать средства для поддержки сервеса, и вступить в ряды VIP на месяц, дополнительная информация в лс.

  • Пользаватели которые будут спамить, уходят в бан без предупреждения. Спам сообщения определяется администрацией и модератором.

  • Гость, Что бы Вы хотели увидеть на нашем Форуме? Изложить свои идеи и пожелания по улучшению форума Вы можете поделиться с нами здесь. ----> Перейдите сюда
  • Все пользователи не прошедшие проверку электронной почты будут заблокированы. Все вопросы с разблокировкой обращайтесь по адресу электронной почте : info@guardianelinks.com . Не пришло сообщение о проверке или о сбросе также сообщите нам.

Scale to 10M Users: CQRS in NestJS for API Performance

Lomanu4 Оффлайн

Lomanu4

Команда форума
Администратор
Регистрация
1 Мар 2015
Сообщения
1,481
Баллы
155
Scale to 10M Users: CQRS in NestJS for API Performance


In today's digital landscape, building applications that can handle millions of users requires thoughtful architecture decisions. Command Query Responsibility Segregation (CQRS) is one such pattern that helps maintain performance at scale. Let's explore!

What is CQRS and Why Use It?


CQRS splits your application into two models:

  • Command model: Handles create, update, and delete operations
  • Query model: Manages read operations

This separation addresses the reality that most applications have asymmetric read/write loads—typically with reads far outnumbering writes.

Setting Up CQRS in NestJS


First, install the required packages:


npm install @nestjs/cqrs uuid kafkajs redis mongodb pg
Directory Structure


src/
├── commands/
│ ├── handlers/
│ ├── impl/
├── queries/
│ ├── handlers/
│ ├── impl/
├── events/
├── models/
├── controllers/
└── app.module.ts
Basic Implementation


Let's start with our app.module.ts:


import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { MongooseModule } from '@nestjs/mongoose';
import { TypeOrmModule } from '@nestjs/typeorm';
import { CommandHandlers } from './commands/handlers';
import { QueryHandlers } from './queries/handlers';
import { EventHandlers } from './events/handlers';
import { Controllers } from './controllers';

@Module({
imports: [
CqrsModule,
MongooseModule.forRoot('mongodb://localhost:27017/cqrs_read'),
TypeOrmModule.forRoot({
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'postgres',
password: 'password',
database: 'cqrs_write',
entities: [__dirname + '/**/*.entity{.ts,.js}'],
synchronize: true,
}),
],
controllers: [...Controllers],
providers: [
...CommandHandlers,
...QueryHandlers,
...EventHandlers,
],
})
export class AppModule {}

Now, let's define a command:


// commands/impl/create-user.command.ts
export class CreateUserCommand {
constructor(
public readonly email: string,
public readonly name: string,
) {}
}

And its handler:


// commands/handlers/create-user.handler.ts
import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { CreateUserCommand } from '../impl/create-user.command';
import { User } from '../../models/user.entity';
import { UserCreatedEvent } from '../../events/impl/user-created.event';

@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
constructor(
@InjectRepository(User)
private userRepository: Repository<User>,
private eventBus: EventBus,
) {}

async execute(command: CreateUserCommand): Promise<User> {
const { email, name } = command;

const user = new User();
user.email = email;
user.name = name;

const savedUser = await this.userRepository.save(user);

// Publish event for read model synchronization
this.eventBus.publish(new UserCreatedEvent(savedUser.id, email, name));

return savedUser;
}
}
Query Side with MongoDB


For read operations, we'll use MongoDB for its superior query performance:


// queries/impl/get-user.query.ts
export class GetUserQuery {
constructor(public readonly id: string) {}
}

// queries/handlers/get-user.handler.ts
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { GetUserQuery } from '../impl/get-user.query';
import { UserReadModel } from '../../models/user.read-model';

@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
constructor(
@InjectModel(UserReadModel.name)
private userModel: Model<UserReadModel>,
) {}

async execute(query: GetUserQuery): Promise<UserReadModel> {
return this.userModel.findOne({ userId: query.id }).exec();
}
}
Synchronizing Read/Write Models with Kafka


To keep our read model updated, we'll use Kafka for event sourcing:


// events/handlers/user-created.handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { UserCreatedEvent } from '../impl/user-created.event';

@EventsHandler(UserCreatedEvent)
export class UserCreatedHandler implements IEventHandler<UserCreatedEvent> {
constructor(
@Inject('KAFKA_SERVICE') private kafkaClient: ClientKafka
) {}

handle(event: UserCreatedEvent) {
this.kafkaClient.emit('user-created', {
id: event.id,
email: event.email,
name: event.name,
timestamp: new Date().toISOString()
});
}
}

And our consumer service:


// services/read-model-sync.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { Consumer, Kafka } from 'kafkajs';
import { UserReadModel } from '../models/user.read-model';

@Injectable()
export class ReadModelSyncService implements OnModuleInit {
private consumer: Consumer;

constructor(
@InjectModel(UserReadModel.name)
private userModel: Model<UserReadModel>,
) {
const kafka = new Kafka({
clientId: 'read-model-sync',
brokers: ['localhost:9092'],
});
this.consumer = kafka.consumer({ groupId: 'read-model-sync-group' });
}

async onModuleInit() {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'user-created', fromBeginning: true });

await this.consumer.run({
eachMessage: async ({ topic, message }) => {
const eventData = JSON.parse(message.value.toString());

if (topic === 'user-created') {
await this.userModel.updateOne(
{ userId: eventData.id },
{
userId: eventData.id,
email: eventData.email,
name: eventData.name,
updatedAt: new Date()
},
{ upsert: true }
);
}
},
});
}
}

To further enhance read performance, we can add Redis Caching for Read Queries.


Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

 
Вверх Снизу