The Debezium Cloud Spanner Source connector streams transactional change events from a Google Cloud Spanner database into Kafka topics.
This connector is available as a built-in connector on StreamNative Cloud.

Prerequisites

  • Google Cloud project with a provisioned Spanner instance and database
  • Service account key with spanner.databaseReader and monitoring.viewer roles
  • Cloud Spanner change streams API enabled in the target project

Quick Start

  1. Set up the kcctl client: doc
  2. Create a JSON file similar to the following:
    {
        "name": "debezium-spanner-source",
        "config": {
            "connector.class": "io.debezium.connector.spanner.SpannerConnector",
            "tasks.max": "1",
            "gcp.spanner.change.stream": "changeStreamAll",
            "gcp.spanner.project.id": "${GCP_PROJECT}",
            "gcp.spanner.instance.id": "${SPANNER_INSTANCE}",
            "gcp.spanner.database.id": "${SPANNER_DATABASE}",
            "gcp.spanner.credentials.json": "${SERVICE_ACCOUNT_JSON}",
            "sn.passthrough.bootstrapServer.fields": "connector.spanner.sync.kafka.bootstrap.servers",
            "sn.passthrough.kafka.client.field.prefixes": "kafka.internal.client"
        }
    }
    
    The sn.passthrough.bootstrapServer.fields and sn.passthrough.kafka.client.field.prefixes properties are required so that StreamNative Cloud can automatically configure the connector's internal Kafka client settings.
    Do not change these properties unless you want to override the default behavior.
  3. Deploy the connector:
    kcctl create -f <filename>.json
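
The JSON in step 2 contains placeholders such as ${GCP_PROJECT}. As a minimal sketch, assuming you fill in those placeholders yourself before deployment, the following Python helper renders the file from a mapping of values such as the process environment. The keys mirror the placeholders in step 2; the function names `render_connector_config` and `write_connector_file` are illustrative and are not part of kcctl or StreamNative Cloud.

```python
import json

# Placeholder names taken from the sample JSON in step 2.
REQUIRED_KEYS = ["GCP_PROJECT", "SPANNER_INSTANCE", "SPANNER_DATABASE", "SERVICE_ACCOUNT_JSON"]


def render_connector_config(env):
    """Build the connector definition from a mapping of placeholder values."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise ValueError("missing values for: " + ", ".join(missing))
    return {
        "name": "debezium-spanner-source",
        "config": {
            "connector.class": "io.debezium.connector.spanner.SpannerConnector",
            "tasks.max": "1",
            "gcp.spanner.change.stream": "changeStreamAll",
            "gcp.spanner.project.id": env["GCP_PROJECT"],
            "gcp.spanner.instance.id": env["SPANNER_INSTANCE"],
            "gcp.spanner.database.id": env["SPANNER_DATABASE"],
            "gcp.spanner.credentials.json": env["SERVICE_ACCOUNT_JSON"],
            # Pass-through settings copied unchanged from the sample above.
            "sn.passthrough.bootstrapServer.fields": "connector.spanner.sync.kafka.bootstrap.servers",
            "sn.passthrough.kafka.client.field.prefixes": "kafka.internal.client",
        },
    }


def write_connector_file(path, env):
    """Write the rendered definition to a JSON file that kcctl can read."""
    with open(path, "w") as handle:
        json.dump(render_connector_config(env), handle, indent=4)
```

Calling `write_connector_file("debezium-spanner-source.json", os.environ)` (after `import os`) would produce a file that can be passed to the `kcctl create -f` command from step 3. Failing early on a missing value avoids deploying a connector that still contains a literal placeholder.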
    

Configuration

The Debezium Cloud Spanner Source connector accepts the following common options:
| Property | Required | Default | Description |
|---|---|---|---|
| name | true | No default | Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors. |
| connector.class | true | No default | The name of the Java class for the connector. Always use a value of io.debezium.connector.spanner.SpannerConnector for the Spanner connector. |
| tasks.max | true | 1 | The maximum number of tasks that should be created for this connector. The Spanner connector can use more than one task if you enable offset.storage.per.task mode. |
| gcp.spanner.project.id | true | No default | The GCP project ID. |
| gcp.spanner.instance.id | true | No default | The Spanner instance ID. |
| gcp.spanner.database.id | true | No default | The Spanner database ID. |
| gcp.spanner.change.stream | true | No default | The name of the Spanner change stream. |
| gcp.spanner.credentials.path | true | No default | The file path to the GCP service account key JSON. |
| gcp.spanner.credentials.json | true | No default | The GCP service account key JSON. Required if gcp.spanner.credentials.path is not provided. |
| schema.name.adjustment.mode | true | none | Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: none does not apply any adjustment; avro replaces characters that cannot be used in an Avro type name with an underscore; avro_unicode replaces the underscore and characters that cannot be used in an Avro type name with the corresponding Unicode escape, such as _uxxxx. Note: _ is an escape sequence, like the backslash in Java. |
| field.name.adjustment.mode | true | none | Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings: none does not apply any adjustment; avro replaces characters that cannot be used in an Avro type name with an underscore; avro_unicode replaces the underscore and characters that cannot be used in an Avro type name with the corresponding Unicode escape, such as _uxxxx. Note: _ is an escape sequence, like the backslash in Java. See Avro naming for more details. |
| gcp.spanner.low-watermark.enabled | false | false | Whether the low watermark is enabled for the connector. |
| gcp.spanner.low-watermark.update-period.ms | false | 1000 ms | The interval at which the low watermark is updated. |
| heartbeat.interval.ms | false | 300000 | The Spanner heartbeat interval. |
| gcp.spanner.start.time | false | current time | The connector start time. |
| gcp.spanner.end.time | false | indefinite end time | The connector end time. |
| gcp.spanner.stream.event.queue.capacity | false | 10000 | The Spanner event queue capacity. Increase this capacity if the remaining stream event queue capacity approaches zero during connector runtime. |
| connector.spanner.task.state.change.event.queue.capacity | false | 1000 | The task state change event queue capacity. Increase this capacity if the remaining task state change event queue capacity approaches zero during connector runtime. |
| connector.spanner.max.missed.heartbeats | false | 5 | The maximum number of missed heartbeats for a change stream query before an exception is thrown. |
| scaler.monitor.enabled | false | false | Whether task autoscaling is enabled. |
| connector.spanner.sync.topic | false | sync_topic_spanner_connector$connectorname | The name for the sync topic. The sync topic is an internal connector topic used to store communication between tasks. |
| connector.spanner.sync.poll.duration | false | 500 ms | The poll duration for the sync topic. |
| connector.spanner.sync.request.timeout.ms | false | 5000 ms | The timeout for requests to the sync topic. |
| connector.spanner.sync.delivery.timeout.ms | false | 15000 ms | The timeout for publishing to the sync topic. |
| connector.spanner.sync.commit.offsets.timeout.ms | false | 5000 ms | The timeout for committing offsets for the sync topic. |
| connector.spanner.sync.commit.offsets.interval.ms | false | 60000 ms | The interval at which offsets are committed for the sync topic. |
| connector.spanner.sync.publisher.wait.timeout | false | 5 ms | The interval at which messages are published to the sync topic. |
| connector.spanner.rebalancing.topic | false | rebalancing_topic_spanner_connector$connectorname | The name for the rebalancing topic. The rebalancing topic is an internal connector topic used to determine task aliveness. |
| connector.spanner.rebalancing.poll.duration | false | 5000 | The poll duration for the rebalancing topic. |
| connector.spanner.rebalancing.commit.offsets.timeout | false | 5000 | The timeout for committing offsets for the rebalancing topic. |
| connector.spanner.rebalancing.commit.offsets.interval.ms | false | 60000 ms | The interval at which offsets are committed for the rebalancing topic. |
| connector.spanner.rebalancing.task.waiting.timeout | false | 1000 ms | The duration of time a task waits before processing a rebalancing event. |
| custom.metric.tags | false | No default | Defines tags that customize MBean object names by adding metadata that provides contextual information. Specify a comma-separated list of key-value pairs. Each key represents a tag for the MBean object name, and the corresponding value represents a value for the key, for example, k1=v1,k2=v2. The connector appends the specified tags to the base MBean object name. Tags can help you to organize and categorize metrics data. You can define tags to identify particular application instances, environments, regions, versions, and so forth. For more information, see Customized MBean names. |
| errors.max.retries | false | -1 | Specifies how the connector responds after an operation that results in a retriable error, such as a connection error. Set one of the following options: -1 (no limit): the connector always restarts automatically and retries the operation, regardless of the number of previous failures. 0 (disabled): the connector fails immediately and never retries the operation; user intervention is required to restart the connector. > 0: the connector restarts automatically until it reaches the specified maximum number of retries; after the next failure, the connector stops, and user intervention is required to restart it. |
| extended.headers.enabled | false | true | Specifies whether Debezium adds context headers with the prefix __debezium.context. to the messages that it emits. These headers are required by the OpenLineage integration and provide metadata that enables downstream processing systems to track and identify the sources of change events. The property adds the following headers: __debezium.context.connectorLogicalName (the logical name of the Debezium connector), __debezium.context.taskId (the unique identifier of the connector task), and __debezium.context.connectorName (the name of the Debezium connector). |
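
The three errors.max.retries settings described above can be modeled as a small retry loop. The sketch below only illustrates the documented semantics and is not Debezium's implementation; `RetriableError`, `run_with_retries`, and the `operation` callback are hypothetical names introduced for this example.

```python
class RetriableError(Exception):
    """Stand-in for a retriable failure such as a connection error."""


def run_with_retries(operation, max_retries=-1):
    """Call operation until it succeeds or the retry budget is used up.

    max_retries == -1: retry without limit.
    max_retries == 0:  fail on the first error, never retry.
    max_retries > 0:   allow that many retries, then fail on the next error.
    """
    retries = 0
    while True:
        try:
            return operation()
        except RetriableError:
            if max_retries != -1 and retries >= max_retries:
                # Budget exhausted: surface the error so an operator can step in.
                raise
            retries += 1
```

With a positive limit of N, the operation is attempted at most N + 1 times: the initial attempt plus N retries.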
Refer to the official Debezium Cloud Spanner documentation for a complete property reference and advanced deployment guidance.
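
To make the schema.name.adjustment.mode and field.name.adjustment.mode settings listed under Configuration more concrete, here is a rough Python sketch of the three modes. It assumes the Avro naming rule that a name starts with a letter or underscore and continues with letters, digits, or underscores. The function `adjust_name` is hypothetical, and its handling of edge cases (for example a leading digit, or characters outside the Basic Multilingual Plane) is an assumption that may differ from what Debezium actually does.

```python
def adjust_name(name, mode="none"):
    """Approximate the none, avro, and avro_unicode adjustment modes."""
    if mode not in ("none", "avro", "avro_unicode"):
        raise ValueError("unknown adjustment mode: " + mode)
    if mode == "none":
        return name  # no adjustment at all

    adjusted = []
    for index, ch in enumerate(name):
        # Valid Avro name characters: ASCII letters, underscore, and
        # digits anywhere except the first position.
        valid = ch.isascii() and (
            ch.isalpha() or ch == "_" or (ch.isdigit() and index > 0)
        )
        if mode == "avro":
            adjusted.append(ch if valid else "_")
        else:
            # avro_unicode: the underscore is the escape character, so it is
            # escaped too, which keeps the mapping unambiguous.
            keep = valid and ch != "_"
            adjusted.append(ch if keep else "_u%04x" % ord(ch))
    return "".join(adjusted)
```

For example, under these assumptions `adjust_name("order-id", "avro")` yields `order_id`, while `adjust_name("order-id", "avro_unicode")` yields `order_u002did`, which preserves the distinction between a hyphen and a literal underscore.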