- Build Applications
- Kafka Clients
- Quick Starts
Connect to your cluster using the Kafka Node.js client
Note
This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce
and consume
permissions to a namespace for the target topic.
This document describes how to connect to your StreamNative cluster using the Kafka Node.js client using API Keys authentication.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- The password for different utilities as
kcat
will be equal totoken:<API KEY>
.
You can follow the instructions to create an API key for the service account you choose to use.
Steps
Install the Kafka Nodejs client.
npm install kafkajs
Build a Nodejs application to produce and consume messages.
const { Kafka } = require('kafkajs') // Step 1: replace with your configurations let serverUrl = 'SERVER-URL' let jwtToken = 'API-KEY' let topicName = 'test-js-topic' let namespace = 'public/default' // Step 2: create the kafka client const kafka = new Kafka({ clientId: 'my-app', brokers: [serverUrl], ssl: true, sasl: { mechanism: 'plain', username: namespace, password: 'token:' + jwtToken, }, }) // Step 3: send a message to your topic async function send() { const producer = kafka.producer() await producer.connect() let resp = await producer.send({ topic: topicName, messages: [ { value: 'Hello KafkaJS user!', }, ], }) console.log(`Send message:`, resp) await producer.disconnect() } // Step 4: read messages from the beginning of your topic async function receive() { const consumer = kafka.consumer({ groupId: 'my-group', readUncommitted: true, // Note: Ursa does not support read_committed for now }) await consumer.connect() console.log('Connected to Kafka') await consumer.subscribe({ topic: topicName, fromBeginning: true }) await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log(`Received message:`, { value: message.value.toString(), headers: message.headers, topic: topic, partition: partition, offset: message.offset, }) }, }) } // Step 5: send one message then receive send().then(function () { receive() })
SERVER-URL
: the Kafka service URL of your StreamNative cluster.API-KEY
: an API key of your service account.
Run the Node.js application with your saved scripts.
This example runs the Node.js application assuming that you save the scripts as
kop_test.js
.node kop_test.js
You should see the following output:
Send message: [ { topicName: 'test-js-topic', partition: 0, errorCode: 0, baseOffset: '7', logAppendTime: '-1', logStartOffset: '-1' } ] Connected to Kafka Received message: { value: 'Hello KafkaJS user!', headers: {}, topic: 'test-js-topic', partition: 0, offset: '0' } ...