Configure asynchronous geo-replication
This document describes how to configure asynchronous geo-replication on StreamNative Platform.
Prerequisites
These instructions assume that you have installed StreamNative Platform in two Kubernetes namespaces, k8s-01 and k8s-02. If you are new to StreamNative Platform, work through the StreamNative Platform deployment guide first, and then return to this tutorial.
Enable asynchronous geo-replication
This section guides you through the steps to enable asynchronous geo-replication. You can enable asynchronous geo-replication at either the namespace or topic level.
- Namespace-level asynchronous geo-replication: when namespace-level asynchronous geo-replication is enabled, the data that is written to the topics in the configured namespace can be replicated to other Pulsar clusters.
- Topic-level asynchronous geo-replication: when topic-level asynchronous geo-replication is enabled, only the data that is written to the configured topic can be replicated to other Pulsar clusters.
Note
- The instructions in this section assume that you work on two Pulsar clusters (pulsar-a and pulsar-b).
- If you only want unidirectional asynchronous geo-replication between these two Pulsar clusters, such as from pulsar-a to pulsar-b, you can enable asynchronous geo-replication on Pulsar cluster pulsar-a only, and vice versa.
- If you want bidirectional asynchronous geo-replication between these two Pulsar clusters (pulsar-a and pulsar-b), you need to enable asynchronous geo-replication on both Pulsar clusters.
- It is recommended to disable the Vault service to save resources.
Enable namespace-level asynchronous geo-replication
This section describes how to enable namespace-level asynchronous geo-replication.
Step 1: Create cluster connection
Run the commands below to create the connection between these two Pulsar clusters.
Run the command below on Pulsar cluster pulsar-a to create the connection from Pulsar cluster pulsar-a to Pulsar cluster pulsar-b. Then, when a client writes data to Pulsar cluster pulsar-a, the data is replicated from Pulsar cluster pulsar-a to Pulsar cluster pulsar-b.
kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash
bin/pulsar-admin clusters create \
--broker-url pulsar://pulsar-b-sn-platform-broker.k8s-02:6650 \
--url http://pulsar-b-sn-platform-broker.k8s-02:8080 \
pulsar-b-sn-platform
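For bidirectional replication, the same connection also has to exist in the opposite direction. The sketch below is one way to generate the matching clusters create command for either side. The helper name cluster_create_cmd is hypothetical, and the URL pattern (release-sn-platform-broker.namespace, ports 6650 and 8080) is assumed from the command above. The helper only prints the command, so that you can review it before running it inside the other cluster's toolset pod.

```shell
#!/bin/sh
# Hypothetical helper (not part of StreamNative Platform): print the
# pulsar-admin command that registers a remote cluster.
#   $1 = Helm release name of the remote cluster (for example, pulsar-a)
#   $2 = Kubernetes namespace of the remote cluster (for example, k8s-01)
# Assumption: the broker service is named <release>-sn-platform-broker and
# listens on 6650 (binary protocol) and 8080 (HTTP), as in the command above.
cluster_create_cmd() {
  remote_release=$1
  remote_ns=$2
  broker="${remote_release}-sn-platform-broker.${remote_ns}"
  printf 'bin/pulsar-admin clusters create --broker-url pulsar://%s:6650 --url http://%s:8080 %s-sn-platform\n' \
    "$broker" "$broker" "$remote_release"
}

# Command to run on pulsar-b (inside pulsar-b-sn-platform-toolset-0)
# to connect pulsar-b to pulsar-a.
cluster_create_cmd pulsar-a k8s-01
```

After the connection is created, running bin/pulsar-admin clusters list on each side should show both cluster names.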
Step 2: Create tenants and grant permissions
Create a tenant (rep-tenant-1) in both Pulsar clusters, grant the Admin role to the tenant, and allow the tenant to access both Pulsar clusters.
./bin/pulsar-admin tenants create rep-tenant-1 --admin-roles admin --allowed-clusters pulsar-a-sn-platform,pulsar-b-sn-platform
Step 3: Create namespaces
Create a namespace (rep-tenant-1/rep-ns) in both Pulsar clusters, grant the Admin role the produce and consume permissions on the namespace, and set the replication clusters for the namespace.
./bin/pulsar-admin namespaces create rep-tenant-1/rep-ns
./bin/pulsar-admin namespaces grant-permission rep-tenant-1/rep-ns --actions produce,consume --role admin
./bin/pulsar-admin namespaces set-clusters rep-tenant-1/rep-ns --clusters pulsar-a-sn-platform,pulsar-b-sn-platform
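To confirm that the replication clusters were applied, you can read them back with namespaces get-clusters. The sketch below wraps that check in a small shell function. The name has_clusters is a hypothetical helper, and it only assumes that each cluster name appears somewhere in the command output, because the exact output format may differ between Pulsar versions.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if every expected cluster name
# appears in the given text (for example, the output of get-clusters).
#   $1    = text to search
#   $2... = cluster names that must all be present
has_clusters() {
  output=$1
  shift
  for cluster in "$@"; do
    # -F matches the name literally; -q suppresses grep's own output.
    printf '%s\n' "$output" | grep -qF -- "$cluster" || return 1
  done
  return 0
}

# Usage inside the toolset pod (requires a running cluster):
#   out=$(./bin/pulsar-admin namespaces get-clusters rep-tenant-1/rep-ns)
#   has_clusters "$out" pulsar-a-sn-platform pulsar-b-sn-platform \
#     && echo "replication clusters set"
```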
Step 4: Create topics
Create a partitioned topic (rep-tenant-1/rep-ns/rep-topic) in both Pulsar clusters.
./bin/pulsar-admin topics create-partitioned-topic -p 1 persistent://rep-tenant-1/rep-ns/rep-topic
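Steps 2 to 4 have to be repeated in both Pulsar clusters. As a rough sketch, the helper below prints the kubectl invocation that would run one pulsar-admin command in the toolset pod of each cluster. The name on_both_clusters is hypothetical, the pod and namespace names are the ones used in this document, and the sketch assumes that the working directory of the toolset container is the Pulsar installation directory, so that bin/pulsar-admin resolves.

```shell
#!/bin/sh
# Hypothetical helper: print (not execute) the kubectl command that runs
# the given pulsar-admin arguments in the toolset pod of each cluster.
on_both_clusters() {
  for target in pulsar-a:k8s-01 pulsar-b:k8s-02; do
    release=${target%%:*}    # text before the colon
    namespace=${target##*:}  # text after the colon
    printf 'kubectl exec %s-sn-platform-toolset-0 -n %s -- bin/pulsar-admin %s\n' \
      "$release" "$namespace" "$*"
  done
}

# Dry run for Step 4: one line per cluster.
on_both_clusters topics create-partitioned-topic -p 1 persistent://rep-tenant-1/rep-ns/rep-topic
```

Printing the commands first keeps the sketch safe to run anywhere; once the output looks right, it can be piped to sh to execute it.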
Enable topic-level asynchronous geo-replication
This section describes how to enable topic-level asynchronous geo-replication.
Step 1: Create cluster connection
Run the commands below to create the connection between these two Pulsar clusters.
Run the command below on Pulsar cluster pulsar-a to create the connection from Pulsar cluster pulsar-a to Pulsar cluster pulsar-b. Then, when a client writes data to Pulsar cluster pulsar-a, the data is replicated from Pulsar cluster pulsar-a to Pulsar cluster pulsar-b.
kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash
bin/pulsar-admin clusters create \
--broker-url pulsar://pulsar-b-sn-platform-broker.k8s-02:6650 \
--url http://pulsar-b-sn-platform-broker.k8s-02:8080 \
pulsar-b-sn-platform
Step 2: Create tenants and grant permissions
Create a tenant (rep-tenant-1) in both Pulsar clusters, grant the Admin role to the tenant, and allow the tenant to access both Pulsar clusters.
./bin/pulsar-admin tenants create rep-tenant-1 --admin-roles admin --allowed-clusters pulsar-a-sn-platform,pulsar-b-sn-platform
Step 3: Create namespaces
Create a namespace (rep-tenant-1/rep-ns) in both Pulsar clusters.
./bin/pulsar-admin namespaces create rep-tenant-1/rep-ns
Step 4: Create topics
Create a partitioned topic (rep-tenant-1/rep-ns/rep-topic) in both Pulsar clusters and set the replication clusters for the topic.
./bin/pulsar-admin topics create-partitioned-topic -p 1 persistent://rep-tenant-1/rep-ns/rep-topic
./bin/pulsar-admin topics set-replication-clusters --clusters pulsar-a-sn-platform,pulsar-b-sn-platform rep-tenant-1/rep-ns/rep-topic
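Because topic-level replication applies only to the topics you configure, enabling it for several topics means repeating the set-replication-clusters command once per topic. The sketch below prints one command per topic as a dry run. The helper name print_topic_replication_cmds and the second topic (rep-topic-2) are made up for illustration; only rep-topic comes from this section.

```shell
#!/bin/sh
# Clusters to replicate between, as used in this section.
CLUSTERS="pulsar-a-sn-platform,pulsar-b-sn-platform"

# Hypothetical helper: print one set-replication-clusters command per topic.
# Printing instead of executing lets you review the commands first.
print_topic_replication_cmds() {
  for topic in "$@"; do
    printf './bin/pulsar-admin topics set-replication-clusters --clusters %s %s\n' \
      "$CLUSTERS" "$topic"
  done
}

# rep-topic is from this section; rep-topic-2 is a made-up second topic.
print_topic_replication_cmds \
  persistent://rep-tenant-1/rep-ns/rep-topic \
  persistent://rep-tenant-1/rep-ns/rep-topic-2
```

If your pulsar-admin version provides it, topics get-replication-clusters can be used afterwards to read the setting back for a given topic.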
Verify asynchronous geo-replication
This example shows that when you produce messages to Pulsar cluster pulsar-b, a consumer can read them from Pulsar cluster pulsar-a.
Consume the message from Pulsar cluster pulsar-a.
kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash
bin/pulsar-client consume -s sub -n 0 rep-tenant-1/rep-ns/rep-topic
In a second terminal, produce a message (hello-world) to Pulsar cluster pulsar-b.
kubectl exec -it pulsar-b-sn-platform-toolset-0 -n k8s-02 -- /bin/bash
bin/pulsar-client produce -m 'hello-world' rep-tenant-1/rep-ns/rep-topic
On the consumer terminal, the output should be similar to the following:
05:16:44.185 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xa8dc9ad8, L:/10.90.12.162:50018 - R:pulsar-a-sn-platform-broker.k8s-01.svc.cluster.local/172.20.234.167:6650]] Connected to server
05:16:44.298 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":null,"subscriptionName":"sub","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"64b24","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
05:16:44.314 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://pulsar-a-sn-platform-broker:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
05:16:44.329 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xfa831f72, L:/10.90.12.162:60296 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.161:6650]] Connected to server
05:16:44.331 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribing to topic on cnx [id: 0xfa831f72, L:/10.90.12.162:60296 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.161:6650], consumerId 0
05:16:44.424 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribed to topic on pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.161:6650 -- consumer: 0
05:16:44.427 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [rep-tenant-1/rep-ns/rep-topic] [sub] Success subscribe new topic persistent://rep-tenant-1/rep-ns/rep-topic in topics consumer, partitions: 1, allTopicPartitionsNumber: 1
05:17:46.416 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:hello-world
This example shows that when you produce messages to Pulsar cluster pulsar-a, a consumer can read them from Pulsar cluster pulsar-b.
Consume the message from Pulsar cluster pulsar-b.
kubectl exec -it pulsar-b-sn-platform-toolset-0 -n k8s-02 -- /bin/bash
bin/pulsar-client consume -s sub -n 0 rep-tenant-1/rep-ns/rep-topic
In a second terminal, produce a message (hello-world) to Pulsar cluster pulsar-a.
kubectl exec -it pulsar-a-sn-platform-toolset-0 -n k8s-01 -- /bin/bash
bin/pulsar-client produce -m 'hello-world' rep-tenant-1/rep-ns/rep-topic
On the consumer terminal, the output should be similar to the following:
root@pulsar-a-sn-platform-toolset-0:/pulsar# bin/pulsar-client consume -s sub -n 0 rep-tenant-1/rep-ns/rep-topic
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.pulsar.common.util.netty.DnsResolverUtil (file:/pulsar/lib/io.streamnative-pulsar-common-2.10.2.4.jar) to method sun.net.InetAddressCachePolicy.get()
WARNING: Please consider reporting this to the maintainers of org.apache.pulsar.common.util.netty.DnsResolverUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
11:25:05.201 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x7784f0c4, L:/10.90.10.180:56584 - R:pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650]] Connected to server
11:25:05.303 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":null,"subscriptionName":"sub","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"0de7c","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
11:25:05.319 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://pulsar-a-sn-platform-broker-headless:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
11:25:05.334 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x80d7d007, L:/10.90.10.180:56586 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650]] Connected to server
11:25:05.336 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribing to topic on cnx [id: 0x80d7d007, L:/10.90.10.180:56586 - R:pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650], consumerId 0
11:25:05.341 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://rep-tenant-1/rep-ns/rep-topic-partition-0][sub] Subscribed to topic on pulsar-a-sn-platform-broker-0.pulsar-a-sn-platform-broker-headless.k8s-01.svc.cluster.local/10.90.10.106:6650 -- consumer: 0
11:25:05.343 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [rep-tenant-1/rep-ns/rep-topic] [sub] Success subscribe new topic persistent://rep-tenant-1/rep-ns/rep-topic in topics consumer, partitions: 1, allTopicPartitionsNumber: 1
11:25:08.822 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:hello-world
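The manual check above can also be scripted. The following is a minimal sketch that looks for the replicated payload in captured consumer output. The helper name received_message is hypothetical, and it assumes the consumer prints the payload as content:payload, as in the sample output above.

```shell
#!/bin/sh
# Hypothetical helper: succeed if the consumer output contains the payload.
#   $1 = captured output of pulsar-client consume
#   $2 = expected message payload
received_message() {
  # -F matches the text literally; -q suppresses grep's own output.
  printf '%s\n' "$1" | grep -qF -- "content:$2"
}

# Usage (requires both clusters; start the consumer before producing).
# -n 1 makes the consumer exit after one message instead of running forever.
#   log=$(kubectl exec pulsar-a-sn-platform-toolset-0 -n k8s-01 -- \
#         bin/pulsar-client consume -s sub -n 1 rep-tenant-1/rep-ns/rep-topic 2>&1)
#   received_message "$log" hello-world && echo "message replicated"
```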
Examples
This section provides examples of how to enable asynchronous geo-replication when different authentication methods are enabled on StreamNative Platform.
Note
These instructions assume that you have installed StreamNative Platform in two Kubernetes namespaces (k8s-01 and k8s-02) and that you work on two Pulsar clusters (pulsar-a and pulsar-b).
Enable JWT authentication and asynchronous geo-replication
If you want to enable JWT authentication and geo-replication for your Pulsar clusters, follow the instructions in this section.
Create JWT secrets for your Pulsar clusters
Before enabling JWT authentication on your Pulsar clusters, you need to create JWT secrets for them. You can run the prepare_helm_release.sh script to automatically generate JWT secrets for one Pulsar cluster. Then, copy these secrets from one Kubernetes namespace to the other and create new secrets for the other Pulsar cluster based on the copied secrets.
Clone the StreamNative charts repository and switch to the target directory.
git clone https://github.com/streamnative/charts.git
cd charts
Run the prepare_helm_release.sh script to create JWT secrets.
./scripts/pulsar/prepare_helm_release.sh -n k8s-01 -k pulsar-a
- -n,--namespace: the Kubernetes namespace to install the helm chart
- -k,--release: the helm release name
By default, the command generates an asymmetric public/private key pair. You can choose to generate a symmetric secret key by specifying --symmetric in the command.
Check the created secrets.
kubectl get secret -n k8s-01
NAME                                  TYPE     DATA   AGE
pulsar-a-token-admin                  Opaque   2      26s
pulsar-a-token-asymmetric-key         Opaque   2      34s
pulsar-a-token-broker-admin           Opaque   2      29s
pulsar-a-token-proxy-admin            Opaque   2      31s
pulsar-a-token-pulsar-manager-admin   Opaque   2
- pulsar-a-token-admin: the role that is used to access the admin tools.
- pulsar-a-token-asymmetric-key: a public/private key pair that is used to generate and validate tokens with an asymmetric algorithm. You can use the private key to generate tokens and the public key to validate them.
- pulsar-a-token-broker-admin: the role that is used for inter-broker communications.
- pulsar-a-token-proxy-admin: the role that is used for Pulsar proxies to communicate with Pulsar brokers.
- pulsar-a-token-pulsar-manager-admin: the superuser role that is used to access the StreamNative Console.
Now, these two Pulsar clusters share the same private key.
Configure JWT authentication for your Pulsar clusters
For details, see configure JWT authentication.
Enable asynchronous geo-replication
For details, see enable asynchronous geo-replication.
Enable OAuth2 authentication and asynchronous geo-replication
If you want to enable OAuth2 authentication and asynchronous geo-replication for your Pulsar clusters, follow these steps:
Configure OAuth2 authentication for your Pulsar clusters.
For details, see Configure OAuth2 authentication.
Enable asynchronous geo-replication.
For details, see enable asynchronous geo-replication.