StreamNative Cloud enables you to manage Pulsar IO Connectors by using a variety of tools, including snctl, pulsarctl, pulsar-admin, and Terraform.
If you want to update or delete connectors using snctl, pulsarctl, or pulsar-admin, make sure you have set up your client tool. For more information, see set up client tools.
Update a connector
To modify the configuration or resources of a connector, you can update it with any of several tools.
The following example shows how to update the parallelism of the data generator source connector test to 2 using different tools.
snctl pulsarctl pulsar-admin Terraform Console Rest API
snctl pulsar admin sources update \
--name test \
--parallelism 2
You should see the following output:
And you can further check the status:
snctl pulsar admin sources status --name test
{
"numInstances" : 2,
"numRunning" : 2,
"instances" : [
{
"instanceId" : 0,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 1799,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [],
"numWritten" : 1799,
"lastReceivedTime" : 1693946327331,
"workerId" : "test"
}
},
{
"instanceId" : 1,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 689,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [],
"numWritten" : 689,
"lastReceivedTime" : 1693946327129,
"workerId" : "test"
}
}
]
}
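Reading the status JSON by eye works for two instances but gets tedious for more. The following is a minimal sketch of an automated check, assuming jq is installed (it is already used in the REST API examples on this page). The helper name all_instances_running is hypothetical; the field names numInstances, numRunning, and instances[].status.running are taken from the output above.

```shell
# Hypothetical helper: reads a connector status JSON document on stdin.
# Succeeds (exit 0) only if numInstances and numRunning both equal the
# wanted count and every instance reports "running": true.
all_instances_running() {
  want=$1
  jq -e --argjson want "$want" '
    .numInstances == $want
    and .numRunning == $want
    and ([.instances[].status.running] | all)
  ' >/dev/null
}

# Example usage (assumes snctl is already configured):
#   snctl pulsar admin sources status --name test | all_instances_running 2 \
#     && echo "all instances are running"
```

The check relies on jq's -e flag, which turns a false or null result into a non-zero exit status, so the function can be used directly in if statements or retry loops.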
pulsarctl sources update \
--name test \
--parallelism 2
You should see the following output:
And you can further check the status:
pulsarctl sources status --name test
{
"numInstances" : 2,
"numRunning" : 2,
"instances" : [
{
"instanceId" : 0,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 1799,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [],
"numWritten" : 1799,
"lastReceivedTime" : 1693946327331,
"workerId" : "test"
}
},
{
"instanceId" : 1,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 689,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [],
"numWritten" : 689,
"lastReceivedTime" : 1693946327129,
"workerId" : "test"
}
}
]
}
pulsar-admin \
--admin-url "${WEB_SERVICE_URL}" \
--auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
--auth-params '{"privateKey":"file:///YOUR-KEY-FILE-PATH",
"issuerUrl":"https://auth.streamnative.cloud/",
"audience":"urn:sn:pulsar:${orgName}:${instanceName}"}' \
sources update \
--name test \
--parallelism 2
Replace the placeholder variables with the actual values, which you can get when you set up client tools.
admin-url: the HTTP service URL of your Pulsar cluster.
privateKey: the path to the downloaded OAuth2 key file.
audience: the Uniform Resource Name (URN), which is a combination of urn:sn:pulsar, your organization name, and your Pulsar instance name.
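Because the --auth-params value in the command above is wrapped in single quotes, the shell does not expand ${orgName} or ${instanceName}; they have to be substituted by hand. As an alternative, the following sketch assembles the JSON string from three values. The helper name build_auth_params and the example values are hypothetical; the JSON keys and the issuer URL are copied from the command above.

```shell
# Hypothetical helper: prints the --auth-params JSON for pulsar-admin.
#   $1 = absolute path to the downloaded OAuth2 key file
#   $2 = organization name
#   $3 = Pulsar instance name
# The values are inserted verbatim (no JSON escaping), so this sketch
# assumes they contain no double quotes or backslashes.
build_auth_params() {
  key_file=$1
  org=$2
  instance=$3
  printf '{"privateKey":"file://%s","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:%s:%s"}\n' \
    "$key_file" "$org" "$instance"
}

# Example usage (placeholder path and names):
#   pulsar-admin \
#     --admin-url "${WEB_SERVICE_URL}" \
#     --auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
#     --auth-params "$(build_auth_params /path/to/key.json my-org my-instance)" \
#     sources update --name test --parallelism 2
```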
You should see the following output:
And you can further check the status:
pulsar-admin sources status --name test
{
"numInstances" : 2,
"numRunning" : 2,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 1287,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 1287,
"lastReceivedTime" : 1693946605095,
"workerId" : "test"
}
}, {
"instanceId" : 1,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 909,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 909,
"lastReceivedTime" : 1693946605023,
"workerId" : "test"
}
} ]
}
To update the submitted connector, edit the Terraform file and then run the following command.
# update the main.tf file
terraform apply
# output
pulsar_source.test: Refreshing state... [id=public/default/dg-test-tf]
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
~ update in-place
Terraform will perform the following actions:
# pulsar_source.test will be updated in-place
~ resource "pulsar_source" "test" {
~ archive = "connectors/pulsar-io-data-generator-2.9.2.17.nar" -> "builtin://data-generator"
- custom_runtime_options = jsonencode(
{
- clusterName = "test"
- managed = true
- maxReplicas = 0
- outputTypeClassName = "org.apache.pulsar.io.datagenerator.Person"
- serviceAccountName = "test-function-pulsarcluster"
}
) -> null
id = "public/default/dg-test-tf"
name = "dg-test-tf"
~ parallelism = 1 -> 2
# (10 unchanged attributes hidden)
}
Plan: 0 to add, 1 to change, 0 to destroy.
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Enter a value: yes
pulsar_source.test: Modifying... [id=public/default/dg-test-tf]
pulsar_source.test: Modifications complete after 1s [id=public/default/dg-test-tf]
Apply complete! Resources: 0 added, 1 changed, 0 destroyed.
On the left navigation pane of StreamNative Console, under Resources, click Connectors.
On the Connectors page, select the Created Sources tab.
Click the ellipsis at the end of the row of the connector, and then click Edit.
Edit the configuration that you want to change, and click SUBMIT.
curl -X PUT "https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test" \
-H 'Authorization: Bearer <API Key>' \
-H "Content-Type: multipart/form-data" \
-F 'sourceConfig={"name": "test", "tenant": "public", "namespace": "default", "parallelism": 2};type=application/json'
If the response contains no error, the update succeeded.
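In the REST request, the tenant, namespace, and connector name appear both in the URL path and in the sourceConfig body, so the two need to agree. The following sketch derives both from the same arguments. The helper names source_url and source_config_json and the API_KEY variable are hypothetical; the path layout and JSON keys follow the curl example above.

```shell
# Hypothetical helper: prints the admin URL for a source connector.
#   $1 = service host, $2 = tenant, $3 = namespace, $4 = connector name
source_url() {
  printf 'https://%s/admin/v3/sources/%s/%s/%s\n' "$1" "$2" "$3" "$4"
}

# Hypothetical helper: prints a sourceConfig JSON body that changes only
# the parallelism.
#   $1 = tenant, $2 = namespace, $3 = connector name, $4 = parallelism
source_config_json() {
  printf '{"name": "%s", "tenant": "%s", "namespace": "%s", "parallelism": %d}\n' \
    "$3" "$1" "$2" "$4"
}

# Example usage (API_KEY is an assumed variable holding your API key):
#   curl -X PUT "$(source_url "${WEB_SERVICE_URL}" public default test)" \
#     -H "Authorization: Bearer ${API_KEY}" \
#     -F "sourceConfig=$(source_config_json public default test 2);type=application/json"
```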
And you can further check the status:
curl -X GET "https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test/status" \
--header 'Authorization: Bearer <API Key>' | jq '.'
{
"numInstances" : 2,
"numRunning" : 2,
"instances" : [
{
"instanceId" : 0,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 1799,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [],
"numWritten" : 1799,
"lastReceivedTime" : 1693946327331,
"workerId" : "test"
}
},
{
"instanceId" : 1,
"status" : {
"running" : true ,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 689,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [],
"numWritten" : 689,
"lastReceivedTime" : 1693946327129,
"workerId" : "test"
}
}
]
}
Delete a connector
The following example shows how to delete the data generator source connector test using different tools.
snctl pulsarctl pulsar-admin Terraform Console Rest API
To delete the source connector test, use the following command.
snctl pulsar admin sources delete --tenant public --namespace default --name test
You should see the following output:
Deleted test successfully
If you want to verify whether the source connector has been deleted successfully, run the following command.
snctl pulsar admin sources get --tenant public --namespace default --name test
You should see the following output:
[✖] code: 500 reason: failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-XXXXX" not found
To delete the source connector test, use the following command.
pulsarctl sources delete --tenant public --namespace default --name test
You should see the following output:
Deleted test successfully
If you want to verify whether the source connector has been deleted successfully, run the following command.
pulsarctl sources get --tenant public --namespace default --name test
You should see the following output:
[✖] code: 500 reason: failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-XXXXX" not found
To delete the source connector test, run the following command.
./bin/pulsar-admin sources delete --tenant public --namespace default --name test
You should see the following output:
Delete source successfully
To verify the source connector has been deleted, run the following command.
./bin/pulsar-admin sources get --tenant public --namespace default --name test
You should see the following output:
failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-e9ef0ca6" not found
To delete the source connector test with Terraform, run the following command and type yes at the prompt.
terraform destroy
# output
pulsar_source.test: Refreshing state... [id=public/default/dg-test-tf]
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
- destroy
Terraform will perform the following actions:
# pulsar_source.test will be destroyed
- resource "pulsar_source" "test" {
- archive = "connectors/pulsar-io-data-generator-2.9.2.17.nar" -> null
- classname = "org.apache.pulsar.io.datagenerator.DataGeneratorSource" -> null
- configs = jsonencode(
{
- sleepBetweenMessages = "60"
}
) -> null
- cpu = 1 -> null
- custom_runtime_options = jsonencode(
{
- clusterName = "test"
- managed = true
- maxReplicas = 0
- outputTypeClassName = "org.apache.pulsar.io.datagenerator.Person"
- serviceAccountName = "test-function-pulsarcluster"
}
) -> null
- destination_topic_name = "public/default/dg-test" -> null
- disk_mb = 10240 -> null
- id = "public/default/dg-test-tf" -> null
- name = "dg-test-tf" -> null
- namespace = "default" -> null
- parallelism = 2 -> null
- processing_guarantees = "ATMOST_ONCE" -> null
- ram_mb = 1024 -> null
- tenant = "public" -> null
- use_thread_local_producers = false -> null
}
Plan: 0 to add, 0 to change, 1 to destroy.
Do you really want to destroy all resources?
Terraform will destroy all your managed infrastructure, as shown above.
There is no undo. Only 'yes' will be accepted to confirm.
Enter a value: yes
pulsar_source.test: Destroying... [id=public/default/dg-test-tf]
pulsar_source.test: Destruction complete after 1s
Destroy complete! Resources: 1 destroyed.
On the left navigation pane of StreamNative Console, under Resources, click Connectors.
On the Connectors page, select the Created Sources tab.
Click the ellipsis at the end of the row of the connector, and then click Delete.
Enter the connector name and then click Confirm.
To delete the source connector test, use the following command.
curl -X DELETE "https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test" \
--header 'Authorization: Bearer <API Key>'
If the response contains no error, the deletion succeeded.
If you want to verify whether the source connector has been deleted successfully, run the following command.
curl -X GET "https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test/status" \
--header 'Authorization: Bearer <API Key>' | jq '.'
You should see the following output:
{ "reason" : "failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io \"test-6b51d8ef\" not found" }
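In every verification example above, a deleted connector shows up as a message containing responseCode: 404, even where the CLI wraps it in code: 500. The following sketch turns that observation into a scriptable check. The helper name is_not_found is hypothetical, and the match relies only on the message text shown on this page, which may differ in other versions.

```shell
# Hypothetical helper: succeeds (exit 0) if the given tool output contains
# the "responseCode: 404" marker shown in the examples above.
is_not_found() {
  case $1 in
    *"responseCode: 404"*) return 0 ;;
    *) return 1 ;;
  esac
}

# Example usage (assumes pulsarctl is already configured):
#   out=$(pulsarctl sources get --tenant public --namespace default --name test 2>&1)
#   is_not_found "$out" && echo "connector test no longer exists"
```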