728x90
NestJS 는 다양한 마이크로서비스 기능을 제공하고 있는데, 그 중에 카프카 역시 존재한다.
카프카란 무엇인가: https://yonikim.tistory.com/143
1. 설치
$ npm i @nestjs/microservices kafkajs
2. 카프카 세팅
▷ main.ts
import { ConfigService } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from '/src/app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const configService = app.get(ConfigService);
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
brokers: configService.get("KAFKA_BROKERS").split(","),
},
},
});
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
3. Consumer
컨슈머의 경우 Controller 에서 @MessagePattern 데코레이터 사용만으로 손쉽게 구현할 수 있다.
▷ consumer.controller.ts
import { Controller } from '@nestjs/common';
import { Ctx, KafkaContext, MessagePattern, Payload } from '@nestjs/microservices';
@Controller()
export class ConsumerController {
@MessagePattern('TOPIC_NAME')
readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const response = originalMessage.value;
console.log(originalMessage.value);
console.log(message);
// 메시지 이외 context 정보
console.log(context.getTopic());
console.log(context.getArgs());
console.log(context.getPartition());
return response;
}
}
4. Producer
프로듀서의 경우 Module 에서 Client 설정을 해준 후 Service 에서 inject 해줘야 한다.
▷ kafka.module.ts
import { Module } from "@nestjs/common";
import { MongooseModule } from "@nestjs/mongoose";
import { CacheModule } from "@nestjs/cache-manager";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { ClientsModule, Transport } from "@nestjs/microservices";
@Module({
imports: [
ClientsModule.registerAsync({
isGlobal: false,
clients: [
{
inject: [ConfigService],
name: "MY_KAFKA", // injection 시 사용할 name
useFactory: async (configService: ConfigService) => ({
transport: Transport.KAFKA,
options: {
client: {
clientId: configService.get("KAFKA_CLIENT_ID"),
brokers: configService.get("KAFKA_BROKERS").split(","),
},
// producer 만 쓸 경우엔 consumer 설정 불필요
consumer: {
groupId: configService.get("KAFKA_CONSUMER_GROUP_ID"),
sessionTimeout: configService.get("KAFKA_SESSION_TIMEOUT"),
rebalanceTimeout: configService.get("KAFKA_REBALANCE_TIMEOUT"),
},
},
}),
},
],
}),
],
controllers: [],
providers: [ProducerService],
exports: [],
})
export class KafkaModule {}
▷ ProducerService
@Injectable()
export class ProducerService {
constructor(@Inject('MY_KAFKA') private readonly kafkaClient: ClientKafka) {}
sendMessage(message) {
try {
this.kafkaClient.emit('TOPIC_NAME', message);
} catch (err) {
console.log(err);
}
}
}
테스트를 해보니, 서비스가 재배포되는 동안에 전송된 메시지는 못 가져오는 이슈가 있었다.
맞는지 모르겠지만 아래와 같이 카프카 세팅에서 옵션을 추가해주면, commit 되지 않은 메시지는 모두 읽어온다.
import { ConfigService } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from '/src/app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const configService = app.get(ConfigService);
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
readUncommitted: true,
brokers: configService.get("KAFKA_BROKERS").split(","),
},
subscribe: {
fromBeginning: true,
},
},
});
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
728x90
'TypeScript' 카테고리의 다른 글
[Nest.js] swagger ApiResponse 에서 generic dto 사용하기 (0) | 2024.11.14 |
---|---|
[TypeScript] as const (0) | 2024.10.16 |
[Nest.js] DataDog 를 이용하여 trace id, span id 심기 (0) | 2023.10.17 |
[Nest.js] 버전 별로 스웨거 관리 (0) | 2023.01.11 |
[TypeORM] 데코레이터 - Entity (1) | 2022.09.19 |