Connect to Pulsar cluster using Node.js client
This document provides examples about how to use the Pulsar Node.js client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
Note
This document assumes that you have created a service account and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
Connect to a Pulsar cluster using a token
This section describes how to connect to your Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
Connect to the Pulsar cluster.
// Connects to a Pulsar cluster with token authentication and then closes the
// client again.
//
// Connection settings are read from the environment so that no credential is
// hard-coded in the script:
//   AUTH_PARAMS - the token of the service account
//   SERVICE_URL - the service URL of the Pulsar cluster
const Pulsar = require('pulsar-client')

const auth_params = process.env.AUTH_PARAMS
const service_url = process.env.SERVICE_URL

;(async () => {
  // Fail early with a readable message instead of an opaque connection error.
  if (!auth_params || !service_url) {
    throw new Error('Set the AUTH_PARAMS and SERVICE_URL environment variables before running this example')
  }

  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  await client.close()
})().catch((err) => {
  // Report the failure and signal it through the exit code.
  console.error(err)
  process.exitCode = 1
})
Set the `serviceUrl` and `token` parameters based on the descriptions in the prepare to connect to a Pulsar cluster user guide.
Create a Node.js consumer and use the Node.js consumer to consume messages.
// Consumes ten messages from a topic using token authentication.
//
// Connection settings are read from the environment:
//   AUTH_PARAMS - the token of the service account
//   SERVICE_URL - the service URL of the Pulsar cluster
const Pulsar = require('pulsar-client')

const auth_params = process.env.AUTH_PARAMS
const service_url = process.env.SERVICE_URL

;(async () => {
  // Fail early with a readable message instead of an opaque connection error.
  if (!auth_params || !service_url) {
    throw new Error('Set the AUTH_PARAMS and SERVICE_URL environment variables before running this example')
  }

  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    // Create a consumer
    const consumer = await client.subscribe({
      topic: 'your-topic',
      subscription: 'your-sub-name',
      subscriptionType: 'Shared',
      ackTimeoutMs: 10000,
    })

    try {
      // Receive messages
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive()
        console.log(msg.getData().toString())
        await consumer.acknowledge(msg)
      }
    } finally {
      await consumer.close()
    }
  } finally {
    // Close the client even when subscribing or receiving fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and signal it through the exit code.
  console.error(err)
  process.exitCode = 1
})
Create a Node.js producer and use the Node.js producer to produce messages.
// Produces ten messages to a topic using token authentication.
//
// Connection settings are read from the environment:
//   AUTH_PARAMS - the token of the service account
//   SERVICE_URL - the service URL of the Pulsar cluster
const Pulsar = require('pulsar-client')

const auth_params = process.env.AUTH_PARAMS
const service_url = process.env.SERVICE_URL

;(async () => {
  // Fail early with a readable message instead of an opaque connection error.
  if (!auth_params || !service_url) {
    throw new Error('Set the AUTH_PARAMS and SERVICE_URL environment variables before running this example')
  }

  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    const producer = await client.createProducer({
      topic: 'your-topic',
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    })

    try {
      // Send messages. The sends are not awaited one by one so that they can
      // be batched; their promises are collected and awaited after the loop.
      const pending = []
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`
        pending.push(
          producer.send({
            data: Buffer.from(msg),
          })
        )
        console.log(`Sent message: ${msg}`)
      }

      // Flush the queued messages and surface any send failure here instead
      // of leaving the send promises unobserved.
      await Promise.all([producer.flush(), ...pending])
    } finally {
      await producer.close()
    }
  } finally {
    // Close the client even when creating the producer or sending fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and signal it through the exit code.
  console.error(err)
  process.exitCode = 1
})
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
Generate the App credentials by following similar instructions in configure OAuth2 authentication.
Save the App credentials into an OAuth2 credential file.
Connect to a Pulsar cluster through the OAuth2 credential file.
// Connects to a Pulsar cluster with OAuth2 authentication and then closes the
// client again.
//
// Settings are read from the environment:
//   ISSUER_URL    - URL of the OAuth2 identity provider (required)
//   SERVICE_URL   - service URL of the Pulsar cluster (required)
//   PRIVATE_KEY   - path to the OAuth2 credential file,
//                   e.g. file:///path/to/private-key-file.json
//   CLIENT_ID     - client ID, used together with CLIENT_SECRET when
//   CLIENT_SECRET   PRIVATE_KEY is not set
//   AUDIENCE      - audience of the Pulsar cluster (optional)
//   SCOPE         - scope to request (optional)
const Pulsar = require('pulsar-client')

const issuer_url = process.env.ISSUER_URL
const private_key = process.env.PRIVATE_KEY
const audience = process.env.AUDIENCE
const scope = process.env.SCOPE
const service_url = process.env.SERVICE_URL
const client_id = process.env.CLIENT_ID
const client_secret = process.env.CLIENT_SECRET

;(async () => {
  // Unset environment variables are undefined, so test for truthiness rather
  // than reading `.length`.
  if (!issuer_url || !service_url) {
    throw new Error('Set the ISSUER_URL and SERVICE_URL environment variables before running this example')
  }

  const params = {
    issuer_url,
  }
  if (private_key) {
    params.private_key = private_key
  } else {
    if (!client_id || !client_secret) {
      throw new Error('Set PRIVATE_KEY, or both CLIENT_ID and CLIENT_SECRET, before running this example')
    }
    params.client_id = client_id
    params.client_secret = client_secret
  }
  if (audience) {
    params.audience = audience
  }
  if (scope) {
    params.scope = scope
  }

  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client. TLS certificate verification is left at its default;
  // set `tlsAllowInsecureConnection: true` only for local testing against a
  // cluster with a self-signed certificate.
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
  })

  await client.close()
})().catch((err) => {
  // Report the failure and signal it through the exit code.
  console.error(err)
  process.exitCode = 1
})
- `issuer_url`: the URL of your OAuth2 identity provider.
- `private_key`: the path to your OAuth2 credential file.
- `client_id`: the Pulsar application client ID.
- `client_secret`: the Pulsar application client secret.
- `audience`: the `audience` of your Pulsar cluster.
- `serviceUrl`: the URL of your Pulsar cluster.
Create a Node.js consumer and use the Node.js consumer to consume messages.
// Consumes ten messages from a topic using OAuth2 authentication.
//
// Settings are read from the environment:
//   ISSUER_URL    - URL of the OAuth2 identity provider (required)
//   SERVICE_URL   - service URL of the Pulsar cluster (required)
//   PRIVATE_KEY   - path to the OAuth2 credential file,
//                   e.g. file:///path/to/private-key-file.json
//   CLIENT_ID     - client ID, used together with CLIENT_SECRET when
//   CLIENT_SECRET   PRIVATE_KEY is not set
//   AUDIENCE      - audience of the Pulsar cluster (optional)
//   SCOPE         - scope to request (optional)
const Pulsar = require('pulsar-client')

const issuer_url = process.env.ISSUER_URL
const private_key = process.env.PRIVATE_KEY
const audience = process.env.AUDIENCE
const scope = process.env.SCOPE
const service_url = process.env.SERVICE_URL
const client_id = process.env.CLIENT_ID
const client_secret = process.env.CLIENT_SECRET

;(async () => {
  // Unset environment variables are undefined, so test for truthiness rather
  // than reading `.length`.
  if (!issuer_url || !service_url) {
    throw new Error('Set the ISSUER_URL and SERVICE_URL environment variables before running this example')
  }

  const params = {
    issuer_url,
  }
  if (private_key) {
    params.private_key = private_key
  } else {
    if (!client_id || !client_secret) {
      throw new Error('Set PRIVATE_KEY, or both CLIENT_ID and CLIENT_SECRET, before running this example')
    }
    params.client_id = client_id
    params.client_secret = client_secret
  }
  if (audience) {
    params.audience = audience
  }
  if (scope) {
    params.scope = scope
  }

  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
  })

  try {
    const consumer = await client.subscribe({
      topic: 'your-topic',
      subscription: 'your-sub-name',
      subscriptionType: 'Shared',
      ackTimeoutMs: 10000,
    })

    try {
      // Receive messages
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive()
        console.log(msg.getData().toString())
        await consumer.acknowledge(msg)
      }
    } finally {
      await consumer.close()
    }
  } finally {
    // Close the client even when subscribing or receiving fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and signal it through the exit code.
  console.error(err)
  process.exitCode = 1
})
Create a Node.js producer and use the Node.js producer to produce messages.
// Produces ten messages to a topic using OAuth2 authentication.
//
// Settings are read from the environment:
//   ISSUER_URL    - URL of the OAuth2 identity provider (required)
//   SERVICE_URL   - service URL of the Pulsar cluster (required)
//   PRIVATE_KEY   - path to the OAuth2 credential file,
//                   e.g. file:///path/to/private-key-file.json
//   CLIENT_ID     - client ID, used together with CLIENT_SECRET when
//   CLIENT_SECRET   PRIVATE_KEY is not set
//   AUDIENCE      - audience of the Pulsar cluster (optional)
//   SCOPE         - scope to request (optional)
const Pulsar = require('pulsar-client')

const issuer_url = process.env.ISSUER_URL
const private_key = process.env.PRIVATE_KEY
const audience = process.env.AUDIENCE
const scope = process.env.SCOPE
const service_url = process.env.SERVICE_URL
const client_id = process.env.CLIENT_ID
const client_secret = process.env.CLIENT_SECRET

;(async () => {
  // Unset environment variables are undefined, so test for truthiness rather
  // than reading `.length`.
  if (!issuer_url || !service_url) {
    throw new Error('Set the ISSUER_URL and SERVICE_URL environment variables before running this example')
  }

  const params = {
    issuer_url,
  }
  if (private_key) {
    params.private_key = private_key
  } else {
    if (!client_id || !client_secret) {
      throw new Error('Set PRIVATE_KEY, or both CLIENT_ID and CLIENT_SECRET, before running this example')
    }
    params.client_id = client_id
    params.client_secret = client_secret
  }
  if (audience) {
    params.audience = audience
  }
  if (scope) {
    params.scope = scope
  }

  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
  })

  try {
    const producer = await client.createProducer({
      topic: 'your-topic',
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    })

    try {
      // Send messages. The sends are not awaited one by one so that they can
      // be batched; their promises are collected and awaited after the loop.
      const pending = []
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`
        pending.push(
          producer.send({
            data: Buffer.from(msg),
          })
        )
        console.log(`Sent message: ${msg}`)
      }

      // Flush the queued messages and surface any send failure here instead
      // of leaving the send promises unobserved.
      await Promise.all([producer.flush(), ...pending])
    } finally {
      await producer.close()
    }
  } finally {
    // Close the client even when creating the producer or sending fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and signal it through the exit code.
  console.error(err)
  process.exitCode = 1
})