본문 바로가기
TypeScript

[Nest.js] 카프카(Kafka) 세팅하기

by yonikim 2023. 12. 7.
728x90

 

 

NestJS 는 다양한 마이크로서비스 기능을 제공하고 있는데, 그 중에 카프카 역시 존재한다.

카프카란 무엇인가: https://yonikim.tistory.com/143

 

1. 설치

$ npm i @nestjs/microservices kafkajs

 

2. 카프카 세팅

▷ main.ts

import { ConfigService } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from '/src/app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  const configService = app.get(ConfigService);
  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: configService.get("KAFKA_BROKERS").split(","),
      },
    },
  });

  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();

 

 

3. Consumer 

컨슈머의 경우 Controller 에서 @MessagePattern 데코레이터 사용만으로 손쉽게 구현할 수 있다.

 

▷ consumer.controller.ts

import { Controller } from '@nestjs/common';
import { Ctx, KafkaContext, MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class ConsumerController {
  @MessagePattern('TOPIC_NAME')
  readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    const originalMessage = context.getMessage();
    const response = originalMessage.value;
    
    console.log(originalMessage.value);
    console.log(message);
    
    // 메시지 이외 context 정보
    console.log(context.getTopic());
    console.log(context.getArgs());
    console.log(context.getPartition());

    return response;
  }
}

 

 

4. Producer

프로듀서의 경우 Module 에서 Client 설정을 해준 후 Service 에서 inject 해줘야 한다.

 

▷ kafka.module.ts

import { Module } from "@nestjs/common";
import { MongooseModule } from "@nestjs/mongoose";
import { CacheModule } from "@nestjs/cache-manager";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { ClientsModule, Transport } from "@nestjs/microservices";

@Module({
  imports: [
    ClientsModule.registerAsync({
      isGlobal: false,
      clients: [
        {
          inject: [ConfigService],
          name: "MY_KAFKA", // injection 시 사용할 name
          useFactory: async (configService: ConfigService) => ({
            transport: Transport.KAFKA,
            options: {
              client: {
                clientId: configService.get("KAFKA_CLIENT_ID"),
                brokers: configService.get("KAFKA_BROKERS").split(","),
              },
              // producer 만 쓸 경우엔 consumer 설정 불필요
              consumer: {
                groupId: configService.get("KAFKA_CONSUMER_GROUP_ID"),
                sessionTimeout: configService.get("KAFKA_SESSION_TIMEOUT"),
                rebalanceTimeout: configService.get("KAFKA_REBALANCE_TIMEOUT"),
              },
            },
          }),
        },
      ],
    }),
  ],
  controllers: [],
  providers: [ProducerService],
  exports: [],
})
export class KafkaModule {}

 

▷ ProducerService

@Injectable()
export class ProducerService {
  constructor(@Inject('MY_KAFKA') private readonly kafkaClient: ClientKafka) {}

  sendMessage(message) {
    try {  
    	this.kafkaClient.emit('TOPIC_NAME', message);
    } catch (err) {
    	console.log(err);
    }
  }
}

 

 


 

테스트를 해보니, 서비스가 재배포되는 동안에 전송된 메시지는 못 가져오는 이슈가 있었다. 

맞는지 모르겠지만 아래와 같이 카프카 세팅에서 옵션을 추가해주면, commit 되지 않은 메시지는 모두 읽어온다. 

import { ConfigService } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from '/src/app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  const configService = app.get(ConfigService);
  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        readUncommitted: true,
        brokers: configService.get("KAFKA_BROKERS").split(","),
      },
      subscribe: {
        fromBeginning: true,
      },
    },
  });

  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();
728x90