
Configure asynchronous geo-replication

This document describes how to configure asynchronous geo-replication on StreamNative Platform.

Prerequisites

These instructions assume that you have installed StreamNative Platform in two Kubernetes namespaces (k8s-01 and k8s-02). If you are new to StreamNative Platform, work through the StreamNative Platform deployment guide first, and then return to this document.

Enable asynchronous geo-replication

This section guides you through the steps to enable asynchronous geo-replication. You can enable asynchronous geo-replication at either the namespace or topic level.

  • Namespace-level asynchronous geo-replication: data written to any topic in the configured namespace is replicated to the other Pulsar clusters configured for that namespace.
  • Topic-level asynchronous geo-replication: only data written to the configured topic is replicated to the other Pulsar clusters configured for that topic.

Note

  • The instructions in this section assume that you work on two Pulsar clusters (pulsar-a and pulsar-b).
  • For unidirectional asynchronous geo-replication, such as from pulsar-a to pulsar-b, enable asynchronous geo-replication on Pulsar cluster pulsar-a. For the reverse direction, enable it on pulsar-b instead.
  • For bidirectional asynchronous geo-replication between pulsar-a and pulsar-b, enable asynchronous geo-replication on both Pulsar clusters.
  • To save resources, we recommend disabling the Vault service.

Enable namespace-level asynchronous geo-replication

This section describes how to enable namespace-level asynchronous geo-replication.

Step 1: Create cluster connection

Run the command below on Pulsar cluster pulsar-a to register Pulsar cluster pulsar-b (as cluster pulsar-b-sn-platform) on pulsar-a. This connection lets pulsar-a replicate the data that clients write to it to pulsar-b once the replication clusters are set for the namespace in Step 3.

kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash
# Then run the following inside the toolset pod:
bin/pulsar-admin clusters create \
  --broker-url pulsar://pulsar-b-sn-platform-broker.k8s-02:6650 \
  --url http://pulsar-b-sn-platform-broker.k8s-02:8080 \
  pulsar-b-sn-platform
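For bidirectional replication, pulsar-b also needs the mirror-image connection to pulsar-a. The following is a minimal sketch, not part of the original procedure, that issues both connections from outside the pods with kubectl exec ... -- instead of an interactive shell. It assumes that pulsar-admin is at /pulsar/bin inside the toolset pods (the sample prompt later on this page shows /pulsar) and that pulsar-a's broker service in k8s-01 is named like pulsar-b's. By default it only prints the commands; set DRY_RUN=0 to run them.

```shell
#!/bin/sh
# Sketch: register each cluster on the other so that replication can flow both ways.
# Assumptions (not part of the original steps): pulsar-admin is at /pulsar/bin in the
# toolset pods, and pulsar-a's broker service follows the same naming as pulsar-b's.
# Commands are only printed unless DRY_RUN=0 is set.
set -eu

DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode; otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# connect LOCAL_POD LOCAL_NS REMOTE_RELEASE REMOTE_NS
# Creates the "<REMOTE_RELEASE>-sn-platform" cluster entry on the local cluster.
connect() {
  remote="$3-sn-platform"
  run kubectl exec "$1" -n "$2" -- /pulsar/bin/pulsar-admin clusters create \
    --broker-url "pulsar://$remote-broker.$4:6650" \
    --url "http://$remote-broker.$4:8080" \
    "$remote"
}

connect pulsar-a-sn-platform-toolset-0 k8s-01 pulsar-b k8s-02  # pulsar-a -> pulsar-b (this step)
connect pulsar-b-sn-platform-toolset-0 k8s-02 pulsar-a k8s-01  # pulsar-b -> pulsar-a (reverse)
```

For unidirectional replication from pulsar-a to pulsar-b, only the first call is needed.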

Step 2: Create tenants and grant permissions

Create a tenant (rep-tenant-1) in both Pulsar clusters, set admin as the tenant's admin role, and allow the tenant to use both Pulsar clusters.

./bin/pulsar-admin tenants create rep-tenant-1 --admin-roles admin --allowed-clusters pulsar-a-sn-platform,pulsar-b-sn-platform
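This step and the following ones must be run in both Pulsar clusters. One way to do that without opening a shell in each toolset pod is a small helper that passes the same pulsar-admin arguments to both pods. The sketch below is illustrative: on_both is a hypothetical helper name, pulsar-admin is assumed to be at /pulsar/bin inside the toolset pods, and the commands are only printed unless DRY_RUN=0 is set.

```shell
#!/bin/sh
# Sketch: run one pulsar-admin command in both clusters' toolset pods.
# Assumption: pulsar-admin is at /pulsar/bin in the toolset pods.
# Commands are only printed unless DRY_RUN=0 is set.
set -eu

DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode; otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# on_both ARGS...: pass ARGS to pulsar-admin on pulsar-a (k8s-01), then on pulsar-b (k8s-02).
on_both() {
  for target in pulsar-a-sn-platform-toolset-0:k8s-01 pulsar-b-sn-platform-toolset-0:k8s-02; do
    pod="${target%%:*}"   # text before the colon
    ns="${target#*:}"     # text after the colon
    run kubectl exec "$pod" -n "$ns" -- /pulsar/bin/pulsar-admin "$@"
  done
}

on_both tenants create rep-tenant-1 --admin-roles admin \
  --allowed-clusters pulsar-a-sn-platform,pulsar-b-sn-platform
```

The same helper covers the other "in both Pulsar clusters" commands, for example on_both namespaces create rep-tenant-1/rep-ns.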

Step 3: Create namespaces

Create a namespace (rep-tenant-1/rep-ns) in both Pulsar clusters, grant the admin role produce and consume permissions on the namespace, and set the replication clusters for the namespace.

./bin/pulsar-admin namespaces create rep-tenant-1/rep-ns
./bin/pulsar-admin namespaces grant-permission rep-tenant-1/rep-ns --actions produce,consume --role admin
./bin/pulsar-admin namespaces set-clusters rep-tenant-1/rep-ns --clusters pulsar-a-sn-platform,pulsar-b-sn-platform

Step 4: Create topics

Create a partitioned topic (rep-tenant-1/rep-ns/rep-topic) with one partition in both Pulsar clusters.

./bin/pulsar-admin topics create-partitioned-topic -p 1 persistent://rep-tenant-1/rep-ns/rep-topic
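To confirm Steps 3 and 4 on both clusters, you can read the settings back. In the sketch below, verify_namespace_setup is a hypothetical name, pulsar-admin is assumed to be at /pulsar/bin inside the toolset pods, and the commands are only printed unless DRY_RUN=0 is set. When run, namespaces get-clusters should list pulsar-a-sn-platform and pulsar-b-sn-platform, and get-partitioned-topic-metadata should report one partition on each cluster.

```shell
#!/bin/sh
# Sketch: read back the namespace-level setup on both clusters.
# Assumption: pulsar-admin is at /pulsar/bin in the toolset pods.
# Commands are only printed unless DRY_RUN=0 is set.
set -eu

DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode; otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# verify_namespace_setup POD K8S_NAMESPACE
verify_namespace_setup() {
  # Replication clusters set in Step 3: both cluster names should be listed.
  run kubectl exec "$1" -n "$2" -- /pulsar/bin/pulsar-admin \
    namespaces get-clusters rep-tenant-1/rep-ns
  # Partition count set in Step 4: it should be 1 on both clusters.
  run kubectl exec "$1" -n "$2" -- /pulsar/bin/pulsar-admin \
    topics get-partitioned-topic-metadata persistent://rep-tenant-1/rep-ns/rep-topic
}

verify_namespace_setup pulsar-a-sn-platform-toolset-0 k8s-01
verify_namespace_setup pulsar-b-sn-platform-toolset-0 k8s-02
```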

Enable topic-level asynchronous geo-replication

This section describes how to enable topic-level asynchronous geo-replication.

Step 1: Create cluster connection

Run the command below on Pulsar cluster pulsar-a to register Pulsar cluster pulsar-b (as cluster pulsar-b-sn-platform) on pulsar-a. This connection lets pulsar-a replicate the data that clients write to it to pulsar-b once the replication clusters are set for the topic in Step 4.

kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash
# Then run the following inside the toolset pod:
bin/pulsar-admin clusters create \
  --broker-url pulsar://pulsar-b-sn-platform-broker.k8s-02:6650 \
  --url http://pulsar-b-sn-platform-broker.k8s-02:8080 \
  pulsar-b-sn-platform

Step 2: Create tenants and grant permissions

Create a tenant (rep-tenant-1) in both Pulsar clusters, set admin as the tenant's admin role, and allow the tenant to use both Pulsar clusters.

./bin/pulsar-admin tenants create rep-tenant-1 --admin-roles admin --allowed-clusters pulsar-a-sn-platform,pulsar-b-sn-platform

Step 3: Create namespaces

Create a namespace (rep-tenant-1/rep-ns) in both Pulsar clusters.

./bin/pulsar-admin namespaces create rep-tenant-1/rep-ns

Step 4: Create topics

Create a partitioned topic (rep-tenant-1/rep-ns/rep-topic) with one partition in both Pulsar clusters, and set the replication clusters for the topic.

./bin/pulsar-admin topics create-partitioned-topic -p 1 persistent://rep-tenant-1/rep-ns/rep-topic
./bin/pulsar-admin topics set-replication-clusters --clusters pulsar-a-sn-platform,pulsar-b-sn-platform rep-tenant-1/rep-ns/rep-topic
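Step 3 of this section does not set replication clusters on the namespace, so the topic-level setting is what enables replication here. A sketch for reading it back on both clusters follows. It assumes pulsar-admin is at /pulsar/bin inside the toolset pods, uses the hypothetical name check_topic_clusters, and only prints the commands unless DRY_RUN=0 is set. Each call should list both cluster names. If the topic-level commands fail, check whether topic-level policies are enabled on the brokers (the topicLevelPoliciesEnabled broker setting).

```shell
#!/bin/sh
# Sketch: read back the topic-level replication clusters on both clusters.
# Assumption: pulsar-admin is at /pulsar/bin in the toolset pods.
# Commands are only printed unless DRY_RUN=0 is set.
set -eu

DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode; otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# check_topic_clusters POD K8S_NAMESPACE
check_topic_clusters() {
  run kubectl exec "$1" -n "$2" -- /pulsar/bin/pulsar-admin \
    topics get-replication-clusters persistent://rep-tenant-1/rep-ns/rep-topic
}

check_topic_clusters pulsar-a-sn-platform-toolset-0 k8s-01
check_topic_clusters pulsar-b-sn-platform-toolset-0 k8s-02
```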

Verify asynchronous geo-replication

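This example shows that when you produce a message to Pulsar cluster pulsar-b, a consumer on Pulsar cluster pulsar-a receives it. Start the consumer before producing: a new subscription starts at the latest position (subscriptionInitialPosition is Latest in the sample output), so it only receives messages produced after it subscribes.

  1. Start a consumer on Pulsar cluster pulsar-a.

    kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash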
    bin/pulsar-client consume -s sub -n 0 rep-tenant-1/rep-ns/rep-topic
    
  2. In a separate terminal, produce a message (hello-world) to Pulsar cluster pulsar-b.

    kubectl exec -it pulsar-b-sn-platform-toolset-0 -n k8s-02 -- /bin/bash
    bin/pulsar-client produce -m 'hello-world' rep-tenant-1/rep-ns/rep-topic
    

On the consumer terminal, the output should be similar to the following:

05:16:44.185 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xa8dc9ad8, L:/10.90.12.162:50018 - R:pulsar-a-sn-platform-broker.k8s-01.svc.cluster.local/172.20.234.167:6650]] Connected to server
05:16:44.298 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":null,"subscriptionName":"sub","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"64b24","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
05:16:44.314 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://pulsar-a-sn-platform-broker:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
05:16:44.329 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xfa831f72, L:/10.90.12.162:60296 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.161:6650]] Connected to server
05:16:44.331 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribing to topic on cnx [id: 0xfa831f72, L:/10.90.12.162:60296 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.161:6650], consumerId 0
05:16:44.424 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribed to topic on pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.161:6650 -- consumer: 0
05:16:44.427 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [rep-tenant-1/rep-ns/rep-topic] [sub] Success subscribe new topic persistent://rep-tenant-1/rep-ns/rep-topic in topics consumer, partitions: 1, allTopicPartitionsNumber: 1
05:17:46.416 [pulsar-client-io-1-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:hello-world

This example shows that when you produce a message to Pulsar cluster pulsar-a, a consumer on Pulsar cluster pulsar-b receives it. Start the consumer first so that its new subscription exists before the message is produced.

  1. Start a consumer on Pulsar cluster pulsar-b.

    kubectl exec -it pulsar-b-sn-platform-toolset-0 -n k8s-02 -- /bin/bash
    bin/pulsar-client consume -s sub -n 0 rep-tenant-1/rep-ns/rep-topic
    
  2. In a separate terminal, produce a message (hello-world) to Pulsar cluster pulsar-a.

    kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash
    bin/pulsar-client produce -m 'hello-world' rep-tenant-1/rep-ns/rep-topic
    

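On the consumer terminal, the output should be similar to the following. This sample was captured in the pulsar-a toolset pod, so its prompt and host names refer to pulsar-a; on pulsar-b, they refer to the corresponding pulsar-b-sn-platform hosts.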

root@pulsar-a-sn-platform-toolset-0:/pulsar# bin/pulsar-client consume -s sub -n 0 rep-tenant-1/rep-ns/rep-topic
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.pulsar.common.util.netty.DnsResolverUtil (file:/pulsar/lib/io.streamnative-pulsar-common-2.10.2.4.jar) to method sun.net.InetAddressCachePolicy.get()
WARNING: Please consider reporting this to the maintainers of org.apache.pulsar.common.util.netty.DnsResolverUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
11:25:05.201 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x7784f0c4, L:/10.90.10.180:56584 - R:pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650]] Connected to server
11:25:05.303 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":null,"subscriptionName":"sub","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"0de7c","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
11:25:05.319 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://pulsar-a-sn-platform-broker-headless:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
11:25:05.334 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x80d7d007, L:/10.90.10.180:56586 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650]] Connected to server
11:25:05.336 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribing to topic on cnx [id: 0x80d7d007, L:/10.90.10.180:56586 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650], consumerId 0
11:25:05.341 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribed to topic on pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650 -- consumer: 0
11:25:05.343 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [rep-tenant-1/rep-ns/rep-topic] [sub] Success subscribe new topic persistent://rep-tenant-1/rep-ns/rep-topic in topics consumer, partitions: 1, allTopicPartitionsNumber: 1
11:25:08.822 [pulsar-client-io-1-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:hello-world
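Both checks above can also be run without interactive shells. The following is a minimal sketch, not the documented procedure, that starts a single-message consumer in the background, produces one message on the other cluster, and waits for the consumer to exit. Assumptions: pulsar-client is at /pulsar/bin inside the toolset pods, 5 seconds is enough for the consumer to subscribe, and verify-sub is a hypothetical subscription name chosen so it does not clash with the exclusive sub subscription used above. It only prints the commands unless DRY_RUN=0 is set, and in a real run it blocks until the replicated message arrives.

```shell
#!/bin/sh
# Sketch: non-interactive version of the verification above, one direction per call.
# Assumptions (not from the original steps): pulsar-client is at /pulsar/bin in the
# toolset pods, 5 seconds is enough for the consumer to subscribe, and "verify-sub"
# is a hypothetical subscription name that avoids the exclusive "sub" subscription.
# Commands are only printed unless DRY_RUN=0 is set.
set -eu

DRY_RUN="${DRY_RUN:-1}"
TOPIC="rep-tenant-1/rep-ns/rep-topic"

# Print the command in dry-run mode; otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# consume_one POD K8S_NAMESPACE: receive a single message, then exit (-n 1).
consume_one() {
  run kubectl exec "$1" -n "$2" -- /pulsar/bin/pulsar-client consume -s verify-sub -n 1 "$TOPIC"
}

# produce_one POD K8S_NAMESPACE: send one hello-world message.
produce_one() {
  run kubectl exec "$1" -n "$2" -- /pulsar/bin/pulsar-client produce -m hello-world "$TOPIC"
}

# check_direction PRODUCER_POD PRODUCER_NS CONSUMER_POD CONSUMER_NS
check_direction() {
  if [ "$DRY_RUN" = "1" ]; then
    consume_one "$3" "$4"   # a real run starts this in the background
    produce_one "$1" "$2"
    return 0
  fi
  consume_one "$3" "$4" &
  consumer_pid=$!
  sleep 5                   # let the new subscription attach before producing
  produce_one "$1" "$2"
  wait "$consumer_pid"      # blocks until the replicated message arrives
}

# pulsar-b -> pulsar-a, then pulsar-a -> pulsar-b
check_direction pulsar-b-sn-platform-toolset-0 k8s-02 pulsar-a-sn-platform-toolset-0 k8s-01
check_direction pulsar-a-sn-platform-toolset-0 k8s-01 pulsar-b-sn-platform-toolset-0 k8s-02
```

The verify-sub subscription is durable and remains after the check; you can remove it with pulsar-admin topics unsubscribe. If a check hangs, the output of pulsar-admin topics partitioned-stats persistent://rep-tenant-1/rep-ns/rep-topic on the producing cluster includes a replication section for the remote cluster, which can help show whether the replicator is connected.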

Examples

This section provides examples of how to enable asynchronous geo-replication when different authentication methods are enabled on StreamNative Platform.

Note

These instructions assume that you have installed StreamNative Platform in two Kubernetes namespaces (k8s-01 and k8s-02) and that you work on two Pulsar clusters (pulsar-a and pulsar-b).

Enable JWT authentication and asynchronous geo-replication

If you want to enable JWT authentication and asynchronous geo-replication for your Pulsar clusters, follow the instructions in this section.

Create JWT secrets for your Pulsar clusters

Before enabling JWT authentication on your Pulsar clusters, you need to create JWT secrets for them. You can run the prepare_helm_release.sh script to automatically generate the JWT secrets for one Pulsar cluster. Then, copy these secrets to the other Kubernetes namespace and create the secrets for the other Pulsar cluster from the copies, so that both clusters use the same key.

  1. Clone the StreamNative charts repository and switch to its directory.

    git clone https://github.com/streamnative/charts.git
    cd charts
    
  2. Run the prepare_helm_release.sh script to create JWT secrets.

    ./scripts/pulsar/prepare_helm_release.sh -n k8s-01 -k pulsar-a
    
    • -n, --namespace: the Kubernetes namespace in which to install the Helm chart
    • -k, --release: the Helm release name

    By default, the command generates an asymmetric public/private key pair. To generate a symmetric secret key instead, add --symmetric to the command.

  3. Check the created secrets.

    kubectl get secret -n k8s-01
    NAME                                  TYPE                                  DATA   AGE
    pulsar-a-token-admin                  Opaque                                2      26s
    pulsar-a-token-asymmetric-key         Opaque                                2      34s
    pulsar-a-token-broker-admin           Opaque                                2      29s
    pulsar-a-token-proxy-admin            Opaque                                2      31s
    pulsar-a-token-pulsar-manager-admin   Opaque                                2
    
    • pulsar-a-token-admin: the role that is used to access the admin tools.
    • pulsar-a-token-asymmetric-key: a public/private key pair that is used to generate and validate tokens in an asymmetric algorithm. You can use the private key to generate tokens. You can use the public key to validate tokens.
    • pulsar-a-token-broker-admin: the role that is used for inter-broker communications.
    • pulsar-a-token-proxy-admin: the role that Pulsar proxies use to communicate with Pulsar brokers.
    • pulsar-a-token-pulsar-manager-admin: the superuser role that is used to access the StreamNative Console.
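The steps above create secrets only in k8s-01. The following is a minimal sketch of the copy step described at the start of this section, under stated assumptions: the pulsar-b release expects secrets named pulsar-b-token-<suffix> (mirroring the pulsar-a names listed above), and jq is installed where you run kubectl. Check the secret names your pulsar-b release actually expects before relying on it, and if you generated a symmetric key, copy that secret instead of the asymmetric-key one. Sharing the key pair lets tokens signed on one cluster validate on the other. By default the script only prints the planned copies; set DRY_RUN=0 to perform them.

```shell
#!/bin/sh
# Sketch: copy pulsar-a's token secrets from k8s-01 to k8s-02 under pulsar-b names,
# so both clusters share the same key pair.
# Assumptions: the pulsar-b release expects secrets named pulsar-b-token-<suffix>,
# and jq is installed. Copies are only printed unless DRY_RUN=0 is set.
set -eu

DRY_RUN="${DRY_RUN:-1}"
SRC_NS=k8s-01
DST_NS=k8s-02

# copy_secret SUFFIX: copy pulsar-a-token-SUFFIX to pulsar-b-token-SUFFIX.
copy_secret() {
  src="pulsar-a-token-$1"
  dst="pulsar-b-token-$1"
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s/%s -> %s/%s\n' "$SRC_NS" "$src" "$DST_NS" "$dst"
    return 0
  fi
  # Keep type and data; replace the metadata so the copy gets the new name and
  # drops source-specific fields such as uid, resourceVersion, and namespace.
  kubectl get secret "$src" -n "$SRC_NS" -o json \
    | jq --arg name "$dst" '.metadata = {name: $name}' \
    | kubectl apply -n "$DST_NS" -f -
}

for suffix in admin asymmetric-key broker-admin proxy-admin pulsar-manager-admin; do
  copy_secret "$suffix"
done
```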

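After you copy these secrets to k8s-02 and create the pulsar-b secrets from them, as described at the start of this section, the two Pulsar clusters share the same private key.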

Configure JWT authentication for your Pulsar clusters

For details, see Configure JWT authentication.

Enable asynchronous geo-replication

For details, see Enable asynchronous geo-replication.

Enable OAuth2 authentication and asynchronous geo-replication

If you want to enable OAuth2 authentication and asynchronous geo-replication for your Pulsar clusters, follow these steps:

  1. Configure OAuth2 authentication for your Pulsar clusters.

    For details, see Configure OAuth2 authentication.

  2. Enable asynchronous geo-replication.

    For details, see Enable asynchronous geo-replication.
