Connect to your cluster using the Node.js client
This document describes how to connect to a cluster through a Node.js client, and use the Node.js producer and consumer to produce and consume messages to and from a topic. The Node.js client supports connecting to a Pulsar cluster through either OAuth2 or Token authentication.
Note
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.
Connect to your cluster through OAuth2 authentication
To connect to a Pulsar cluster through OAuth2 authentication, follow these steps.
Step 1: Get the broker service URL of your cluster
To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.
On the left navigation pane, in the Admin area, click Pulsar Clusters.
Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.
Step 2: Get the OAuth2 credential file of your service account
To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.
On the left navigation pane, click Service Accounts.
In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
The OAuth2 credential file looks similar to the following:
{ "type": "SN_SERVICE_ACCOUNT", "client_id": "CLIENT_ID", "client_secret": "CLIENT_SECRET", "client_email": "[email protected]", "issuer_url": "https://auth.streamnative.cloud" }
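Before passing the key file to the client, it can help to confirm that the download is complete. The following is a minimal sketch, assuming the file contains exactly the fields shown in the sample above. The helper names `missingCredentialFields` and `loadCredential` are hypothetical and are not part of the Pulsar client.

```javascript
const fs = require('fs')

// Field names taken from the sample credential file shown above.
const REQUIRED_FIELDS = ['type', 'client_id', 'client_secret', 'client_email', 'issuer_url']

// Hypothetical helper: returns the required fields that are missing or empty.
function missingCredentialFields(credential) {
  return REQUIRED_FIELDS.filter(
    (field) => typeof credential[field] !== 'string' || credential[field] === ''
  )
}

// Hypothetical helper: reads and parses the key file, then checks its fields.
function loadCredential(keyFilePath) {
  const credential = JSON.parse(fs.readFileSync(keyFilePath, 'utf8'))
  const missing = missingCredentialFields(credential)
  if (missing.length > 0) {
    throw new Error(`Key file is missing fields: ${missing.join(', ')}`)
  }
  return credential
}
```

Calling `loadCredential('/YOUR-KEY-FILE-PATH')` once at startup surfaces a truncated or wrong file before any connection attempt is made.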
Step 3: Connect to your cluster
For a complete example of how to connect to a cluster through the Pulsar Node.js client, see Node.js client examples.
Create a Node.js consumer to consume messages
You can create and configure a Node.js consumer to consume messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see parameters for OAuth2 authentication.
const Pulsar = require('pulsar-client')
const issuer_url = 'https://auth.streamnative.cloud/'
const private_key = '/YOUR-KEY-FILE-PATH' // Absolute file path of your downloaded key file without file:// prefix
const audience = 'urn:sn:pulsar:${orgName}:${instanceName}'
const service_url = '${brokerServiceURL}'
;(async () => {
  const params = {
    issuer_url: issuer_url,
    private_key: private_key,
    audience: audience,
  }
  const auth = new Pulsar.AuthenticationOauth2(params)
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })
  const consumer = await client.subscribe({
    topic: 'persistent://${tenant}/${namespace}/${topic}',
    subscription: '${subscription}',
    // The Node.js client expects the string 'Earliest' or 'Latest' here.
    subscriptionInitialPosition: 'Earliest',
  })
  // Receive and acknowledge 10 messages, then shut down.
  for (let i = 0; i < 10; i += 1) {
    const msg = await consumer.receive()
    console.log(msg.getData().toString())
    await consumer.acknowledge(msg)
  }
  await consumer.close()
  await client.close()
})()
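The receive loop above acknowledges every message unconditionally. If your processing step can fail, one option is to negatively acknowledge the message so that the broker redelivers it. The following is a minimal sketch of that pattern, not the official example. `consumeMessages` and `handler` are hypothetical names, and `consumer` is assumed to be the object returned by `client.subscribe()`.

```javascript
// Hypothetical helper: receives `count` messages and passes each payload to
// `handler`. A message is acknowledged when processing succeeds and negatively
// acknowledged when the handler or the acknowledgement throws.
async function consumeMessages(consumer, count, handler) {
  let processed = 0
  for (let i = 0; i < count; i += 1) {
    const msg = await consumer.receive()
    try {
      await handler(msg.getData().toString())
      await consumer.acknowledge(msg)
      processed += 1
    } catch (err) {
      // Ask the broker to redeliver this message later.
      consumer.negativeAcknowledge(msg)
    }
  }
  // Number of messages that were processed and acknowledged.
  return processed
}
```

For example, `await consumeMessages(consumer, 10, async (text) => console.log(text))` behaves like the loop above when the handler never throws.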
Create a Node.js producer to produce messages
You can create and configure a Node.js producer to produce messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see parameters for OAuth2 authentication.
const Pulsar = require('pulsar-client')
const issuer_url = 'https://auth.streamnative.cloud/'
const private_key = '/YOUR-KEY-FILE-PATH' // Absolute file path of your downloaded key file without file:// prefix
const audience = 'urn:sn:pulsar:${orgName}:${instanceName}'
const service_url = '${brokerServiceURL}'
;(async () => {
  const params = {
    issuer_url: issuer_url,
    private_key: private_key,
    audience: audience,
  }
  const auth = new Pulsar.AuthenticationOauth2(params)
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })
  const producer = await client.createProducer({
    topic: 'persistent://${tenant}/${namespace}/${topic}',
    sendTimeoutMs: 30000,
    batchingEnabled: true,
  })
  // Send 10 messages, then flush any pending batch before closing.
  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`
    await producer.send({
      data: Buffer.from(msg),
    })
    console.log(`Sent message: ${msg}`)
  }
  await producer.flush()
  await producer.close()
  await client.close()
})()
Parameters for OAuth2 authentication
- private_key: your downloaded OAuth2 credential. This parameter supports the following two pattern formats:
  - file:///path/to/file: the path to your downloaded OAuth2 credential file.
  - data:application/json;base64,<base64-encoded value>: the credential file content encoded into Base64 format.
- audience: the Uniform Resource Name (URN), which is a combination of urn:sn:pulsar, your organization name, and your Pulsar instance name.
  - ${orgName}: the name of your organization.
  - ${instanceName}: the name of your instance.
- ${brokerServiceURL}: the broker service URL of your Pulsar cluster.
- ${tenant}/${namespace}/${topic}: the full name of the topic for message production & consumption. It is a combination of the tenant name, the namespace name and the topic name.
- ${subscription}: the name of the subscription that will determine how messages are delivered.
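To illustrate how the placeholder values fit together, here is a small sketch that assembles the audience URN, the full topic name, and the Base64 `data:` form of the credential. The helper names are hypothetical. Whether your client version accepts the `data:` form for `private_key` is an assumption worth verifying, because the code samples above pass a plain file path.

```javascript
// Hypothetical helper: builds the audience URN from the organization and instance names.
function buildAudience(orgName, instanceName) {
  return `urn:sn:pulsar:${orgName}:${instanceName}`
}

// Hypothetical helper: builds the full persistent topic name.
function buildTopic(tenant, namespace, topic) {
  return `persistent://${tenant}/${namespace}/${topic}`
}

// Hypothetical helper: encodes the credential file content (a JSON string)
// in the data:application/json;base64,<value> format.
function toDataUrl(credentialJson) {
  const encoded = Buffer.from(credentialJson, 'utf8').toString('base64')
  return `data:application/json;base64,${encoded}`
}
```

For example, `buildAudience('my-org', 'my-instance')` yields `urn:sn:pulsar:my-org:my-instance`, where both names are made-up sample values.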
Connect to your cluster through Token authentication
To connect to a Pulsar cluster through Token authentication, follow these steps.
Step 1: Get the broker service URL of your cluster
To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.
On the left navigation pane, in the Admin area, click Pulsar Clusters.
Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.
Step 2: Get the Token of your service account
Note
- Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
- A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
To get a token using the StreamNative Console, follow these steps.
On the left navigation pane, click Service Accounts.
In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.
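Because a token expires after its TTL, an application may want to check how long the copied token remains valid. The sketch below assumes that the token is a JSON Web Token (JWT) carrying a standard `exp` claim. This document does not state the token format, so treat that as an assumption. The sketch only decodes the payload and does not verify the signature. `tokenExpiry` and `expiresWithin` are hypothetical helper names.

```javascript
// Hypothetical helper: returns the expiry time of a JWT as a Date, or null if
// the token is not a three-part JWT or has no numeric `exp` claim.
// This decodes the payload only; it does not verify the signature.
function tokenExpiry(token) {
  const parts = token.split('.')
  if (parts.length !== 3) return null
  try {
    // Node's 'base64' decoder also accepts the URL-safe alphabet used by JWTs.
    const payload = JSON.parse(Buffer.from(parts[1], 'base64').toString('utf8'))
    return typeof payload.exp === 'number' ? new Date(payload.exp * 1000) : null
  } catch (err) {
    return null
  }
}

// Hypothetical helper: true if the token expires within `seconds` of `now`.
function expiresWithin(token, seconds, now = new Date()) {
  const expiry = tokenExpiry(token)
  return expiry !== null && expiry.getTime() - now.getTime() <= seconds * 1000
}
```

A scheduled job could call `expiresWithin(token, 24 * 60 * 60)` and alert you to generate a new token a day before the current one lapses.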
Step 3: Connect to your cluster
For a complete example of how to connect to a cluster through the Pulsar Node.js client, see Node.js client examples.
Create a Node.js consumer to consume messages
You can create and configure a Node.js consumer to consume messages using Token authentication as follows. For more information about the placeholders in the code sample, see parameters for Token authentication.
const Pulsar = require('pulsar-client')
const service_url = '${brokerServiceURL}'
const auth_params = '${token}'
;(async () => {
  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })
  const consumer = await client.subscribe({
    topic: 'persistent://${tenant}/${namespace}/${topic}',
    subscription: '${subscription}',
    // The Node.js client expects the string 'Earliest' or 'Latest' here.
    subscriptionInitialPosition: 'Earliest',
  })
  // Receive and acknowledge 10 messages, then shut down.
  for (let i = 0; i < 10; i += 1) {
    const msg = await consumer.receive()
    console.log(msg.getData().toString())
    await consumer.acknowledge(msg)
  }
  await consumer.close()
  await client.close()
})()
Create a Node.js producer to produce messages
You can create and configure a Node.js producer to produce messages using Token authentication as follows. For more information about the placeholders in the code sample, see parameters for Token authentication.
const Pulsar = require('pulsar-client')
const service_url = '${brokerServiceURL}'
const auth_params = '${token}'
;(async () => {
  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })
  const producer = await client.createProducer({
    topic: 'persistent://${tenant}/${namespace}/${topic}',
    sendTimeoutMs: 30000,
    batchingEnabled: true,
  })
  // Send 10 messages, then flush any pending batch before closing.
  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`
    await producer.send({
      data: Buffer.from(msg),
    })
    console.log(`Sent message: ${msg}`)
  }
  await producer.flush()
  await producer.close()
  await client.close()
})()
Parameters for Token authentication
- ${brokerServiceURL}: the broker service URL of your Pulsar cluster.
- ${token}: the token of your service account.
- ${tenant}/${namespace}/${topic}: the full name of the topic for message production & consumption. It is a combination of the tenant name, the namespace name and the topic name.
- ${subscription}: the name of the subscription that will determine how messages are delivered.
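Since the token is a secret and must be replaced before it expires, you may prefer to read these values from the environment instead of hard-coding them. The following is a minimal sketch. The environment variable names and the `loadPulsarConfig` helper are hypothetical and are not defined by the Pulsar client or the StreamNative Console.

```javascript
// Hypothetical helper: collects connection settings from environment variables
// and fails fast when a required one is missing. The variable names are
// illustrative only.
function loadPulsarConfig(env = process.env) {
  const required = ['PULSAR_SERVICE_URL', 'PULSAR_TOKEN', 'PULSAR_TOPIC']
  const missing = required.filter((name) => !env[name])
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`)
  }
  return {
    serviceUrl: env.PULSAR_SERVICE_URL,
    token: env.PULSAR_TOKEN,
    topic: env.PULSAR_TOPIC,
    // Optional: only consumers need a subscription name.
    subscription: env.PULSAR_SUBSCRIPTION,
  }
}
```

The returned `serviceUrl` and `token` can then take the place of the `service_url` and `auth_params` constants in the samples above.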