Connect to your cluster using the Kafka Node.js client

Note

This QuickStart assumes that you have created a Pulsar cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

This document describes how to connect to your Pulsar cluster using the Kafka Node.js client through Token authentication.

Before you begin

Note

  • Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
  • A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
  • The password for different utilities as kcat will be equal to token:TOKEN
  • Get the JWT token.

    1. On the left navigation pane, click Service Accounts.

    2. In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.

  • Get the service URL of your Pulsar cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).

Steps

  1. Install the Kafka Nodejs client.

    npm install kafkajs
    
  2. Build a Nodejs application to produce and consume messages.

    const { Kafka } = require('kafkajs')
    
    // Step 1: replace with your configurations
    let serverUrl = 'SERVER-URL'
    let jwtToken = 'YOUR-TOKEN'
    let topicName = 'test-js-topic'
    
    // Step 2: create the kafka client
    const kafka = new Kafka({
      clientId: 'my-app',
      brokers: [serverUrl],
      ssl: true,
      sasl: {
        mechanism: 'oauthbearer',
        oauthBearerProvider: async () => {
          return {
            value: jwtToken,
          }
        },
      },
    })
    
    // Step 3: send a message to your topic
    async function send() {
      const producer = kafka.producer()
      await producer.connect()
    
      let resp = await producer.send({
        topic: topicName,
        messages: [
          {
            value: 'Hello KafkaJS user!',
          },
        ],
      })
    
      console.log(`Send message:`, resp)
    
      await producer.disconnect()
    }
    
    // Step 4: read messages from the beginning of your topic
    async function receive() {
      const consumer = kafka.consumer({ groupId: 'my-group' })
      await consumer.connect()
      console.log('Connected to Kafka')
    
      await consumer.subscribe({ topic: topicName, fromBeginning: true })
    
      await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          console.log(`Received message:`, {
            value: message.value.toString(),
            headers: message.headers,
            topic: topic,
            partition: partition,
            offset: message.offset,
          })
        },
      })
    }
    
    // Step 5: send one message then receive
    send().then(function () {
      receive()
    })
    
    • serverUrl: the Kafka service URL of your Pulsar cluster.
    • jwtToken: the token of your service account.
  3. Run the Node.js application with your saved scripts.

    This example runs the Node.js application assuming that you save the scripts as kop_test.js.

    node kop_test.js
    

    You should see the following output:

    Send message: [
      {
        topicName: 'test-js-topic',
        partition: 0,
        errorCode: 0,
        baseOffset: '7',
        logAppendTime: '-1',
        logStartOffset: '-1'
      }
    ]
    Connected to Kafka
    Received message: {
      value: 'Hello KafkaJS user!',
      headers: {},
      topic: 'test-js-topic',
      partition: 0,
      offset: '0'
    }
    ...
    
Previous
Kafka Go