These instructions assume that you have installed StreamNative Platform in two Kubernetes namespaces (k8s-01 and k8s-02). If you are new to StreamNative Platform, work through the StreamNative Platform deployment guide first, and then return to this tutorial.
This section guides you through the steps to enable asynchronous geo-replication. You can enable asynchronous geo-replication at either the namespace or topic level.
Namespace-level asynchronous geo-replication: when namespace-level asynchronous geo-replication is enabled, the data that is written to the topics in the configured namespace can be replicated to other Pulsar clusters.
Topic-level asynchronous geo-replication: when topic-level asynchronous geo-replication is enabled, only the data that is written to the configured topic can be replicated to other Pulsar clusters.
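To illustrate the narrower scope of the topic-level option, the sketch below sets the replication clusters on a single topic with pulsar-admin. It is an illustration rather than a step from this guide: it assumes the topic rep-tenant-1/rep-ns/rep-topic, assumes that topic-level policies are enabled on the brokers, and assumes that it runs from the /pulsar directory of a toolset pod.

```shell
# Assumed topic name; substitute your own.
TOPIC="persistent://rep-tenant-1/rep-ns/rep-topic"
# Clusters that should hold a copy of this topic's data.
CLUSTERS="pulsar-a,pulsar-b"

# Build the command first so that it can be reviewed before it runs.
CMD="bin/pulsar-admin topics set-replication-clusters --clusters $CLUSTERS $TOPIC"
echo "$CMD"

# Execute only where the admin CLI exists (for example, inside the toolset pod).
if [ -x bin/pulsar-admin ]; then
  $CMD
fi
```

Other topics in the same namespace are left untouched by this command, which is the practical difference from the namespace-level setting.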
The instructions in this section assume that you work on two Pulsar clusters (pulsar-a and pulsar-b).
If you only want unidirectional asynchronous geo-replication between these two Pulsar clusters, for example from pulsar-a to pulsar-b, enable asynchronous geo-replication on the source cluster only (pulsar-a in this example). To replicate in the opposite direction, enable it on pulsar-b instead.
If you want bidirectional asynchronous geo-replication between these two Pulsar clusters (pulsar-a and pulsar-b), enable asynchronous geo-replication on both Pulsar clusters.
We recommend disabling the Vault service to save resources.
Run the commands below to create the connection between these two Pulsar clusters.
Run the command below on Pulsar cluster pulsar-a to create the connection from Pulsar cluster pulsar-a to Pulsar cluster pulsar-b. Then, when a client writes data to the Pulsar cluster pulsar-a, the data is replicated from Pulsar cluster pulsar-a to Pulsar cluster pulsar-b.
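A minimal sketch of this step, assuming it runs from /pulsar inside the pulsar-a toolset pod. The pulsar-b service host name is an assumption patterned on the pulsar-a broker service name, and port 8080 is assumed for the HTTP admin endpoint; check both against your deployment before use.

```shell
# Assumed Kubernetes service name for the pulsar-b brokers.
# Verify it with: kubectl get svc -n k8s-02
PEER_HOST="pulsar-b-sn-platform-broker-headless.k8s-02.svc.cluster.local"
PEER_WEB_URL="http://${PEER_HOST}:8080"       # assumed HTTP admin port
PEER_BROKER_URL="pulsar://${PEER_HOST}:6650"  # Pulsar binary protocol port

# Register pulsar-b as a known cluster in pulsar-a's metadata.
CMD="bin/pulsar-admin clusters create pulsar-b --url $PEER_WEB_URL --broker-url $PEER_BROKER_URL"
echo "$CMD"

# Execute only where the admin CLI exists (for example, inside the toolset pod).
if [ -x bin/pulsar-admin ]; then
  $CMD
fi
```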
Run the command below on Pulsar cluster pulsar-b to create the connection from Pulsar cluster pulsar-b to Pulsar cluster pulsar-a. Then, when a client writes data to the Pulsar cluster pulsar-b, the data is replicated from Pulsar cluster pulsar-b to Pulsar cluster pulsar-a.
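The reverse direction can be sketched the same way, this time from /pulsar inside the pulsar-b toolset pod. The pulsar-a host name and port 8080 are assumptions to verify against your own services. The final listing is an optional check that the peer cluster is now known.

```shell
# Assumed Kubernetes service name for the pulsar-a brokers.
# Verify it with: kubectl get svc -n k8s-01
PEER_HOST="pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local"
PEER_WEB_URL="http://${PEER_HOST}:8080"       # assumed HTTP admin port
PEER_BROKER_URL="pulsar://${PEER_HOST}:6650"  # Pulsar binary protocol port

# Register pulsar-a as a known cluster in pulsar-b's metadata.
CREATE_CMD="bin/pulsar-admin clusters create pulsar-a --url $PEER_WEB_URL --broker-url $PEER_BROKER_URL"
# List the clusters afterwards to confirm that both names appear.
LIST_CMD="bin/pulsar-admin clusters list"
echo "$CREATE_CMD"
echo "$LIST_CMD"

# Execute only where the admin CLI exists (for example, inside the toolset pod).
if [ -x bin/pulsar-admin ]; then
  $CREATE_CMD
  $LIST_CMD
fi
```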
Create a namespace (rep-tenant-1/rep-ns) in both Pulsar clusters, grant the Admin role as well as the produce and consume permissions to the namespace, and set the replication clusters for the namespace.
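One way to carry out this step with pulsar-admin is sketched below. The role name my-app-role is hypothetical, and the script assumes that it runs from /pulsar inside a toolset pod after both clusters have been registered with each other. Run it on each cluster that takes part in replication.

```shell
TENANT="rep-tenant-1"
NAMESPACE="$TENANT/rep-ns"
CLUSTERS="pulsar-a,pulsar-b"
ROLE="my-app-role"   # hypothetical role name; substitute your own

# Print each command, and execute it only where the admin CLI exists.
run() {
  echo "+ $*"
  if [ -x bin/pulsar-admin ]; then
    "$@"
  fi
}

# The tenant must be allowed to use every cluster that will hold replicated data.
run bin/pulsar-admin tenants create "$TENANT" --admin-roles "$ROLE" --allowed-clusters "$CLUSTERS"
run bin/pulsar-admin namespaces create "$NAMESPACE" --clusters "$CLUSTERS"
# Allow the role to produce to and consume from topics in the namespace.
run bin/pulsar-admin namespaces grant-permission "$NAMESPACE" --role "$ROLE" --actions produce,consume
# Set the clusters that the namespace replicates to.
run bin/pulsar-admin namespaces set-clusters "$NAMESPACE" --clusters "$CLUSTERS"
# Read the setting back to confirm it.
run bin/pulsar-admin namespaces get-clusters "$NAMESPACE"
```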
On the consumer terminal, the output should be similar to the following:
root@pulsar-a-sn-platform-toolset-0:/pulsar# bin/pulsar-client consume -s sub -n 0 rep-tenant-1/rep-ns/rep-topic
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.pulsar.common.util.netty.DnsResolverUtil (file:/pulsar/lib/io.streamnative-pulsar-common-2.10.2.4.jar) to method sun.net.InetAddressCachePolicy.get()
WARNING: Please consider reporting this to the maintainers of org.apache.pulsar.common.util.netty.DnsResolverUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
11:25:05.201 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x7784f0c4, L:/10.90.10.180:56584 - R:pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650]] Connected to server
11:25:05.303 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":null,"subscriptionName":"sub","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"0de7c","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
11:25:05.319 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://pulsar-a-sn-platform-broker-headless:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
11:25:05.334 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x80d7d007, L:/10.90.10.180:56586 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650]] Connected to server
11:25:05.336 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribing to topic on cnx [id: 0x80d7d007, L:/10.90.10.180:56586 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650], consumerId 0
11:25:05.341 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribed to topic on pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650 -- consumer: 0
11:25:05.343 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [rep-tenant-1/rep-ns/rep-topic] [sub] Success subscribe new topic persistent://rep-tenant-1/rep-ns/rep-topic in topics consumer, partitions: 1, allTopicPartitionsNumber: 1
11:25:08.822 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:hello-world
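The hello-world message in this output has to be published by a producer. As a hedged sketch, assuming the message is sent from /pulsar inside the pulsar-b toolset pod so that it reaches the pulsar-a consumer through replication, the producer side could look like this:

```shell
# Same topic that the consumer subscribes to in the output above.
TOPIC="rep-tenant-1/rep-ns/rep-topic"

# Publish one message with the payload seen in the consumer output.
CMD="bin/pulsar-client produce -m hello-world -n 1 $TOPIC"
echo "$CMD"

# Execute only where the client CLI exists (for example, inside the toolset pod).
if [ -x bin/pulsar-client ]; then
  $CMD
fi
```

If the consumer on pulsar-a prints the message, replication from pulsar-b to pulsar-a is working for this namespace.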
This section provides examples of how to enable asynchronous geo-replication when different authentication methods are enabled on StreamNative Platform.
These instructions assume that you have installed StreamNative Platform in two Kubernetes namespaces (k8s-01 and k8s-02) and that you work on two Pulsar clusters (pulsar-a and pulsar-b).
Before enabling JWT authentication on your Pulsar clusters, you need to create JWT secrets for each cluster. You can run the prepare_helm_release.sh script to generate the JWT secrets for one Pulsar cluster automatically. Then, copy these secrets from one Kubernetes namespace to the other and create new secrets for the other Pulsar cluster based on the copied secrets.
Clone this repository and switch to the target directory.
-n, --namespace: the Kubernetes namespace in which to install the Helm chart
-k, --release: the Helm release name
By default, the command generates the asymmetric public/private key pair. You can choose to generate a symmetric secret key by specifying --symmetric in the command.
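A possible invocation for cluster pulsar-a is sketched below. The script location inside the cloned repository is an assumption, and the release name pulsar-a is inferred from the pulsar-a- prefix of the generated secret names; adjust both to your setup.

```shell
# Assumed location of the script inside the cloned repository.
SCRIPT="./scripts/pulsar/prepare_helm_release.sh"
K8S_NAMESPACE="k8s-01"
RELEASE="pulsar-a"   # inferred from the "pulsar-a-" prefix of the secret names

# Generates the default asymmetric public/private key pair.
# Append --symmetric to generate a symmetric secret key instead.
CMD="$SCRIPT -n $K8S_NAMESPACE -k $RELEASE"
echo "$CMD"

# Execute only when the script is present in the current directory tree.
if [ -x "$SCRIPT" ]; then
  $CMD
fi
```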
Check the created secrets.
kubectl get secret -n k8s-01
NAME                                  TYPE     DATA   AGE
pulsar-a-token-admin                  Opaque   2      26s
pulsar-a-token-asymmetric-key         Opaque   2      34s
pulsar-a-token-broker-admin           Opaque   2      29s
pulsar-a-token-proxy-admin            Opaque   2      31s
pulsar-a-token-pulsar-manager-admin   Opaque   2
pulsar-a-token-admin: the role that is used to access the admin tools.
pulsar-a-token-asymmetric-key: a public/private key pair that is used to generate and validate tokens in an asymmetric algorithm. You can use the private key to generate tokens. You can use the public key to validate tokens.
pulsar-a-token-broker-admin: the role that is used for inter-broker communications.
pulsar-a-token-proxy-admin: the role that is used for Pulsar proxies to communicate to Pulsar brokers.
pulsar-a-token-pulsar-manager-admin: the superuser role that is used to access the StreamNative Console.
Copy the secrets of pulsar-a from Kubernetes namespace k8s-01 to Kubernetes namespace k8s-02.
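One common way to copy a secret across namespaces is to export it, rewrite the namespace field, and apply the result. The sketch below takes that approach with sed and is an assumption about how the copy can be done, not the guide's own command. It also drops server-populated metadata fields so that the manifest can be created cleanly in the new namespace.

```shell
SRC_NS="k8s-01"
DST_NS="k8s-02"
# Secret names taken from the listing of namespace k8s-01.
SECRETS="pulsar-a-token-admin pulsar-a-token-asymmetric-key pulsar-a-token-broker-admin pulsar-a-token-proxy-admin pulsar-a-token-pulsar-manager-admin"

# Read a Secret manifest on stdin, point it at the destination namespace,
# and remove metadata fields that the API server fills in on creation.
retarget_namespace() {
  sed -e "s/^  namespace: ${SRC_NS}\$/  namespace: ${DST_NS}/" \
      -e '/^  resourceVersion:/d' \
      -e '/^  uid:/d' \
      -e '/^  creationTimestamp:/d'
}

# Copy each secret; skipped when kubectl is not installed.
if command -v kubectl >/dev/null 2>&1; then
  for s in $SECRETS; do
    kubectl get secret "$s" -n "$SRC_NS" -o yaml | retarget_namespace | kubectl apply -f -
  done
fi
```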
Create new secrets for cluster pulsar-b based on the copied secrets.
This example shows how to create a new secret (pulsar-b-token-admin) based on the pulsar-a-token-admin secret.
a. Access the pulsar-a-token-admin secret.
kubectl get secret pulsar-a-token-admin -n k8s-01 -o yaml
The pulsar-a-token-admin secret should be similar to the following:
b. Replace the secret name with pulsar-b-token-admin and the Kubernetes namespace name with k8s-02, and then save the YAML file under the new name pulsar-b-token-admin.
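Steps a and b can also be scripted instead of edited by hand. The sketch below is one hedged way to do so with sed. The rename_secret helper and the .yaml file extension are illustrative choices, and the final apply is an assumed follow-up for creating the secret in k8s-02.

```shell
# Usage: rename_secret OLD_NAME NEW_NAME OLD_NS NEW_NS < in.yaml > out.yaml
# Rewrites metadata.name and metadata.namespace, and drops server-populated fields.
rename_secret() {
  sed -e "s/^  name: $1\$/  name: $2/" \
      -e "s/^  namespace: $3\$/  namespace: $4/" \
      -e '/^  resourceVersion:/d' \
      -e '/^  uid:/d' \
      -e '/^  creationTimestamp:/d'
}

# Export, rewrite, save, and apply; skipped when kubectl is not installed.
if command -v kubectl >/dev/null 2>&1; then
  kubectl get secret pulsar-a-token-admin -n k8s-01 -o yaml \
    | rename_secret pulsar-a-token-admin pulsar-b-token-admin k8s-01 k8s-02 \
    > pulsar-b-token-admin.yaml
  kubectl apply -f pulsar-b-token-admin.yaml
fi
```

The same helper can be reused for the remaining pulsar-a secrets by changing the two secret names.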
The pulsar-b-token-admin secret should be similar to the following: