Connect to Pulsar cluster using Node.js client

This document provides examples about how to use the Pulsar Node.js client to connect to a Pulsar cluster through a token or an OAuth2 credential file.

Note

This document assumes that you have created a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Connect to a Pulsar cluster using a token

This section describes how to connect to you Pulsar cluster using a token.

Prerequisites

Steps

To connect to your Pulsar cluster using a token, follow these steps.

  1. Connect to the Pulsar cluster.

    const Pulsar = require('pulsar-client')
    const auth_params = process.env.AUTH_PARAMS
    const service_url = process.env.SERVICE_URL
    
    ;(async () => {
      const auth = new Pulsar.AuthenticationToken({
        token: 'AUTH_PARAMS',
      })
    
      // Create a client
      const client = new Pulsar.Client({
        serviceUrl: 'your-service-url',
        authentication: auth,
        operationTimeoutSeconds: 30,
      })
    
      await client.close()
    })()
    

    Set the serviceUrl and token parameters based on the descriptions in the prepare to connect to a Pulsar cluster user guide.

  2. Create a Node.js consumer and use the Node.js consumer to consume messages.

    const Pulsar = require('pulsar-client')
    const auth_params = process.env.AUTH_PARAMS
    const service_url = process.env.SERVICE_URL
    
    ;(async () => {
      const auth = new Pulsar.AuthenticationToken({
        token: 'AUTH_PARAMS',
      })
    
      // Create a consumer
      const consumer = await client.subscribe({
        topic: 'your-topic',
        subscription: 'your-sub-name',
        subscriptionType: 'Shared',
        ackTimeoutMs: 10000,
      })
    
      // Receive messages
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive()
        console.log(msg.getData().toString())
        await consumer.acknowledge(msg)
      }
    
      await consumer.close()
      await client.close()
    })()
    
  3. Create a Node.js producer and use the Node.js producer to produce messages.

    const Pulsar = require('pulsar-client')
    const auth_params = process.env.AUTH_PARAMS
    const service_url = process.env.SERVICE_URL
    
    ;(async () => {
      const auth = new Pulsar.AuthenticationToken({
        token: 'AUTH_PARAMS',
      })
    
      const producer = await client.createProducer({
        topic: 'your-topic',
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      })
    
      // Send messages
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`
        producer.send({
          data: Buffer.from(msg),
        })
        console.log(`Sent message: ${msg}`)
      }
      await producer.flush()
    
      await producer.close()
      await client.close()
    })()
    

Connect to a Pulsar cluster using an OAuth2 credential file

To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.

  1. Generate the App credentials by following similar instructions in configure OAuth2 authentication.

  2. Save the App credentials into an OAuth2 credential file.

  3. Connect to a Pulsar cluster through the OAuth2 credential file.

    const Pulsar = require('pulsar-client')
    const issuer_url = process.env.ISSUER_URL
    const private_key = process.env.PRIVATE_KEY
    const audience = process.env.AUDIENCE
    const scope = process.env.SCOPE
    const service_url = process.env.SERVICE_URL
    const client_id = process.env.CLIENT_ID
    const client_secret = process.env.CLIENT_SECRET
    
    ;(async () => {
      const params = {
        issuer_url: 'your-issuer-url',
      }
      if (private_key.length > 0) {
        params['private_key'] = 'file:///path/to/private-key-file.json'
      } else {
        params['client_id'] = 'your-client-id'
        params['client_secret'] = 'your-client-secret'
      }
      if (audience.length > 0) {
        params['audience'] = 'your-audience'
      }
      if (scope.length > 0) {
        params['scope'] = 'your-scope'
      }
      const auth = new Pulsar.AuthenticationOauth2(params)
      // Create a client
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar-service-url',
        tlsAllowInsecureConnection: true,
        authentication: auth,
      })
      await client.close()
    })()
    
    • issuer_url: the URL of your OAuth2 identity provider.
    • private_key: the path to your OAuth2 credential file.
    • client_id: the Pulsar application client ID.
    • client_secret: the Pulsar application client secret.
    • audience: the audience of your Pulsar cluster.
    • serviceUrl: the URL of your Pulsar cluster.
  4. Create a Node.js consumer and use the Node.js consumer to consume messages.

    const Pulsar = require('pulsar-client')
    const issuer_url = process.env.ISSUER_URL
    const private_key = process.env.PRIVATE_KEY
    const audience = process.env.AUDIENCE
    const scope = process.env.SCOPE
    const service_url = process.env.SERVICE_URL
    const client_id = process.env.CLIENT_ID
    const client_secret = process.env.CLIENT_SECRET
    
    ;(async () => {
      const params = {
        issuer_url: 'your-issuer-url',
      }
      if (private_key.length > 0) {
        params['private_key'] = 'file:///path/to/private-key-file.json'
      } else {
        params['client_id'] = 'your-client-id'
        params['client_secret'] = 'your-client-secret'
      }
      if (audience.length > 0) {
        params['audience'] = 'your-audience'
      }
      if (scope.length > 0) {
        params['scope'] = 'your-scope'
      }
      const auth = new Pulsar.AuthenticationOauth2(params)
    
      const consumer = await client.subscribe({
        topic: 'your-topic',
        subscription: 'your-sub-name',
        subscriptionType: 'Shared',
        ackTimeoutMs: 10000,
      })
      // Receive messages
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive()
        console.log(msg.getData().toString())
        await consumer.acknowledge(msg)
      }
      await consumer.close()
      await client.close()
    })()
    
  5. Create a Node.js producer and use the Node.js producer to produce messages.

    const Pulsar = require('pulsar-client')
    const issuer_url = process.env.ISSUER_URL
    const private_key = process.env.PRIVATE_KEY
    const audience = process.env.AUDIENCE
    const scope = process.env.SCOPE
    const service_url = process.env.SERVICE_URL
    const client_id = process.env.CLIENT_ID
    const client_secret = process.env.CLIENT_SECRET
    
    ;(async () => {
      const params = {
        issuer_url: 'your-issuer-url',
      }
      if (private_key.length > 0) {
        params['private_key'] = 'file:///path/to/private-key-file.json'
      } else {
        params['client_id'] = 'your-client-id'
        params['client_secret'] = 'your-client-secret'
      }
      if (audience.length > 0) {
        params['audience'] = 'your-audience'
      }
      if (scope.length > 0) {
        params['scope'] = 'your-scope'
      }
      const auth = new Pulsar.AuthenticationOauth2(params)
    
      const producer = await client.createProducer({
        topic: 'your-topic',
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      })
      // Send messages
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`
        producer.send({
          data: Buffer.from(msg),
        })
        console.log(`Sent message: ${msg}`)
      }
      await producer.flush()
      await producer.close()
      await client.close()
    })()
    
Previous
Pulsar - Go