Message brokers play a crucial role in distributed systems by enabling communication between different components and services. Apache Kafka, a popular distributed streaming platform, can be seamlessly integrated with NestJS, a powerful Node.js framework, to build scalable and event-driven applications. In this blog post, we will explore how to integrate Kafka as a message broker with NestJS using a practical example.

Prerequisites Link to heading

Before proceeding, ensure you have the following prerequisites:

  • Node.js and npm installed on your local machine.
  • Kafka installed and running locally. You can download Kafka from the official Apache Kafka website and follow the installation instructions for your operating system.

Step 1: Set Up a Kafka Cluster Locally Link to heading

To get started, we need to set up a local Kafka cluster. Follow these steps:

  • Start ZooKeeper: Kafka relies on ZooKeeper for maintaining cluster metadata. Open a terminal and run the following command to start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start Kafka brokers: Open a new terminal window and run the following command to start a Kafka broker:
bin/kafka-server-start.sh config/server.properties

You can start multiple brokers if you want to simulate a multi-node Kafka cluster.

  • Create a Kafka topic: Open another terminal window and run the following command to create a Kafka topic named test-topic:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

This will create a topic with one partition and a replication factor of 1.

Now that our local Kafka cluster is set up, we can move on to integrating Kafka with NestJS.

Step 2: Install Dependencies Link to heading

To integrate Kafka with NestJS, we need to install the kafkajs package, which is a modern Apache Kafka client for Node.js. Open a terminal window and navigate to your NestJS project directory. Run the following command to install the required dependencies:

npm install kafka-node

Step 3: Configure Kafka Producer and Consumer Link to heading

In this example, we will create a simple Kafka producer and consumer using NestJS. Create a kafka.producer.ts file and add the following code:

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9092"],
});

const producer = kafka.producer();

export async function connectProducer(): Promise<void> {
  await producer.connect();
}

export async function produceMessage(
  message: string,
  topic: string
): Promise<void> {
  await producer.send({
    topic,
    messages: [{ value: message }],
  });
}

In this code snippet, we create a Kafka producer instance using the kafkajs package. We provide the broker address and a client ID. The connectProducer function establishes a connection to the Kafka broker, and the produceMessage function sends a message to the specified topic.

Create a kafka.consumer.ts file and add the following code:

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "my-group" });

export async function connectConsumer(): Promise<void> {
  await consumer.connect();
}

export async function consumeMessages(topic: string): Promise<void> {
  await consumer.subscribe({ topic });
  await consumer.run({
    eachMessage: async ({ message }) => {
      console.log(`Received message: ${message.value}`);
    },
  });
}

In this code snippet, we create a Kafka consumer instance using the kafkajs package. We provide the broker address, a client ID, and a consumer group ID. The connectConsumer function establishes a connection to the Kafka broker, and the consumeMessages function subscribes to the specified topic and logs the received messages.

Step 4: Use Kafka in NestJS Link to heading

Now that we have our Kafka producer and consumer configured, we can integrate them into our NestJS application. In your desired NestJS module, import the connectProducer and produceMessage functions from the kafka.producer.ts file. Update the module to use the producer:

import { Module, OnModuleInit } from "@nestjs/common";
import { connectProducer, produceMessage } from "./kafka.producer";

@Module({
  imports: [],
  controllers: [],
  providers: [],
})
export class AppModule implements OnModuleInit {
  async onModuleInit(): Promise<void> {
    await connectProducer();
    await produceMessage("Hello, Kafka!", "test-topic");
  }
}

In this code snippet, we import the necessary functions from the Kafka producer file. The onModuleInit method is executed when the module is initialized. Inside this method, we establish a connection to the Kafka broker and send a test message to the test-topic topic.

In your desired NestJS module, import the connectConsumer and consumeMessages functions from the kafka.consumer.ts file. Update the module to use the consumer:

import { Module, OnModuleInit } from "@nestjs/common";
import { connectConsumer, consumeMessages } from "./kafka.consumer";

@Module({
  imports: [],
  controllers: [],
  providers: [],
})
export class AppModule implements OnModuleInit {
  async onModuleInit(): Promise<void> {
    await connectConsumer();
    await consumeMessages("test-topic");
  }
}

In this code snippet, we import the necessary functions from the Kafka consumer file. The onModuleInit method is executed when the module is initialized. Inside this method, we establish a connection to the Kafka broker and start consuming messages from the test-topic topic.

Conclusion Link to heading

In this blog post, we explored how to integrate Kafka as a message broker with NestJS. We set up a local Kafka cluster, installed the necessary dependencies, configured the Kafka producer and consumer, and integrated them into a NestJS application. This integration enables seamless communication between services using Kafka’s publish-subscribe model.

Kafka’s scalability, fault tolerance, and reliability make it an excellent choice for building distributed systems. By leveraging Kafka with NestJS, you can develop robust and event-driven applications that can handle large-scale message processing and real-time data streaming.