1. StreamNative Cloud
  2. Connect

Connect to cluster through Node.js client

This document describes how to connect to a cluster through a Node.js client, and how to use the Node.js producer and consumer to produce messages to and consume messages from a topic. The Node.js client can connect to a Pulsar cluster through either the OAuth2 authentication plugin or the Token authentication plugin.

This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Connect to cluster through OAuth2 authentication plugin

  1. Get the service URL of your Pulsar cluster. For details, see get a service URL.

  2. Get the OAuth2 credential file of your service account. For details, see get an OAuth2 credential file.

  3. Connect to a Pulsar cluster through the OAuth2 authentication plugin.

    const Pulsar = require('pulsar-client')

    // Connection settings are read from the environment. A variable that is
    // not set reads as undefined.
    const issuer_url = process.env.ISSUER_URL
    const private_key = process.env.PRIVATE_KEY
    const audience = process.env.AUDIENCE
    const scope = process.env.SCOPE
    const service_url = process.env.SERVICE_URL
    const client_id = process.env.CLIENT_ID
    const client_secret = process.env.CLIENT_SECRET

    ;(async () => {
      // Build the OAuth2 parameters. Truthiness checks treat an unset variable
      // the same as an empty one, so a missing PRIVATE_KEY falls back to
      // client_id/client_secret instead of throwing on `.length` of undefined.
      const params = {
        issuer_url,
      }
      if (private_key) {
        params.private_key = private_key
      } else {
        params.client_id = client_id
        params.client_secret = client_secret
      }
      if (audience) {
        params.audience = audience
      }
      if (scope) {
        params.scope = scope
      }
      const auth = new Pulsar.AuthenticationOauth2(params)

      // Create a client. TLS certificate verification is left enabled:
      // tlsAllowInsecureConnection is deliberately not set.
      const client = new Pulsar.Client({
        serviceUrl: service_url,
        authentication: auth,
        operationTimeoutSeconds: 30,
      })

      await client.close()
    })().catch((err) => {
      // Report the failure and exit non-zero instead of leaving the promise
      // returned by the async function unhandled.
      console.error(err)
      process.exitCode = 1
    })
    
    • serviceUrl: the broker service URL of your Pulsar cluster.
    • issuer_url: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.
    • private_key: the path to your downloaded OAuth2 credential file. The private_key parameter supports the following three formats:
      • file:///path/to/file
      • file:/path/to/file
      • data:application/json;base64,<base64-encoded value>
    • client_id: the client ID of your application. You can get the value from your downloaded OAuth2 credential file.
    • client_secret: the client secret of your application. You can get the value from your downloaded OAuth2 credential file.
    • audience: the Uniform Resource Name (URN) of your Pulsar instance. It combines the urn:sn:pulsar prefix, the organization name, and the Pulsar instance name, in the format urn:sn:pulsar:<org_name>:<instance_name>.
  4. Create a Node.js consumer and use the Node.js consumer to consume messages.

    const Pulsar = require('pulsar-client')

    // Connection settings are read from the environment. A variable that is
    // not set reads as undefined.
    const issuer_url = process.env.ISSUER_URL
    const private_key = process.env.PRIVATE_KEY
    const audience = process.env.AUDIENCE
    const scope = process.env.SCOPE
    const service_url = process.env.SERVICE_URL
    const client_id = process.env.CLIENT_ID
    const client_secret = process.env.CLIENT_SECRET

    ;(async () => {
      // Build the OAuth2 parameters. Truthiness checks treat an unset variable
      // the same as an empty one, so a missing PRIVATE_KEY falls back to
      // client_id/client_secret instead of throwing on `.length` of undefined.
      const params = {
        issuer_url,
      }
      if (private_key) {
        params.private_key = private_key
      } else {
        params.client_id = client_id
        params.client_secret = client_secret
      }
      if (audience) {
        params.audience = audience
      }
      if (scope) {
        params.scope = scope
      }
      const auth = new Pulsar.AuthenticationOauth2(params)

      // Create a client
      const client = new Pulsar.Client({
        serviceUrl: service_url,
        authentication: auth,
        operationTimeoutSeconds: 30,
      })

      try {
        // Create a consumer
        const consumer = await client.subscribe({
          topic: 'persistent://public/default/my-topic',
          subscription: 'sub1',
          subscriptionType: 'Shared',
          ackTimeoutMs: 10000,
        })

        // Receive messages
        for (let i = 0; i < 10; i += 1) {
          const msg = await consumer.receive()
          console.log(msg.getData().toString())
          // Await the acknowledgement so a failure surfaces here; awaiting is
          // harmless if the installed client acknowledges synchronously.
          await consumer.acknowledge(msg)
        }

        await consumer.close()
      } finally {
        // Close the client even when subscribing or receiving fails.
        await client.close()
      }
    })().catch((err) => {
      // Report the failure and exit non-zero instead of leaving the promise
      // returned by the async function unhandled.
      console.error(err)
      process.exitCode = 1
    })
    
  5. Create a Node.js producer and use the Node.js producer to produce messages.

    const Pulsar = require('pulsar-client')

    // Connection settings are read from the environment. A variable that is
    // not set reads as undefined.
    const issuer_url = process.env.ISSUER_URL
    const private_key = process.env.PRIVATE_KEY
    const audience = process.env.AUDIENCE
    const scope = process.env.SCOPE
    const service_url = process.env.SERVICE_URL
    const client_id = process.env.CLIENT_ID
    const client_secret = process.env.CLIENT_SECRET

    ;(async () => {
      // Build the OAuth2 parameters. Truthiness checks treat an unset variable
      // the same as an empty one, so a missing PRIVATE_KEY falls back to
      // client_id/client_secret instead of throwing on `.length` of undefined.
      const params = {
        issuer_url,
      }
      if (private_key) {
        params.private_key = private_key
      } else {
        params.client_id = client_id
        params.client_secret = client_secret
      }
      if (audience) {
        params.audience = audience
      }
      if (scope) {
        params.scope = scope
      }
      const auth = new Pulsar.AuthenticationOauth2(params)

      // Create a client
      const client = new Pulsar.Client({
        serviceUrl: service_url,
        authentication: auth,
        operationTimeoutSeconds: 30,
      })

      try {
        // Create a producer
        const producer = await client.createProducer({
          topic: 'persistent://public/default/my-topic',
          sendTimeoutMs: 30000,
          batchingEnabled: true,
        })

        // Send messages. The sends are queued without awaiting each one so that
        // batching can group them; their promises are kept so that a failed
        // send is reported rather than silently dropped.
        const sends = []
        for (let i = 0; i < 10; i += 1) {
          const msg = `my-message-${i}`
          sends.push(
            producer
              .send({
                data: Buffer.from(msg),
              })
              .then(() => {
                // Logged only once this send has completed.
                console.log(`Sent message: ${msg}`)
              })
          )
        }
        // Flush the pending batch and wait for every send to settle.
        await Promise.all([...sends, producer.flush()])

        await producer.close()
      } finally {
        // Close the client even when creating the producer or sending fails.
        await client.close()
      }
    })().catch((err) => {
      // Report the failure and exit non-zero instead of leaving the promise
      // returned by the async function unhandled.
      console.error(err)
      process.exitCode = 1
    })
    

Connect to cluster through Token authentication plugin

To connect to a cluster through the Token authentication plugin, follow these steps.

  1. Get the service URL of your Pulsar cluster. For details, see get a service URL.

  2. Get the token of your service account. For details, see get a token.

  3. Connect to a Pulsar cluster through the Token authentication plugin.

    const Pulsar = require('pulsar-client')

    // Token and broker URL are both taken from the environment.
    const { AUTH_PARAMS: auth_params, SERVICE_URL: service_url } = process.env

    ;(async () => {
      // Wrap the service account token in the Token authentication plugin.
      const tokenAuth = new Pulsar.AuthenticationToken({ token: auth_params })

      // Open a client against the cluster using that authentication.
      const clientOptions = {
        serviceUrl: service_url,
        authentication: tokenAuth,
        operationTimeoutSeconds: 30,
      }
      const pulsarClient = new Pulsar.Client(clientOptions)

      // Nothing else to do in this example; release the connection.
      await pulsarClient.close()
    })()
    
    • service_url: the broker service URL of your Pulsar cluster.
    • AUTH_PARAMS: the token of your service account.
  4. Create a Node.js consumer and use the Node.js consumer to consume messages.

    const Pulsar = require('pulsar-client')

    // The service account token and the broker service URL are read from the
    // environment.
    const auth_params = process.env.AUTH_PARAMS
    const service_url = process.env.SERVICE_URL

    ;(async () => {
      const auth = new Pulsar.AuthenticationToken({
        token: auth_params,
      })

      // Create a client
      const client = new Pulsar.Client({
        serviceUrl: service_url,
        authentication: auth,
        operationTimeoutSeconds: 30,
      })

      try {
        // Create a consumer
        const consumer = await client.subscribe({
          topic: 'persistent://public/default/my-topic',
          subscription: 'sub1',
          subscriptionType: 'Shared',
          ackTimeoutMs: 10000,
        })

        // Receive messages
        for (let i = 0; i < 10; i += 1) {
          const msg = await consumer.receive()
          console.log(msg.getData().toString())
          // Await the acknowledgement so a failure surfaces here; awaiting is
          // harmless if the installed client acknowledges synchronously.
          await consumer.acknowledge(msg)
        }

        await consumer.close()
      } finally {
        // Close the client even when subscribing or receiving fails.
        await client.close()
      }
    })().catch((err) => {
      // Report the failure and exit non-zero instead of leaving the promise
      // returned by the async function unhandled.
      console.error(err)
      process.exitCode = 1
    })
    
  5. Create a Node.js producer and use the Node.js producer to produce messages.

    const Pulsar = require('pulsar-client')

    // The service account token and the broker service URL are read from the
    // environment.
    const auth_params = process.env.AUTH_PARAMS
    const service_url = process.env.SERVICE_URL

    ;(async () => {
      const auth = new Pulsar.AuthenticationToken({
        token: auth_params,
      })

      // Create a client
      const client = new Pulsar.Client({
        serviceUrl: service_url,
        authentication: auth,
        operationTimeoutSeconds: 30,
      })

      try {
        // Create a producer
        const producer = await client.createProducer({
          topic: 'persistent://public/default/my-topic',
          sendTimeoutMs: 30000,
          batchingEnabled: true,
        })

        // Send messages. The sends are queued without awaiting each one so that
        // batching can group them; their promises are kept so that a failed
        // send is reported rather than silently dropped.
        const sends = []
        for (let i = 0; i < 10; i += 1) {
          const msg = `my-message-${i}`
          sends.push(
            producer
              .send({
                data: Buffer.from(msg),
              })
              .then(() => {
                // Logged only once this send has completed.
                console.log(`Sent message: ${msg}`)
              })
          )
        }
        // Flush the pending batch and wait for every send to settle.
        await Promise.all([...sends, producer.flush()])

        await producer.close()
      } finally {
        // Close the client even when creating the producer or sending fails.
        await client.close()
      }
    })().catch((err) => {
      // Report the failure and exit non-zero instead of leaving the promise
      // returned by the async function unhandled.
      console.error(err)
      process.exitCode = 1
    })
    

For more information about the Pulsar Node.js client, see Node.js client examples.

Previous
Clients - Python