StreamNative Cloud enables you to manage Pulsar IO Connectors by using a variety of tools, including snctl, kcctl, the REST API, and Console.
Update a connector
When you want to modify the configuration or update the resources of a connector, you can use any of these tools.
The following example shows how to update the tasks.max of the data generator source connector test to 2 using different tools.
> cat datagen.json
{
"name": "test",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "testusers",
"quickstart": "users",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "2"
}
}
> snctl kafka admin connect apply -f datagen.json
You should see the following output:
And you can further check the status:
snctl kafka admin connect describe connector test
Name: test
Type: source
State: RUNNING
Worker ID: 10.80.161.255:8083
Config:
connector.class: io.confluent.kafka.connect.datagen.DatagenConnector
iterations: 10000000
kafka.topic: testusers
key.converter: org.apache.kafka.connect.storage.StringConverter
max.interval: 1000
name: test
quickstart: users
tasks.max: 2
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
Tasks:
0:
State: RUNNING
Worker ID: 10.80.161.255:8083
Config:
connector.class: io.confluent.kafka.connect.datagen.DatagenConnector
quickstart: users
tasks.max: 2
max.interval: 1000
iterations: 10000000
task.class: io.confluent.kafka.connect.datagen.DatagenTask
name: test
value.converter.schemas.enable: false
kafka.topic: testusers
task.id: 0
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter: org.apache.kafka.connect.storage.StringConverter
1:
State: RUNNING
Worker ID: 10.80.160.255:8083
Config:
connector.class: io.confluent.kafka.connect.datagen.DatagenConnector
quickstart: users
tasks.max: 2
max.interval: 1000
iterations: 10000000
task.class: io.confluent.kafka.connect.datagen.DatagenTask
name: test
value.converter.schemas.enable: false
kafka.topic: testusers
task.id: 1
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter: org.apache.kafka.connect.storage.StringConverter
Topics:
testusers
> cat datagen.json
{
"name": "test",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "testusers",
"quickstart": "users",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "2"
}
}
> kcctl apply -f datagen.json
You should see the following output:
And you can further check the status:
kcctl describe connectors test --tasks-config
Name: test
Type: source
State: RUNNING
Worker ID: 10.80.161.255:8083
Config:
connector.class: io.confluent.kafka.connect.datagen.DatagenConnector
iterations: 10000000
kafka.topic: testusers
key.converter: org.apache.kafka.connect.storage.StringConverter
max.interval: 1000
name: test
quickstart: users
tasks.max: 2
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
Tasks:
0:
State: RUNNING
Worker ID: 10.80.161.255:8083
Config:
connector.class: io.confluent.kafka.connect.datagen.DatagenConnector
quickstart: users
tasks.max: 2
max.interval: 1000
iterations: 10000000
task.class: io.confluent.kafka.connect.datagen.DatagenTask
name: test
value.converter.schemas.enable: false
kafka.topic: testusers
task.id: 0
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter: org.apache.kafka.connect.storage.StringConverter
1:
State: RUNNING
Worker ID: 10.80.160.255:8083
Config:
connector.class: io.confluent.kafka.connect.datagen.DatagenConnector
quickstart: users
tasks.max: 2
max.interval: 1000
iterations: 10000000
task.class: io.confluent.kafka.connect.datagen.DatagenTask
name: test
value.converter.schemas.enable: false
kafka.topic: testusers
task.id: 1
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter: org.apache.kafka.connect.storage.StringConverter
Topics:
testusers
> cat datagen.json
{
"name": "test",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "testusers",
"quickstart": "users",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "2"
}
}
> curl -X PUT --header "Content-Type: application/json" "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/config" --data @datagen.json
If the response contains no error, the update was successful.
And you can further check the status:
> curl "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test" | jq '.'
{
"name": "test",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"iterations": "10000000",
"kafka.topic": "testusers",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"max.interval": "1000",
"name": "test",
"quickstart": "users",
"tasks.max": "2",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
},
"tasks": [
{
"connector": "test",
"task": 0
},
{
"connector": "test",
"task": 1
"task": 1
}
],
"type": "source"
}
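The same update can also be scripted. The following is a minimal Python sketch, not an official client: it only builds the PUT request that mirrors the curl command above, using the standard library, and does not send it. The function name build_update_request, the host kafka.example.com, the API key value, and the choice to pass the credentials as an HTTP Basic Authorization header (which is what curl derives from the user:password part of the URL) are assumptions made for illustration.

```python
import base64
import json
import urllib.request


def build_update_request(service_url, api_key, name, payload, tenant_ns="public/default"):
    """Build (but do not send) a PUT request for a connector's configuration."""
    url = f"https://{service_url}/admin/kafkaconnect/connectors/{name}/config"
    # curl turns the user:password part of the URL into HTTP Basic credentials.
    token = base64.b64encode(f"{tenant_ns}:{api_key}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


# Same document as datagen.json in the example above.
payload = {
    "name": "test",
    "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "testusers",
        "quickstart": "users",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "max.interval": 1000,
        "iterations": 10000000,
        "tasks.max": "2",
    },
}

# Hypothetical host and API key, used only to show the resulting request.
request = build_update_request("kafka.example.com", "my-api-key", "test", payload)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would send it; an HTTP error status raises urllib.error.HTTPError, which corresponds to the "no error" check described above.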
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Kafka Sources or Kafka Sinks tab.
- Click the ellipsis at the end of the row of the connector, and then click Edit.
- Edit the configuration that you want to change, and click SUBMIT.
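For the file-based tools in this section, the edit to datagen.json can itself be scripted before the file is applied. The sketch below is one possible way to do that in Python; the helper name with_tasks_max and the starting value "1" are hypothetical and are not taken from the example above.

```python
import copy
import json


def with_tasks_max(connector_doc, tasks_max):
    """Return a copy of a connector document with config["tasks.max"] replaced."""
    updated = copy.deepcopy(connector_doc)
    # The example file stores tasks.max as a string, so keep that convention.
    updated["config"]["tasks.max"] = str(tasks_max)
    return updated


# Hypothetical starting point: the same connector with a single task.
original = {
    "name": "test",
    "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "testusers",
        "tasks.max": "1",
    },
}

updated = with_tasks_max(original, 2)
print(json.dumps(updated, indent=2))
```

Working on a copy keeps the original document unchanged, so the two versions can be compared before the new file is written and applied.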
Delete a connector
The following example shows how to delete the data generator source connector test using different tools.
To delete the source connector test, use the following command.
snctl kafka admin connect delete connector test
You should see the following output:
If you want to verify whether the source connector has been deleted successfully, run the following command.
snctl kafka admin connect get connectors
You should see the following output:
To delete the source connector test, use the following command.
kcctl delete connector test
You should see the following output:
If you want to verify whether the source connector has been deleted successfully, run the following command.
kcctl get connectors
You should see the following output:
To delete the source connector test, use the following command.
> curl -X DELETE "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test"
If the response contains no error, the connector was deleted.
If you want to verify whether the source connector has been deleted successfully, run the following command.
curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test"
You should see the following output:
{"reason":"This resource doesn't exist, please check the name"}
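A script can make the same check on the response body. The sketch below assumes that a body carrying a reason field and no name field is the "does not exist" shape shown above; the helper name is_missing_connector is hypothetical, and the HTTP status code is deliberately not used because the example above does not show it.

```python
import json


def is_missing_connector(body):
    """Return True when a response body reports that the connector does not exist."""
    try:
        doc = json.loads(body)
    except json.JSONDecodeError:
        return False
    # Assumption: a "reason" field without a "name" field marks the error shape.
    return isinstance(doc, dict) and "reason" in doc and "name" not in doc


# Body copied from the verification output above.
deleted_body = '{"reason":"This resource doesn\'t exist, please check the name"}'
print(is_missing_connector(deleted_body))
```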
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Kafka Sources or Kafka Sinks tab.
- Click the ellipsis at the end of the row of the connector, and then click Delete.
- Enter the connector name and then click Confirm.
Stop a connector
You can stop a running connector using different tools. After a connector is stopped, all of its resources are released.
You can restart a stopped connector later.
To stop the source connector test, use the following command.
snctl kafka admin connect stop test
You should see the following output:
If you want to verify whether the source connector has been stopped successfully, run the following command.
snctl kafka admin connect get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source STOPPED
To stop the source connector test, use the following command.
kcctl stop connectors test
You should see the following output:
If you want to verify whether the source connector has been stopped successfully, run the following command.
kcctl get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source STOPPED
To stop the source connector test, use the following command.
> curl -X PUT --header 'content-type: application/json' "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/stop"
If the response contains no error, the connector was stopped.
If you want to verify whether the source connector has been stopped successfully, run the following command.
curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
You should see the following output:
{
"name": "test",
"connector": {
"state": "STOPPED",
"worker_id": "10.80.161.255:8083",
"trace": "Kafka Connector Resource stopped"
},
"tasks": [],
"type": "source"
}
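The status document above can be checked programmatically. The following Python sketch parses that exact output and tests for the two things it shows after a stop: a STOPPED connector state and an empty task list. The helper name is_stopped is hypothetical.

```python
import json

# Status output copied from the example above.
STATUS_JSON = """
{
  "name": "test",
  "connector": {
    "state": "STOPPED",
    "worker_id": "10.80.161.255:8083",
    "trace": "Kafka Connector Resource stopped"
  },
  "tasks": [],
  "type": "source"
}
"""


def is_stopped(status):
    """True when the connector reports STOPPED and no tasks remain."""
    return status["connector"]["state"] == "STOPPED" and not status["tasks"]


status = json.loads(STATUS_JSON)
print(is_stopped(status))
```

Checking the task list as well as the connector state reflects the statement that a stopped connector releases its resources.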
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Kafka Sources or Kafka Sinks tab.
- Click the ellipsis at the end of the row of the connector, and then click Stop.
Restart a connector
You can restart a stopped or running connector using different tools.
To restart the source connector test, use the following command.
snctl kafka admin connect restart connector test
You should see the following output:
If you want to verify whether the source connector has been restarted successfully, run the following command.
snctl kafka admin connect get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source RUNNING
To restart the source connector test, use the following command.
kcctl restart connectors test
You should see the following output:
If you want to verify whether the source connector has been restarted successfully, run the following command.
kcctl get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source RUNNING
To restart the source connector test, use the following command.
> curl -X POST "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/restart"
If the response contains no error, the connector was restarted.
If you want to verify whether the source connector has been restarted successfully, run the following command.
curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
You should see the following output:
{
"name": "test",
"connector": {
"state": "RUNNING",
"worker_id": "10.80.161.255:8083",
"trace": ""
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.244.0.75:8083"
}
],
"type": "source"
}
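A restart may take a moment to show up in the status, so a script may want to poll rather than check once. The sketch below is one way to do that in Python. The function name wait_for_state, the retry settings, and the canned responses are all hypothetical; the status call is passed in as a function so that the example runs without a real cluster.

```python
import time


def wait_for_state(fetch_status, expected="RUNNING", attempts=10, delay_seconds=1.0):
    """Poll fetch_status() until the connector and every listed task report `expected`."""
    for attempt in range(attempts):
        status = fetch_status()
        states = [status["connector"]["state"]]
        states += [task["state"] for task in status["tasks"]]
        # Note: an empty task list is accepted as long as the connector state matches.
        if all(state == expected for state in states):
            return True
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False


# Stand-in for the status request: canned, made-up responses instead of HTTP calls.
responses = iter([
    {"connector": {"state": "UNASSIGNED"}, "tasks": []},
    {"connector": {"state": "RUNNING"}, "tasks": [{"id": 0, "state": "RUNNING"}]},
])
print(wait_for_state(lambda: next(responses), delay_seconds=0))
```

In real use, fetch_status would issue the GET request to the status endpoint shown above and return the parsed JSON.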
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Kafka Sources or Kafka Sinks tab.
- Click the ellipsis at the end of the row of the connector, and then click Restart.
Pause a connector
You can pause a running connector using different tools. A paused connector still occupies its resources but stops processing messages.
To pause the source connector test, use the following command.
snctl kafka admin connect pause test
You should see the following output:
If you want to verify whether the source connector has been paused successfully, run the following command.
snctl kafka admin connect get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source PAUSED 0: PAUSED
To pause the source connector test, use the following command.
kcctl pause connectors test
You should see the following output:
If you want to verify whether the source connector has been paused successfully, run the following command.
kcctl get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source PAUSED 0: PAUSED
To pause the source connector test, use the following command.
> curl -X PUT --header 'content-type: application/json' "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/pause"
If the response contains no error, the connector was paused.
If you want to verify whether the source connector has been paused successfully, run the following command.
curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
You should see the following output:
{
"name": "test",
"connector": {
"state": "PAUSED",
"worker_id": "10.80.161.255:8083",
"trace": ""
},
"tasks": [
{
"id": 0,
"state": "PAUSED",
"worker_id": "10.244.0.75:8083"
}
],
"type": "source"
}
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Kafka Sources or Kafka Sinks tab.
- Click the ellipsis at the end of the row of the connector, and then click Pause.
Resume a connector
You can resume a paused connector using different tools.
To resume the source connector test, use the following command.
snctl kafka admin connect resume test
You should see the following output:
If you want to verify whether the source connector has been resumed successfully, run the following command.
snctl kafka admin connect get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source RUNNING 0: RUNNING
To resume the source connector test, use the following command.
kcctl resume connectors test
You should see the following output:
If you want to verify whether the source connector has been resumed successfully, run the following command.
kcctl get connectors
You should see the following output:
NAME TYPE STATE TASKS
datagen source RUNNING 0: RUNNING
To resume the source connector test, use the following command.
> curl -X PUT --header 'content-type: application/json' "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/resume"
If the response contains no error, the connector was resumed.
If you want to verify whether the source connector has been resumed successfully, run the following command.
curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA-SERVICE-URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
You should see the following output:
{
"name": "test",
"connector": {
"state": "RUNNING",
"worker_id": "10.80.161.255:8083",
"trace": ""
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.244.0.75:8083"
}
],
"type": "source"
}
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Kafka Sources or Kafka Sinks tab.
- Click the ellipsis at the end of the row of the connector, and then click Resume.
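The REST calls for the lifecycle operations in this document differ only in HTTP method and path suffix, which the following Python sketch collects in one table. It is an illustration rather than an official client: the names ACTIONS, EXPECTED_STATE, and build_action_request and the host kafka.example.com are hypothetical, authentication is omitted, and the expected states are the ones listed in the snctl verification output of each section.

```python
import urllib.request

# HTTP method and path suffix per action, copied from the curl commands in this document.
ACTIONS = {
    "stop": ("PUT", "/stop"),
    "restart": ("POST", "/restart"),
    "pause": ("PUT", "/pause"),
    "resume": ("PUT", "/resume"),
    "delete": ("DELETE", ""),
}

# State listed by the verification command after each action (delete has none).
EXPECTED_STATE = {
    "stop": "STOPPED",
    "restart": "RUNNING",
    "pause": "PAUSED",
    "resume": "RUNNING",
}


def build_action_request(service_url, name, action):
    """Build (but do not send) the request for one lifecycle action."""
    method, suffix = ACTIONS[action]
    url = f"https://{service_url}/admin/kafkaconnect/connectors/{name}{suffix}"
    return urllib.request.Request(url, method=method)


for action in ACTIONS:
    req = build_action_request("kafka.example.com", "test", action)
    print(action, req.get_method(), req.full_url)
```

Keeping the method and suffix in a single table makes the one exception visible: restart uses POST, while stop, pause, and resume use PUT.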
What’s next?