This document provides examples of how to use the Pulsar Node.js client to connect to a Pulsar cluster with either a token or an OAuth2 credential file.
This document assumes that you have created a service account and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
Connect to a Pulsar cluster using a token
This section describes how to connect to your Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
-
Connect to the Pulsar cluster.
// Example: connect to a Pulsar cluster with token authentication, then
// disconnect.
const Pulsar = require('pulsar-client')

// Connection settings come from the environment so that no secret is
// hard-coded in the script.
const auth_params = process.env.AUTH_PARAMS // token of the service account
const service_url = process.env.SERVICE_URL // URL of the Pulsar cluster

;(async () => {
  // Fail early with a readable message instead of a connection error later.
  if (!auth_params || !service_url) {
    throw new Error('Set the AUTH_PARAMS and SERVICE_URL environment variables')
  }

  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  await client.close()
})().catch((err) => {
  // Report the failure and exit non-zero rather than leaving the promise
  // rejection unhandled.
  console.error(err)
  process.exitCode = 1
})
Set the `serviceUrl` and `token` parameters based on the descriptions in the prepare to connect to a Pulsar cluster user guide.
-
Create a Node.js consumer and use the Node.js consumer to consume messages.
// Example: consume ten messages from a topic using token authentication.
const Pulsar = require('pulsar-client')

// Connection settings come from the environment so that no secret is
// hard-coded in the script.
const auth_params = process.env.AUTH_PARAMS // token of the service account
const service_url = process.env.SERVICE_URL // URL of the Pulsar cluster

;(async () => {
  // Fail early with a readable message instead of a connection error later.
  if (!auth_params || !service_url) {
    throw new Error('Set the AUTH_PARAMS and SERVICE_URL environment variables')
  }

  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client. The consumer below is created from it, so it has to
  // exist before subscribe() is called.
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    // Create a consumer
    const consumer = await client.subscribe({
      topic: 'your-topic',
      subscription: 'your-sub-name',
      subscriptionType: 'Shared',
      ackTimeoutMs: 10000,
    })

    // Receive messages
    for (let i = 0; i < 10; i += 1) {
      const msg = await consumer.receive()
      console.log(msg.getData().toString())
      await consumer.acknowledge(msg)
    }

    await consumer.close()
  } finally {
    // Always release the connection, even when subscribing or receiving fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and exit non-zero rather than leaving the promise
  // rejection unhandled.
  console.error(err)
  process.exitCode = 1
})
-
Create a Node.js producer and use the Node.js producer to produce messages.
// Example: publish ten messages to a topic using token authentication.
const Pulsar = require('pulsar-client')

// Connection settings come from the environment so that no secret is
// hard-coded in the script.
const auth_params = process.env.AUTH_PARAMS // token of the service account
const service_url = process.env.SERVICE_URL // URL of the Pulsar cluster

;(async () => {
  // Fail early with a readable message instead of a connection error later.
  if (!auth_params || !service_url) {
    throw new Error('Set the AUTH_PARAMS and SERVICE_URL environment variables')
  }

  const auth = new Pulsar.AuthenticationToken({
    token: auth_params,
  })

  // Create a client. The producer below is created from it, so it has to
  // exist before createProducer() is called.
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    authentication: auth,
    operationTimeoutSeconds: 30,
  })

  try {
    const producer = await client.createProducer({
      topic: 'your-topic',
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    })

    // Send messages. The sends are not awaited one by one, so they can be
    // queued back to back; their promises are kept so that a failed send is
    // reported instead of being silently dropped.
    const pending = []
    for (let i = 0; i < 10; i += 1) {
      const msg = `my-message-${i}`
      pending.push(
        producer.send({
          data: Buffer.from(msg),
        })
      )
      console.log(`Sent message: ${msg}`)
    }

    // flush() is issued right after the last send, and everything is awaited
    // together so every promise has a handler attached immediately.
    await Promise.all([...pending, producer.flush()])
    await producer.close()
  } finally {
    // Always release the connection, even when producing fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and exit non-zero rather than leaving the promise
  // rejection unhandled.
  console.error(err)
  process.exitCode = 1
})
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
-
Generate the App credentials by following similar instructions in configure OAuth2 authentication.
-
Save the App credentials into an OAuth2 credential file.
-
Connect to a Pulsar cluster through the OAuth2 credential file.
// Example: connect to a Pulsar cluster with OAuth2 authentication, then
// disconnect.
const Pulsar = require('pulsar-client')

// OAuth2 settings come from the environment. Unset variables default to ''
// so that the length checks below never run against `undefined`.
const {
  ISSUER_URL: issuer_url = '',
  PRIVATE_KEY: private_key = '', // e.g. file:///path/to/private-key-file.json
  AUDIENCE: audience = '',
  SCOPE: scope = '',
  SERVICE_URL: service_url = '',
  CLIENT_ID: client_id = '',
  CLIENT_SECRET: client_secret = '',
} = process.env

;(async () => {
  // Fail early with a readable message instead of a connection error later.
  if (!issuer_url || !service_url) {
    throw new Error('Set the ISSUER_URL and SERVICE_URL environment variables')
  }
  if (!private_key && !(client_id && client_secret)) {
    throw new Error('Set PRIVATE_KEY, or both CLIENT_ID and CLIENT_SECRET')
  }

  const params = { issuer_url }
  // Prefer the credential file; fall back to client ID and secret.
  if (private_key.length > 0) {
    params.private_key = private_key
  } else {
    params.client_id = client_id
    params.client_secret = client_secret
  }
  // audience and scope are only sent when they are configured.
  if (audience.length > 0) {
    params.audience = audience
  }
  if (scope.length > 0) {
    params.scope = scope
  }
  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    // NOTE(review): this accepts untrusted TLS certificates. Kept from the
    // original example; remove it when the cluster presents a trusted
    // certificate.
    tlsAllowInsecureConnection: true,
    authentication: auth,
  })

  await client.close()
})().catch((err) => {
  // Report the failure and exit non-zero rather than leaving the promise
  // rejection unhandled.
  console.error(err)
  process.exitCode = 1
})
- `issuer_url`: the URL of your OAuth2 identity provider.
- `private_key`: the path to your OAuth2 credential file.
- `client_id`: the Pulsar application client ID (used when `private_key` is not set).
- `client_secret`: the Pulsar application client secret (used when `private_key` is not set).
- `audience`: the `audience` of your Pulsar cluster.
- `scope`: (optional) the OAuth2 scope to request.
- `serviceUrl`: the URL of your Pulsar cluster.
-
Create a Node.js consumer and use the Node.js consumer to consume messages.
// Example: consume ten messages from a topic using OAuth2 authentication.
const Pulsar = require('pulsar-client')

// OAuth2 settings come from the environment. Unset variables default to ''
// so that the length checks below never run against `undefined`.
const {
  ISSUER_URL: issuer_url = '',
  PRIVATE_KEY: private_key = '', // e.g. file:///path/to/private-key-file.json
  AUDIENCE: audience = '',
  SCOPE: scope = '',
  SERVICE_URL: service_url = '',
  CLIENT_ID: client_id = '',
  CLIENT_SECRET: client_secret = '',
} = process.env

;(async () => {
  // Fail early with a readable message instead of a connection error later.
  if (!issuer_url || !service_url) {
    throw new Error('Set the ISSUER_URL and SERVICE_URL environment variables')
  }
  if (!private_key && !(client_id && client_secret)) {
    throw new Error('Set PRIVATE_KEY, or both CLIENT_ID and CLIENT_SECRET')
  }

  const params = { issuer_url }
  // Prefer the credential file; fall back to client ID and secret.
  if (private_key.length > 0) {
    params.private_key = private_key
  } else {
    params.client_id = client_id
    params.client_secret = client_secret
  }
  // audience and scope are only sent when they are configured.
  if (audience.length > 0) {
    params.audience = audience
  }
  if (scope.length > 0) {
    params.scope = scope
  }
  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client. The consumer below is created from it, so it has to
  // exist before subscribe() is called.
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    // NOTE(review): this accepts untrusted TLS certificates; remove it when
    // the cluster presents a trusted certificate.
    tlsAllowInsecureConnection: true,
    authentication: auth,
  })

  try {
    const consumer = await client.subscribe({
      topic: 'your-topic',
      subscription: 'your-sub-name',
      subscriptionType: 'Shared',
      ackTimeoutMs: 10000,
    })

    // Receive messages
    for (let i = 0; i < 10; i += 1) {
      const msg = await consumer.receive()
      console.log(msg.getData().toString())
      await consumer.acknowledge(msg)
    }

    await consumer.close()
  } finally {
    // Always release the connection, even when subscribing or receiving fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and exit non-zero rather than leaving the promise
  // rejection unhandled.
  console.error(err)
  process.exitCode = 1
})
-
Create a Node.js producer and use the Node.js producer to produce messages.
// Example: publish ten messages to a topic using OAuth2 authentication.
const Pulsar = require('pulsar-client')

// OAuth2 settings come from the environment. Unset variables default to ''
// so that the length checks below never run against `undefined`.
const {
  ISSUER_URL: issuer_url = '',
  PRIVATE_KEY: private_key = '', // e.g. file:///path/to/private-key-file.json
  AUDIENCE: audience = '',
  SCOPE: scope = '',
  SERVICE_URL: service_url = '',
  CLIENT_ID: client_id = '',
  CLIENT_SECRET: client_secret = '',
} = process.env

;(async () => {
  // Fail early with a readable message instead of a connection error later.
  if (!issuer_url || !service_url) {
    throw new Error('Set the ISSUER_URL and SERVICE_URL environment variables')
  }
  if (!private_key && !(client_id && client_secret)) {
    throw new Error('Set PRIVATE_KEY, or both CLIENT_ID and CLIENT_SECRET')
  }

  const params = { issuer_url }
  // Prefer the credential file; fall back to client ID and secret.
  if (private_key.length > 0) {
    params.private_key = private_key
  } else {
    params.client_id = client_id
    params.client_secret = client_secret
  }
  // audience and scope are only sent when they are configured.
  if (audience.length > 0) {
    params.audience = audience
  }
  if (scope.length > 0) {
    params.scope = scope
  }
  const auth = new Pulsar.AuthenticationOauth2(params)

  // Create a client. The producer below is created from it, so it has to
  // exist before createProducer() is called.
  const client = new Pulsar.Client({
    serviceUrl: service_url,
    // NOTE(review): this accepts untrusted TLS certificates; remove it when
    // the cluster presents a trusted certificate.
    tlsAllowInsecureConnection: true,
    authentication: auth,
  })

  try {
    const producer = await client.createProducer({
      topic: 'your-topic',
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    })

    // Send messages. The sends are not awaited one by one, so they can be
    // queued back to back; their promises are kept so that a failed send is
    // reported instead of being silently dropped.
    const pending = []
    for (let i = 0; i < 10; i += 1) {
      const msg = `my-message-${i}`
      pending.push(
        producer.send({
          data: Buffer.from(msg),
        })
      )
      console.log(`Sent message: ${msg}`)
    }

    // flush() is issued right after the last send, and everything is awaited
    // together so every promise has a handler attached immediately.
    await Promise.all([...pending, producer.flush()])
    await producer.close()
  } finally {
    // Always release the connection, even when producing fails.
    await client.close()
  }
})().catch((err) => {
  // Report the failure and exit non-zero rather than leaving the promise
  // rejection unhandled.
  console.error(err)
  process.exitCode = 1
})