- StreamNative Cloud
- Connect
Connect to cluster through Node.js client
This document describes how to connect to a cluster through a Node.js client, and how to use the Node.js producer and consumer to produce messages to and consume messages from a topic. The Node.js client can connect to a Pulsar cluster through either the OAuth2 authentication plugin or the Token authentication plugin.
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
Connect to cluster through OAuth2 authentication plugin
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the OAuth2 credential file of your service account. For details, see get an OAuth2 credential file.
Connect to a Pulsar cluster through the OAuth2 authentication plugin.
const Pulsar = require('pulsar-client')

// All connection settings are read from environment variables. The optional
// ones default to '' so the `.length` checks below cannot throw when a
// variable is not set.
const issuer_url = process.env.ISSUER_URL
const private_key = process.env.PRIVATE_KEY || ''
const audience = process.env.AUDIENCE || ''
const scope = process.env.SCOPE || ''
const service_url = process.env.SERVICE_URL
const client_id = process.env.CLIENT_ID
const client_secret = process.env.CLIENT_SECRET

;(async () => {
  // Use the credential file when PRIVATE_KEY is set; otherwise fall back to
  // the client ID / client secret pair.
  const params = { issuer_url }
  if (private_key.length > 0) {
    params.private_key = private_key
  } else {
    params.client_id = client_id
    params.client_secret = client_secret
  }
  if (audience.length > 0) {
    params.audience = audience
  }
  if (scope.length > 0) {
    params.scope = scope
  }
  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client. `tlsAllowInsecureConnection` is intentionally not set:
  // enabling it makes the client accept untrusted TLS certificates.
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
  })

  await client.close()
})().catch((err) => {
  // Report the failure instead of leaving an unhandled promise rejection.
  console.error(err)
  process.exitCode = 1
})
- `serviceUrl`: the broker service URL of your Pulsar cluster.
- `issuer_url`: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.
- `private_key`: the path to your downloaded OAuth2 credential file. The `private_key` parameter supports the following three pattern formats:
  - `file:///path/to/file`
  - `file:/path/to/file`
  - `data:application/json;base64,<base64-encoded value>`
- `client_id`: the client ID of your application. You can get the value from your downloaded OAuth2 credential file.
- `client_secret`: the client secret of your application. You can get the value from your downloaded OAuth2 credential file.
- `audience`: the Uniform Resource Name (URN) of the Pulsar instance. It is a combination of `urn:sn:pulsar`, the organization name, and the Pulsar instance name, in this format: `urn:sn:pulsar:<org_name>:<instance_name>`.
Create a Node.js consumer and use the Node.js consumer to consume messages.
const Pulsar = require('pulsar-client')

// All connection settings are read from environment variables. The optional
// ones default to '' so the `.length` checks below cannot throw when a
// variable is not set.
const issuer_url = process.env.ISSUER_URL
const private_key = process.env.PRIVATE_KEY || ''
const audience = process.env.AUDIENCE || ''
const scope = process.env.SCOPE || ''
const service_url = process.env.SERVICE_URL
const client_id = process.env.CLIENT_ID
const client_secret = process.env.CLIENT_SECRET

;(async () => {
  // Use the credential file when PRIVATE_KEY is set; otherwise fall back to
  // the client ID / client secret pair.
  const params = { issuer_url }
  if (private_key.length > 0) {
    params.private_key = private_key
  } else {
    params.client_id = client_id
    params.client_secret = client_secret
  }
  if (audience.length > 0) {
    params.audience = audience
  }
  if (scope.length > 0) {
    params.scope = scope
  }
  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    // Create a consumer
    const consumer = await client.subscribe({
      topic: 'persistent://public/default/my-topic',
      subscription: 'sub1',
      subscriptionType: 'Shared',
      ackTimeoutMs: 10000,
    })

    try {
      // Receive and acknowledge ten messages
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive()
        console.log(msg.getData().toString())
        // Awaited so that a failed acknowledgement surfaces here.
        await consumer.acknowledge(msg)
      }
    } finally {
      // Close the consumer even if receiving or acknowledging failed.
      await consumer.close()
    }
  } finally {
    // Close the client even if subscribing or consuming failed.
    await client.close()
  }
})().catch((err) => {
  // Report the failure instead of leaving an unhandled promise rejection.
  console.error(err)
  process.exitCode = 1
})
Create a Node.js producer and use the Node.js producer to produce messages.
const Pulsar = require('pulsar-client')

// All connection settings are read from environment variables. The optional
// ones default to '' so the `.length` checks below cannot throw when a
// variable is not set.
const issuer_url = process.env.ISSUER_URL
const private_key = process.env.PRIVATE_KEY || ''
const audience = process.env.AUDIENCE || ''
const scope = process.env.SCOPE || ''
const service_url = process.env.SERVICE_URL
const client_id = process.env.CLIENT_ID
const client_secret = process.env.CLIENT_SECRET

;(async () => {
  // Use the credential file when PRIVATE_KEY is set; otherwise fall back to
  // the client ID / client secret pair.
  const params = { issuer_url }
  if (private_key.length > 0) {
    params.private_key = private_key
  } else {
    params.client_id = client_id
    params.client_secret = client_secret
  }
  if (audience.length > 0) {
    params.audience = audience
  }
  if (scope.length > 0) {
    params.scope = scope
  }
  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    // Create a producer
    const producer = await client.createProducer({
      topic: 'persistent://public/default/my-topic',
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    })

    try {
      // Send messages. The sends are queued without awaiting each one so they
      // can be batched; every promise is kept so that a failed send is not
      // lost, and the log line is printed only once the send has completed.
      const pending = []
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`
        pending.push(
          producer.send({ data: Buffer.from(msg) }).then(() => {
            console.log(`Sent message: ${msg}`)
          })
        )
      }
      await producer.flush()
      await Promise.all(pending)
    } finally {
      // Close the producer even if sending failed.
      await producer.close()
    }
  } finally {
    // Close the client even if creating the producer or sending failed.
    await client.close()
  }
})().catch((err) => {
  // Report the failure instead of leaving an unhandled promise rejection.
  console.error(err)
  process.exitCode = 1
})
Connect to cluster through Token authentication plugin
To connect to a cluster through the Token authentication plugin, follow these steps.
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the token of your service account. For details, see get a token.
Connect to a Pulsar cluster through the Token authentication plugin.
const Pulsar = require('pulsar-client')

// Token and broker URL are supplied through the environment.
const { AUTH_PARAMS: auth_params, SERVICE_URL: service_url } = process.env

// Opens a token-authenticated connection to the cluster and closes it again.
async function main() {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: new Pulsar.AuthenticationToken({ token: auth_params }),
    operationTimeoutSeconds: 30,
  })

  await client.close()
}

main()
- `service_url`: the broker service URL of your Pulsar cluster.
- `AUTH_PARAMS`: the token of your service account.
Create a Node.js consumer and use the Node.js consumer to consume messages.
const Pulsar = require('pulsar-client')

// Token and broker URL are read from environment variables.
const auth_params = process.env.AUTH_PARAMS
const service_url = process.env.SERVICE_URL

;(async () => {
  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    // Create a consumer
    const consumer = await client.subscribe({
      topic: 'persistent://public/default/my-topic',
      subscription: 'sub1',
      subscriptionType: 'Shared',
      ackTimeoutMs: 10000,
    })

    try {
      // Receive and acknowledge ten messages
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive()
        console.log(msg.getData().toString())
        // Awaited so that a failed acknowledgement surfaces here.
        await consumer.acknowledge(msg)
      }
    } finally {
      // Close the consumer even if receiving or acknowledging failed.
      await consumer.close()
    }
  } finally {
    // Close the client even if subscribing or consuming failed.
    await client.close()
  }
})().catch((err) => {
  // Report the failure instead of leaving an unhandled promise rejection.
  console.error(err)
  process.exitCode = 1
})
Create a Node.js producer and use the Node.js producer to produce messages.
const Pulsar = require('pulsar-client')

// Token and broker URL are read from environment variables.
const auth_params = process.env.AUTH_PARAMS
const service_url = process.env.SERVICE_URL

;(async () => {
  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    // Create a producer
    const producer = await client.createProducer({
      topic: 'persistent://public/default/my-topic',
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    })

    try {
      // Send messages. The sends are queued without awaiting each one so they
      // can be batched; every promise is kept so that a failed send is not
      // lost, and the log line is printed only once the send has completed.
      const pending = []
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`
        pending.push(
          producer.send({ data: Buffer.from(msg) }).then(() => {
            console.log(`Sent message: ${msg}`)
          })
        )
      }
      await producer.flush()
      await Promise.all(pending)
    } finally {
      // Close the producer even if sending failed.
      await producer.close()
    }
  } finally {
    // Close the client even if creating the producer or sending failed.
    await client.close()
  }
})().catch((err) => {
  // Report the failure instead of leaving an unhandled promise rejection.
  console.error(err)
  process.exitCode = 1
})
For more information about the Pulsar Node.js client, see Node.js client examples.