snctl, pulsarctl, pulsar-admin, and Terraform.
If you want to update or delete connectors using snctl, pulsarctl, or pulsar-admin, make sure you have set up your client tool. For more information, see set up client tools.
Update a connector
To modify the configuration or resources of a connector, you can update it using any of several tools. The following example shows how to update the parallelism of the data generator source connector test to 2 using different tools.
- snctl
- pulsarctl
- pulsar-admin
- Terraform
- Console
- REST API
snctl pulsar admin sources update \
--name test \
--parallelism 2
Updated successfully
snctl pulsar admin sources status --name test
{
"numInstances": 2,
"numRunning": 2,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 1799,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 1799,
"lastReceivedTime": 1693946327331,
"workerId": "test"
}
},
{
"instanceId": 1,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 689,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 689,
"lastReceivedTime": 1693946327129,
"workerId": "test"
}
}
]
}
pulsarctl sources update \
--name test \
--parallelism 2
Updated successfully
pulsarctl sources status --name test
{
"numInstances": 2,
"numRunning": 2,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 1799,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 1799,
"lastReceivedTime": 1693946327331,
"workerId": "test"
}
},
{
"instanceId": 1,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 689,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 689,
"lastReceivedTime": 1693946327129,
"workerId": "test"
}
}
]
}
pulsar-admin \
--admin-url "${WEB_SERVICE_URL}" \
--auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
--auth-params '{"privateKey":"file:///YOUR-KEY-FILE-PATH",
"issuerUrl":"https://auth.streamnative.cloud/",
"audience":"urn:sn:pulsar:${orgName}:${instanceName}"}' \
sources update \
--name test \
--parallelism 2
- admin-url: the HTTP service URL of your Pulsar cluster.
- privateKey: the path to the downloaded OAuth2 key file.
- audience: the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar prefix, your organization name, and your Pulsar instance name.
- ${orgName}: the name of your organization.
- ${instanceName}: the name of your instance.
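Because the --auth-params value in the command above is wrapped in single quotes, the shell does not expand ${orgName} and ${instanceName}; they have to be substituted by hand or assembled beforehand. The following is a minimal sketch of assembling the value in a shell variable. The organization name, instance name, and key file path are hypothetical placeholders, not values from this guide.

```shell
# Hypothetical values: replace with your own organization, instance, and key file.
orgName="my-org"
instanceName="my-instance"
keyFile="/path/to/key.json"

# The audience is the urn:sn:pulsar prefix followed by the organization
# name and the instance name, separated by colons.
audience="urn:sn:pulsar:${orgName}:${instanceName}"

# Build the JSON with printf so that the variables are expanded.
authParams="$(printf '{"privateKey":"file://%s","issuerUrl":"https://auth.streamnative.cloud/","audience":"%s"}' "$keyFile" "$audience")"

echo "$authParams"
```

The resulting variable can then be passed to pulsar-admin as --auth-params "$authParams".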
Updated successfully
pulsar-admin sources status --name test
{
"numInstances" : 2,
"numRunning" : 2,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 1287,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 1287,
"lastReceivedTime" : 1693946605095,
"workerId" : "test"
}
}, {
"instanceId" : 1,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 909,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 909,
"lastReceivedTime" : 1693946605023,
"workerId" : "test"
}
} ]
}
To update a connector that was submitted with Terraform, edit the Terraform file and then run the following command.
# update the main.tf file
terraform apply
# output
pulsar_source.test: Refreshing state... [id=public/default/dg-test-tf]
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
~ update in-place
Terraform will perform the following actions:
# pulsar_source.test will be updated in-place
~ resource "pulsar_source" "test" {
~ archive = "connectors/pulsar-io-data-generator-2.9.2.17.nar" -> "builtin://data-generator"
- custom_runtime_options = jsonencode(
{
- clusterName = "test"
- managed = true
- maxReplicas = 0
- outputTypeClassName = "org.apache.pulsar.io.datagenerator.Person"
- serviceAccountName = "test-function-pulsarcluster"
}
) -> null
id = "public/default/dg-test-tf"
name = "dg-test-tf"
~ parallelism = 1 -> 2
# (10 unchanged attributes hidden)
}
Plan: 0 to add, 1 to change, 0 to destroy.
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Enter a value: yes
pulsar_source.test: Modifying... [id=public/default/dg-test-tf]
pulsar_source.test: Modifications complete after 1s [id=public/default/dg-test-tf]
Apply complete! Resources: 0 added, 1 changed, 0 destroyed.
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Created Sources tab.
- Click the ellipsis at the end of the row of the connector, and then click Edit.
- Edit the configuration that you want to change, and click SUBMIT.
curl -X PUT https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test \
-H 'Authorization: Bearer <API Key>' \
-H "Content-Type: multipart/form-data" \
-F 'sourceConfig={"name": "test", "tenant": "public", "namespace": "default", "parallelism": 2};type=application/json'
curl -X GET https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test/status \
--header 'Authorization: Bearer <API Key>' | jq '.'
{
"numInstances": 2,
"numRunning": 2,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 1799,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 1799,
"lastReceivedTime": 1693946327331,
"workerId": "test"
}
},
{
"instanceId": 1,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 689,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 689,
"lastReceivedTime": 1693946327129,
"workerId": "test"
}
}
]
}
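The status outputs above all report numInstances and numRunning, so the update can also be checked in a script. The following is a rough sketch that reads a status document on standard input and compares both fields with the expected parallelism. The function names status_field and check_parallelism are hypothetical helpers, and the sed-based extraction is a deliberately dependency-free shortcut that assumes each key appears once; a JSON tool such as jq would be more robust.

```shell
# status_field KEY: print the first integer value of KEY found in the JSON on stdin.
# Tolerates both "key": 2 and "key" : 2 spacing.
status_field() {
  sed -n "s/.*\"$1\"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p" | head -n 1
}

# check_parallelism EXPECTED: read a source status document on stdin and
# succeed only if numInstances and numRunning both equal EXPECTED.
check_parallelism() {
  expected="$1"
  status_json="$(cat)"
  instances="$(printf '%s\n' "$status_json" | status_field numInstances)"
  running="$(printf '%s\n' "$status_json" | status_field numRunning)"

  if [ "$instances" = "$expected" ] && [ "$running" = "$expected" ]; then
    echo "ok"
  else
    echo "mismatch: numInstances=${instances:-?} numRunning=${running:-?} expected=$expected"
    return 1
  fi
}
```

For example, the output of the status command can be piped into the helper: snctl pulsar admin sources status --name test | check_parallelism 2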
Delete a connector
The following example shows how to delete the data generator source connector test using different tools.
- snctl
- pulsarctl
- pulsar-admin
- Terraform
- Console
- REST API
To delete the source connector test, use the following command.
snctl pulsar admin sources delete --tenant public --namespace default --name test
You should see the following output:
Deleted test successfully
If you want to verify whether the source connector has been deleted successfully, run the following command.
snctl pulsar admin sources get --tenant public --namespace default --name test
You should see the following output:
[✖] code: 500 reason: failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-XXXXX" not found
To delete the source connector test, use the following command.
pulsarctl sources delete --tenant public --namespace default --name test
You should see the following output:
Deleted test successfully
If you want to verify whether the source connector has been deleted successfully, run the following command.
pulsarctl sources get --tenant public --namespace default --name test
You should see the following output:
[✖] code: 500 reason: failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-XXXXX" not found
To delete the source connector test, run the following command.
./bin/pulsar-admin sources delete --tenant public --namespace default --name test
You should see the following output:
Delete source successfully
To verify the source connector has been deleted, run the following command.
./bin/pulsar-admin sources get --tenant public --namespace default --name test
You should see the following output:
failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-e9ef0ca6" not found
To delete the source connector test with Terraform, run the following command and type yes at the prompt.
terraform destroy
# output
pulsar_source.test: Refreshing state... [id=public/default/dg-test-tf]
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
- destroy
Terraform will perform the following actions:
# pulsar_source.test will be destroyed
- resource "pulsar_source" "test" {
- archive = "connectors/pulsar-io-data-generator-2.9.2.17.nar" -> null
- classname = "org.apache.pulsar.io.datagenerator.DataGeneratorSource" -> null
- configs = jsonencode(
{
- sleepBetweenMessages = "60"
}
) -> null
- cpu = 1 -> null
- custom_runtime_options = jsonencode(
{
- clusterName = "test"
- managed = true
- maxReplicas = 0
- outputTypeClassName = "org.apache.pulsar.io.datagenerator.Person"
- serviceAccountName = "test-function-pulsarcluster"
}
) -> null
- destination_topic_name = "public/default/dg-test" -> null
- disk_mb = 10240 -> null
- id = "public/default/dg-test-tf" -> null
- name = "dg-test-tf" -> null
- namespace = "default" -> null
- parallelism = 2 -> null
- processing_guarantees = "ATMOST_ONCE" -> null
- ram_mb = 1024 -> null
- tenant = "public" -> null
- use_thread_local_producers = false -> null
}
Plan: 0 to add, 0 to change, 1 to destroy.
Do you really want to destroy all resources?
Terraform will destroy all your managed infrastructure, as shown above.
There is no undo. Only 'yes' will be accepted to confirm.
Enter a value: yes
pulsar_source.test: Destroying... [id=public/default/dg-test-tf]
pulsar_source.test: Destruction complete after 1s
Destroy complete! Resources: 1 destroyed.
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Created Sources tab.
- Click the ellipsis at the end of the row of the connector, and then click Delete.
- Enter the connector name and then click Confirm.
To delete the source connector test, use the following command.
curl -X DELETE https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test \
--header 'Authorization: Bearer <API Key>'
If no error is returned, the deletion is successful.
If you want to verify whether the source connector has been deleted successfully, run the following command.
curl -X GET https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test/status \
--header 'Authorization: Bearer <API Key>' | jq '.'
You should see the following output:
{"reason":"failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io \"test-6b51d8ef\" not found"}
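In every verification output above, a deleted connector shows up as a message containing responseCode: 404. A script could look for that marker instead of relying on a person reading the output. The following is a minimal sketch under that assumption; is_deleted is a hypothetical helper name, and matching on the message text is only as stable as the message itself.

```shell
# is_deleted: read the output of a "sources get" or status request on stdin
# and succeed when it contains the 404 marker shown in the outputs above.
is_deleted() {
  if grep -q 'responseCode: 404'; then
    echo "connector not found: deletion confirmed"
  else
    echo "no 404 in response: connector may still exist" >&2
    return 1
  fi
}
```

For example: pulsarctl sources get --tenant public --namespace default --name test 2>&1 | is_deleted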