# Manage Kafka Connectors

StreamNative Cloud lets you manage Kafka connectors with several tools: `snctl`, `kcctl`, the REST API, and StreamNative Console.

## Update a connector

To modify the configuration or resources of a connector, update it with any of the following tools.

The following example shows how to update `tasks.max` of the data generator source connector `test` to `2` using different tools.

<Tabs>
  <Tab title="snctl">
    ```bash theme={null}
    > cat datagen.json
    {
      "name": "test",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "testusers",
        "quickstart": "users",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "max.interval": 1000,
        "iterations": 10000000,
        "tasks.max": "2"
      }
    }
    > snctl kafka admin connect apply -f datagen.json
    ```

    You should see the following output:

    ```bash theme={null}
    Updated connector test
    ```

    You can then check the connector status:

    ```bash theme={null}
    snctl kafka admin connect describe connector test

    Name:       test
    Type:       source
    State:      RUNNING
    Worker ID:  10.80.161.255:8083
    Config:
      connector.class:                 io.confluent.kafka.connect.datagen.DatagenConnector
      iterations:                      10000000
      kafka.topic:                     testusers
      key.converter:                   org.apache.kafka.connect.storage.StringConverter
      max.interval:                    1000
      name:                            test
      quickstart:                      users
      tasks.max:                       2
      value.converter:                 org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable:  false
    Tasks:
      0:
        State:                             RUNNING
        Worker ID:                         10.80.161.255:8083
        Config:
          connector.class:                 io.confluent.kafka.connect.datagen.DatagenConnector
          quickstart:                      users
          tasks.max:                       2
          max.interval:                    1000
          iterations:                      10000000
          task.class:                      io.confluent.kafka.connect.datagen.DatagenTask
          name:                            test
          value.converter.schemas.enable:  false
          kafka.topic:                     testusers
          task.id:                         0
          value.converter:                 org.apache.kafka.connect.json.JsonConverter
          key.converter:                   org.apache.kafka.connect.storage.StringConverter
      1:
        State:                             RUNNING
        Worker ID:                         10.80.160.255:8083
        Config:
          connector.class:                 io.confluent.kafka.connect.datagen.DatagenConnector
          quickstart:                      users
          tasks.max:                       2
          max.interval:                    1000
          iterations:                      10000000
          task.class:                      io.confluent.kafka.connect.datagen.DatagenTask
          name:                            test
          value.converter.schemas.enable:  false
          kafka.topic:                     testusers
          task.id:                         1
          value.converter:                 org.apache.kafka.connect.json.JsonConverter
          key.converter:                   org.apache.kafka.connect.storage.StringConverter
    Topics:
      testusers
    ```
  </Tab>

  <Tab title="kcctl">
    ```bash theme={null}
    > cat datagen.json
    {
      "name": "test",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "testusers",
        "quickstart": "users",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "max.interval": 1000,
        "iterations": 10000000,
        "tasks.max": "2"
      }
    }
    > kcctl apply -f datagen.json
    ```

    You should see the following output:

    ```bash theme={null}
    Updated connector test
    ```

    You can then check the connector status:

    ```bash theme={null}
    kcctl describe connectors test --tasks-config

    Name:       test
    Type:       source
    State:      RUNNING
    Worker ID:  10.80.161.255:8083
    Config:
      connector.class:                 io.confluent.kafka.connect.datagen.DatagenConnector
      iterations:                      10000000
      kafka.topic:                     testusers
      key.converter:                   org.apache.kafka.connect.storage.StringConverter
      max.interval:                    1000
      name:                            test
      quickstart:                      users
      tasks.max:                       2
      value.converter:                 org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable:  false
    Tasks:
      0:
        State:                             RUNNING
        Worker ID:                         10.80.161.255:8083
        Config:
          connector.class:                 io.confluent.kafka.connect.datagen.DatagenConnector
          quickstart:                      users
          tasks.max:                       2
          max.interval:                    1000
          iterations:                      10000000
          task.class:                      io.confluent.kafka.connect.datagen.DatagenTask
          name:                            test
          value.converter.schemas.enable:  false
          kafka.topic:                     testusers
          task.id:                         0
          value.converter:                 org.apache.kafka.connect.json.JsonConverter
          key.converter:                   org.apache.kafka.connect.storage.StringConverter
      1:
        State:                             RUNNING
        Worker ID:                         10.80.160.255:8083
        Config:
          connector.class:                 io.confluent.kafka.connect.datagen.DatagenConnector
          quickstart:                      users
          tasks.max:                       2
          max.interval:                    1000
          iterations:                      10000000
          task.class:                      io.confluent.kafka.connect.datagen.DatagenTask
          name:                            test
          value.converter.schemas.enable:  false
          kafka.topic:                     testusers
          task.id:                         1
          value.converter:                 org.apache.kafka.connect.json.JsonConverter
          key.converter:                   org.apache.kafka.connect.storage.StringConverter
    Topics:
      testusers
    ```
  </Tab>

  <Tab title="Rest API">
    ```bash theme={null}
    > cat datagen.json
    {
      "name": "test",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "testusers",
        "quickstart": "users",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "max.interval": 1000,
        "iterations": 10000000,
        "tasks.max": "2"
      }
    }
    > curl -X PUT --header "Content-Type: application/json" "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/config" --data @datagen.json
    ```

    If the response contains no error, the update succeeded.

    You can then check the connector status:

    ```bash theme={null}
    > curl "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test" | jq '.'
    {
      "name": "test",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "iterations": "10000000",
        "kafka.topic": "testusers",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "max.interval": "1000",
        "name": "test",
        "quickstart": "users",
        "tasks.max": "2",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      },
      "tasks": [
        {
          "connector": "test",
          "task": 0
        },
        {
          "connector": "test",
          "task": 1
        }
      ],
      "type": "source"
    }
    ```
  </Tab>

  <Tab title="Console">
    1. On the left navigation pane of StreamNative Console, under **Resources**, click **Connectors**.

    2. On the Connectors page, select the **Kafka Sources** or **Kafka Sinks** tab.

    3. Click the ellipsis at the end of the row of the connector, and then click **Edit**.

    4. Edit the configuration that you want to change, and click **SUBMIT**.
  </Tab>
</Tabs>
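
The REST API calls on this page all target the same URL pattern. The following is a minimal sketch of a helper that assembles that URL, assuming that `public%2Fdefault` is the URL-encoded `tenant/namespace` pair. The function name `connector_url` and the variables `TENANT`, `NAMESPACE`, `API_KEY`, and `KAFKA_SERVICE_URL` are hypothetical names chosen for illustration. The names use underscores because bash does not allow hyphens in variable names.

```bash
#!/usr/bin/env bash
# Hypothetical placeholder values; replace them with your own.
TENANT="public"
NAMESPACE="default"
API_KEY="${API_KEY:-my-api-key}"
KAFKA_SERVICE_URL="${KAFKA_SERVICE_URL:-kafka.example.com}"

# Print the endpoint for a connector name plus an optional sub-path
# such as /config or /status. "%%2F" emits the URL-encoded slash.
connector_url() {
  local name="$1" suffix="${2:-}"
  printf 'https://%s%%2F%s:%s@%s/admin/kafkaconnect/connectors/%s%s\n' \
    "$TENANT" "$NAMESPACE" "$API_KEY" "$KAFKA_SERVICE_URL" "$name" "$suffix"
}

connector_url test /config
```

You could then pass the result to `curl`, for example `curl -X PUT ... "$(connector_url test /config)"`.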

## Delete a connector

The following example shows how to delete the data generator source connector `test` using different tools.

<Tabs>
  <Tab title="snctl">
    To delete the source connector `test`, use the following command.

    ```bash theme={null}
    snctl kafka admin connect delete connector test
    ```

    You should see the following output:

    ```bash theme={null}
    Deleted connector test
    ```

    If you want to verify whether the source connector has been deleted successfully, run the following command.

    ```bash theme={null}
    snctl kafka admin connect get connectors
    ```

    You should see the following output:

    ```bash theme={null}

     NAME                  TYPE     STATE        TASKS

    ```
  </Tab>

  <Tab title="kcctl">
    To delete the source connector `test`, use the following command.

    ```bash theme={null}
    kcctl delete connector test
    ```

    You should see the following output:

    ```bash theme={null}
    Deleted connector test
    ```

    If you want to verify whether the source connector has been deleted successfully, run the following command.

    ```bash theme={null}
    kcctl get connectors
    ```

    You should see the following output:

    ```bash theme={null}

     NAME                  TYPE     STATE        TASKS

    ```
  </Tab>

  <Tab title="Rest API">
    To delete the source connector `test`, use the following command.

    ```bash theme={null}
    > curl -X DELETE "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test"
    ```

    If the response contains no error, the deletion succeeded.

    If you want to verify whether the source connector has been deleted successfully, run the following command.

    ```bash theme={null}
    curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test"
    ```

    You should see the following output:

    ```bash theme={null}
    {"reason":"This resource doesn't exist, please check the name"}
    ```
  </Tab>

  <Tab title="Console">
    1. On the left navigation pane of StreamNative Console, under **Resources**, click **Connectors**.

    2. On the Connectors page, select the **Kafka Sources** or **Kafka Sinks** tab.

    3. Click the ellipsis at the end of the row of the connector, and then click **Delete**.

    4. Enter the connector name and then click **Confirm**.
  </Tab>
</Tabs>
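
When scripting REST API calls, checking the HTTP status code is usually more reliable than reading the response body for errors. The following is a minimal sketch, assuming the endpoint follows the common convention of returning a 2xx status code on success; the exact codes the service returns are not documented here. The helper names `is_success` and `http_status` and the variable `CONNECTOR_URL` are hypothetical.

```bash
#!/usr/bin/env bash

# Return success (exit code 0) only for 2xx HTTP status codes.
is_success() {
  case "$1" in
    2[0-9][0-9]) return 0 ;;
    *) return 1 ;;
  esac
}

# Run curl with the given arguments, discard the response body,
# and print only the HTTP status code.
http_status() {
  curl --silent --output /dev/null --write-out '%{http_code}' "$@"
}

# Usage (CONNECTOR_URL is a hypothetical variable holding the connector endpoint):
#   status="$(http_status -X DELETE "$CONNECTOR_URL")"
#   if is_success "$status"; then
#     echo "deleted"
#   else
#     echo "request failed: HTTP $status" >&2
#   fi
```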

## Stop a connector

You can stop a running connector using different tools. Stopping a connector releases all of its resources.

You can restart a stopped connector later.

<Tabs>
  <Tab title="snctl">
    To stop the source connector `test`, use the following command.

    ```bash theme={null}
    snctl kafka admin connect stop test
    ```

    You should see the following output:

    ```bash theme={null}
    Stopped connector test
    ```

    If you want to verify whether the source connector has been stopped successfully, run the following command.

    ```bash theme={null}
    snctl kafka admin connect get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   STOPPED
    ```
  </Tab>

  <Tab title="kcctl">
    To stop the source connector `test`, use the following command.

    ```bash theme={null}
    kcctl stop connectors test
    ```

    You should see the following output:

    ```bash theme={null}
    Stopped connector test
    ```

    If you want to verify whether the source connector has been stopped successfully, run the following command.

    ```bash theme={null}
    kcctl get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   STOPPED
    ```
  </Tab>

  <Tab title="Rest API">
    To stop the source connector `test`, use the following command.

    ```bash theme={null}
    > curl -X PUT --header 'content-type: application/json' "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/stop"
    ```

    If the response contains no error, the stop succeeded.

    If you want to verify whether the source connector has been stopped successfully, run the following command.

    ```bash theme={null}
    curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
    ```

    You should see the following output:

    ```bash theme={null}
    {
      "name": "test",
      "connector": {
        "state": "STOPPED",
        "worker_id": "10.80.161.255:8083",
        "trace": "Kafka Connector Resource stopped"
      },
      "tasks": [],
      "type": "source"
    }
    ```
  </Tab>

  <Tab title="Console">
    1. On the left navigation pane of StreamNative Console, under **Resources**, click **Connectors**.

    2. On the Connectors page, select the **Kafka Sources** or **Kafka Sinks** tab.

    3. Click the ellipsis at the end of the row of the connector, and then click **Stop**.
  </Tab>
</Tabs>
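
A state change may not be visible immediately after the request returns, so scripts often poll the status until the expected state appears. The following is a minimal sketch of such a loop. The function `wait_for_state`, the `POLL_INTERVAL` variable, and the `get_state` and `CONNECTOR_URL` names in the usage comment are all hypothetical, not part of `snctl`, `kcctl`, or the REST API.

```bash
#!/usr/bin/env bash

# Poll until the given command prints the wanted state.
# Arguments: wanted state, maximum number of attempts, then the command to run.
# POLL_INTERVAL (seconds between attempts) is a hypothetical knob, default 2.
wait_for_state() {
  local want="$1" attempts="$2"
  shift 2
  local i state
  for ((i = 1; i <= attempts; i++)); do
    # Treat a failing command as "state unknown" and keep polling.
    state="$("$@")" || state=""
    if [ "$state" = "$want" ]; then
      return 0
    fi
    if (( i < attempts )); then
      sleep "${POLL_INTERVAL:-2}"
    fi
  done
  return 1
}

# Usage (assumes jq is installed and CONNECTOR_URL holds the connector endpoint):
#   get_state() { curl --silent "$CONNECTOR_URL/status" | jq -r '.connector.state'; }
#   wait_for_state STOPPED 10 get_state && echo "connector stopped"
```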

## Restart a connector

You can restart a stopped or running connector using different tools.

<Tabs>
  <Tab title="snctl">
    To restart the source connector `test`, use the following command.

    ```bash theme={null}
    snctl kafka admin connect restart connector test
    ```

    You should see the following output:

    ```bash theme={null}
    Restarted connector test
    ```

    If you want to verify whether the source connector has been restarted successfully, run the following command.

    ```bash theme={null}
    snctl kafka admin connect get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   RUNNING
    ```
  </Tab>

  <Tab title="kcctl">
    To restart the source connector `test`, use the following command.

    ```bash theme={null}
    kcctl restart connectors test
    ```

    You should see the following output:

    ```bash theme={null}
    Restarted connector test
    ```

    If you want to verify whether the source connector has been restarted successfully, run the following command.

    ```bash theme={null}
    kcctl get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   RUNNING
    ```
  </Tab>

  <Tab title="Rest API">
    To restart the source connector `test`, use the following command.

    ```bash theme={null}
    > curl -X POST "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/restart"
    ```

    If the response contains no error, the restart succeeded.

    If you want to verify whether the source connector has been restarted successfully, run the following command.

    ```bash theme={null}
    curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
    ```

    You should see the following output:

    ```bash theme={null}
    {
      "name": "test",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.80.161.255:8083",
        "trace": ""
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.244.0.75:8083"
        }
      ],
      "type": "source"
    }
    ```
  </Tab>

  <Tab title="Console">
    1. On the left navigation pane of StreamNative Console, under **Resources**, click **Connectors**.

    2. On the Connectors page, select the **Kafka Sources** or **Kafka Sinks** tab.

    3. Click the ellipsis at the end of the row of the connector, and then click **Restart**.
  </Tab>
</Tabs>

## Pause a connector

You can pause a running connector using different tools. A paused connector still occupies its resources but stops processing messages.

<Tabs>
  <Tab title="snctl">
    To pause the source connector `test`, use the following command.

    ```bash theme={null}
    snctl kafka admin connect pause test
    ```

    You should see the following output:

    ```bash theme={null}
    Paused connector test
    ```

    If you want to verify whether the source connector has been paused successfully, run the following command.

    ```bash theme={null}
    snctl kafka admin connect get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   PAUSED    0: PAUSED
    ```
  </Tab>

  <Tab title="kcctl">
    To pause the source connector `test`, use the following command.

    ```bash theme={null}
    kcctl pause connectors test
    ```

    You should see the following output:

    ```bash theme={null}
    Paused connector test
    ```

    If you want to verify whether the source connector has been paused successfully, run the following command.

    ```bash theme={null}
    kcctl get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   PAUSED    0: PAUSED
    ```
  </Tab>

  <Tab title="Rest API">
    To pause the source connector `test`, use the following command.

    ```bash theme={null}
    > curl -X PUT --header 'content-type: application/json' "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/pause"
    ```

    If the response contains no error, the pause succeeded.

    If you want to verify whether the source connector has been paused successfully, run the following command.

    ```bash theme={null}
    curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
    ```

    You should see the following output:

    ```bash theme={null}
    {
      "name": "test",
      "connector": {
        "state": "PAUSED",
        "worker_id": "10.80.161.255:8083",
        "trace": ""
      },
      "tasks": [
        {
          "id": 0,
          "state": "PAUSED",
          "worker_id": "10.244.0.75:8083"
        }
      ],
      "type": "source"
    }
    ```
  </Tab>

  <Tab title="Console">
    1. On the left navigation pane of StreamNative Console, under **Resources**, click **Connectors**.

    2. On the Connectors page, select the **Kafka Sources** or **Kafka Sinks** tab.

    3. Click the ellipsis at the end of the row of the connector, and then click **Pause**.
  </Tab>
</Tabs>

## Resume a connector

You can resume a paused connector using different tools.

<Tabs>
  <Tab title="snctl">
    To resume the source connector `test`, use the following command.

    ```bash theme={null}
    snctl kafka admin connect resume test
    ```

    You should see the following output:

    ```bash theme={null}
    Resumed connector test
    ```

    If you want to verify whether the source connector has been resumed successfully, run the following command.

    ```bash theme={null}
    snctl kafka admin connect get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   RUNNING   0: RUNNING
    ```
  </Tab>

  <Tab title="kcctl">
    To resume the source connector `test`, use the following command.

    ```bash theme={null}
    kcctl resume connectors test
    ```

    You should see the following output:

    ```bash theme={null}
    Resumed connector test
    ```

    If you want to verify whether the source connector has been resumed successfully, run the following command.

    ```bash theme={null}
    kcctl get connectors
    ```

    You should see the following output:

    ```bash theme={null}
     NAME      TYPE     STATE     TASKS
     test      source   RUNNING   0: RUNNING
    ```
  </Tab>

  <Tab title="Rest API">
    To resume the source connector `test`, use the following command.

    ```bash theme={null}
    > curl -X PUT --header 'content-type: application/json' "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/resume"
    ```

    If the response contains no error, the resume succeeded.

    If you want to verify whether the source connector has been resumed successfully, run the following command.

    ```bash theme={null}
    curl -X GET "https://public%2Fdefault:${APIKEY}@${KAFKA_SERVICE_URL}/admin/kafkaconnect/connectors/test/status" | jq '.'
    ```

    You should see the following output:

    ```bash theme={null}
    {
      "name": "test",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.80.161.255:8083",
        "trace": ""
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.244.0.75:8083"
        }
      ],
      "type": "source"
    }
    ```
  </Tab>

  <Tab title="Console">
    1. On the left navigation pane of StreamNative Console, under **Resources**, click **Connectors**.

    2. On the Connectors page, select the **Kafka Sources** or **Kafka Sinks** tab.

    3. Click the ellipsis at the end of the row of the connector, and then click **Resume**.
  </Tab>
</Tabs>
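
The stop, restart, pause, and resume operations differ only in the HTTP method, the path suffix, and the state you expect afterwards. The following sketch collects the combinations shown in the REST API examples on this page into one lookup function. The function name `lifecycle_request` is hypothetical, and the expected state for `resume` is taken from the `snctl` and `kcctl` output above.

```bash
#!/usr/bin/env bash

# Print "METHOD PATH_SUFFIX EXPECTED_STATE" for a lifecycle action,
# or fail with a message on stderr for an unknown action.
lifecycle_request() {
  case "$1" in
    stop)    echo "PUT /stop STOPPED" ;;
    pause)   echo "PUT /pause PAUSED" ;;
    resume)  echo "PUT /resume RUNNING" ;;
    restart) echo "POST /restart RUNNING" ;;
    *)       echo "unknown action: $1" >&2; return 1 ;;
  esac
}

# Split the three fields into separate variables.
read -r method suffix expected <<< "$(lifecycle_request pause)"
echo "$method $suffix -> $expected"
```

A script could append `$suffix` to the connector URL, send the request with `curl -X "$method"`, and then check the status endpoint for `$expected`.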

## What’s next?

* [Monitor and troubleshoot Kafka connectors](/cloud/connect/kafka-connect/kafka-connect-monitoring)

* [SMTs (Single Message Transformations)](/cloud/connect/kafka-connect/kafka-connect-smt)

* Discover the Kafka Connect ecosystem on [StreamNative Hub](/connect/overview).
