
Schema evolution is enabled by default (tableEvolveSchemaEnabled=true). When a producer changes the schema of a topic, the lakehouse table is automatically updated to match.
Recommendation: For topics that have Lakehouse integration enabled, set the Pulsar schema compatibility strategy to BACKWARD_TRANSITIVE. This guarantees that every reader (including downstream Iceberg / Delta consumers) can read data written with all previous schema versions, which aligns with how the lakehouse table is evolved. See the Pulsar schema compatibility documentation for details on BACKWARD vs BACKWARD_TRANSITIVE. Apply with pulsar-admin at the namespace or topic level:
# Namespace level
bin/pulsar-admin namespaces set-schema-compatibility-strategy \
  --compatibility BACKWARD_TRANSITIVE \
  <tenant>/<namespace>

# Topic level
bin/pulsar-admin topicPolicies set-schema-compatibility-strategy \
  --strategy BACKWARD_TRANSITIVE \
  <tenant>/<namespace>/<topic-name>

How It Works

Schema evolution is triggered automatically during the compaction process when the compaction service encounters messages with a schema version newer than what the lakehouse table currently has.

Step 1: Detection

When processing messages, the compaction service extracts the schema version from each message. If the version is newer than the latest version recorded in the table’s schema mapping, evolution is triggered.
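The detection check amounts to a version comparison against the table's recorded schema mapping. A minimal sketch (the function and data shapes here are illustrative, not the compaction service's actual API):

```python
def needs_evolution(message_schema_version: int, schema_mapping: dict[int, int]) -> bool:
    """Return True when a message carries a schema version newer than any
    version already recorded in the table's schema mapping."""
    latest_mapped = max(schema_mapping, default=-1)
    return message_schema_version > latest_mapped

# A table that has already seen schema versions 0 and 1
# (mapped to hypothetical internal schema IDs 100 and 101):
mapping = {0: 100, 1: 101}
print(needs_evolution(1, mapping))  # False: version 1 is already mapped
print(needs_evolution(2, mapping))  # True: version 2 triggers evolution
```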

Step 2: Retrieve All Schema Versions

The compaction service retrieves all schema versions from the schema registry (Pulsar or Confluent) up to the current version. This ensures that intermediate schema versions are not skipped.

Step 3: Convert Schemas

Each source schema (Avro, JSON, ProtobufNative for Pulsar, or Protobuf for Kafka) is converted to the target table format:
  • Avro is converted directly to an Iceberg Schema or Delta StructType
  • JSON is first converted to Avro, then to the target format (Pulsar internally represents JSON schemas using Avro)
  • ProtobufNative (Pulsar) is converted via the Protobuf descriptor to Avro, then to the target format
  • Protobuf (Kafka) is converted via the Protobuf descriptor to Avro, then to the target format
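To make the final conversion step concrete, here is a toy mapping of Avro primitive types to Iceberg types. This is only an illustration of the idea; the real converter also handles logical types, unions, records, arrays, and maps:

```python
# Toy Avro -> Iceberg primitive type mapping (illustrative only).
AVRO_TO_ICEBERG = {
    "boolean": "boolean",
    "int": "int",
    "long": "long",
    "float": "float",
    "double": "double",
    "string": "string",
    "bytes": "binary",
}

def convert_primitive(avro_type: str) -> str:
    """Map an Avro primitive type name to its Iceberg counterpart."""
    try:
        return AVRO_TO_ICEBERG[avro_type]
    except KeyError:
        raise ValueError(f"unsupported Avro type: {avro_type}")

print(convert_primitive("bytes"))  # binary
```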

Step 4: Apply Changes

Schema changes are applied to the table iteratively, version by version in ascending order. Each version is processed as a transaction. For Iceberg tables, the following operations are supported:
  • Add columns — new fields are always added as optional for backward compatibility
  • Delete columns — field is removed from the schema (or made optional in soft-delete mode)
  • Type promotion — int to long, float to double, and other compatible promotions
  • Nullability changes — required fields can be made optional (reverse is not supported)
  • Nested struct evolution — the same rules apply recursively to nested fields
For Delta tables:
  • Column Mapping mode (NAME) is automatically enabled to support safe renames and deletes
  • Each column receives a unique physical ID for safe schema evolution
  • Schema changes are committed as Delta transactions
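The version-by-version application loop can be sketched as follows. Every name here is illustrative; in the real service each applied version corresponds to an atomic table transaction (an Iceberg UpdateSchema commit or a Delta transaction):

```python
def evolve_table(applied_versions: set[int], pending_versions: list[int]) -> list[int]:
    """Apply schema versions in ascending order, one transaction each,
    skipping versions the table has already processed."""
    newly_applied = []
    for version in sorted(pending_versions):
        if version in applied_versions:
            continue
        # Placeholder for one atomic schema-change transaction.
        applied_versions.add(version)
        newly_applied.append(version)
    return newly_applied

# Versions 0 and 1 were applied earlier; 2 and 3 arrive out of order:
print(evolve_table({0, 1}, [3, 1, 2]))  # [2, 3]
```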

Step 5: Record Schema Mapping

After each successful evolution, a mapping from the source schema version to the table’s internal schema ID is recorded as a table property (streamnative.schema.mapping). This prevents re-processing the same version.
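The mapping idea can be sketched as a JSON-encoded table property. Note that the actual serialization format of streamnative.schema.mapping is internal to the compaction service; this only illustrates the source-version-to-table-schema-ID bookkeeping:

```python
import json

def record_mapping(props: dict, source_version: int, table_schema_id: int) -> dict:
    """Record a source schema version -> table schema ID pair in the
    (hypothetically JSON-encoded) streamnative.schema.mapping property."""
    mapping = json.loads(props.get("streamnative.schema.mapping", "{}"))
    mapping[str(source_version)] = table_schema_id
    props["streamnative.schema.mapping"] = json.dumps(mapping)
    return props

props = {"streamnative.schema.mapping": json.dumps({"0": 100, "1": 101})}
record_mapping(props, 2, 102)
print(props["streamnative.schema.mapping"])
```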

Supported Operations

Operation | Iceberg | Delta
Add optional columns | Supported | Supported
Delete columns | Supported | Supported
Type promotion (int -> long, float -> double) | Supported | Supported
Required -> Optional | Supported | Supported
Optional -> Required | Not supported | Not supported
Nested struct evolution | Supported | Supported
Type category changes (struct <-> list, primitive <-> map) | Not supported | Not supported
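A small helper makes the type-promotion rules easy to check programmatically. The promotion set below covers only the two promotions named in the table; the "other compatible promotions" mentioned earlier are omitted, so treat this as a partial sketch:

```python
# Partial set of compatible primitive promotions (per the table above).
ALLOWED_PROMOTIONS = {
    ("int", "long"),
    ("float", "double"),
}

def is_promotion_allowed(old_type: str, new_type: str) -> bool:
    """A type change is allowed if the type is unchanged or the
    (old, new) pair is a known widening promotion."""
    return old_type == new_type or (old_type, new_type) in ALLOWED_PROMOTIONS

print(is_promotion_allowed("int", "long"))  # True
print(is_promotion_allowed("long", "int"))  # False (narrowing)
```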

Limitations

  • Adding required fields: New fields added to existing tables are automatically converted to optional. Required fields can only be defined at table creation time.
  • Type category changes: Changing a field from one type category to another (e.g., struct to list, primitive to map) is not supported.
  • Backward incompatibility: If the table schema has been evolved beyond the message’s schema version, the message cannot be processed. Update the producer to use the latest schema.
  • Soft-delete mode: Enable schema.evolution.soft-delete.enabled=true to make deleted fields optional instead of removing them entirely. This preserves backward compatibility for readers.

Configuration

Property | Description | Default
tableEvolveSchemaEnabled | Enable automatic schema evolution | true
schema.evolution.soft-delete.enabled | Make deleted fields optional instead of removing them | false
dlt.suffix | Suffix appended to the Dead Letter Table name | _dlt
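To change the defaults, set these properties in the compaction service configuration. A sketch, assuming they belong in the same custom block used for dlt.suffix elsewhere on this page:

```yaml
compactionScheduler:
  config:
    custom:
      # Assumed placement: same custom block as dlt.suffix.
      tableEvolveSchemaEnabled: "true"
      schema.evolution.soft-delete.enabled: "true"
```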

Dead Letter Table (DLT)

When a message cannot be successfully processed into the lakehouse table, it is routed to a Dead Letter Table (DLT) instead of failing the entire compaction job. This allows the pipeline to continue processing valid messages while preserving failed records for investigation.

When Are Messages Routed to DLT?

Messages are routed to DLT in the following scenarios:
Scenario | Example
Schema incompatibility | 1) Schema evolution is disabled and the record schema does not match the table schema; 2) a new topic schema is incompatible with the lakehouse table schema
Unsupported schema changes | Evolving a non-Variant field to Variant, or a Variant field to another type
Type conversion errors | Record cannot be serialized to the target lakehouse format (Iceberg/Delta/Parquet)
Null values | Null payload/record values from Kafka entries
Deserialization errors | Failures deserializing Pulsar or Kafka source data
Parsing failures | Schema parsing errors for the Pulsar message format
Note: Fatal errors (such as catalog connectivity issues, permission errors, or infrastructure failures) are not routed to DLT — they cause the compaction task to fail and retry.
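The routing decision above can be mirrored in a small classifier. The error taxonomy here is hypothetical, chosen only to reflect the scenarios and the note in this section, not the service's internal error types:

```python
# Hypothetical error categories mirroring the table and note above:
# data-level errors are routed to the DLT, infrastructure-level errors
# fail the compaction task so it can retry.
DLT_ROUTABLE = {
    "schema_incompatibility", "unsupported_schema_change",
    "type_conversion_error", "null_value",
    "deserialization_error", "parse_failure",
}
FATAL = {"catalog_unavailable", "permission_denied", "infrastructure_failure"}

def route(error_kind: str) -> str:
    """Decide whether a failure goes to the DLT or fails the task."""
    if error_kind in DLT_ROUTABLE:
        return "dlt"
    # Unknown and fatal errors both fail the task and retry.
    return "fail-and-retry"

print(route("null_value"))           # dlt
print(route("catalog_unavailable"))  # fail-and-retry
```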

DLT Table Naming

The DLT table name is derived from the original topic name by appending the configurable DLT suffix (default: _dlt). Format:
<original-topic-local-name><dlt-suffix>
Example:
Original Topic | DLT Table
persistent://my-tenant/my-namespace/user-events | user-events_dlt (in namespace my-tenant/my-namespace)
persistent://public/default/orders | orders_dlt (in namespace public/default)
For Iceberg tables, the DLT table is created in the same namespace as the original table. For Delta tables, the DLT table name is <tenant>/<namespace>/<topic-name>_dlt.
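The derivation can be sketched as a one-line string operation on the topic's local name (helper name is illustrative):

```python
def dlt_table_name(topic: str, suffix: str = "_dlt") -> str:
    """Derive the Dead Letter Table name by appending the DLT suffix
    to the topic's local name (the part after the last '/')."""
    local_name = topic.rsplit("/", 1)[-1]
    return local_name + suffix

print(dlt_table_name("persistent://public/default/orders"))  # orders_dlt
print(dlt_table_name("persistent://my-tenant/my-namespace/user-events",
                     suffix="_deadletter"))  # user-events_deadletter
```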

DLT Schema

Both Iceberg and Delta DLT tables use a consistent 3-field schema:
Field | Type | Nullable | Description
messageId | String | No | The original Pulsar or Kafka message ID
payload | String | Yes | Base64-encoded original message payload
failureReason | String | Yes | The error message explaining why the record failed
This allows you to:
  • Identify which messages failed and why
  • Replay or reprocess the failed messages after fixing the underlying issue
  • Debug schema or serialization problems
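Since the payload column is Base64-encoded, recovering the original message bytes for inspection or replay is a single decode. The row values below are made up for illustration:

```python
import base64

# A DLT row as it might be read back from the table (illustrative values).
row = {
    "messageId": "1:2:0:-1",
    "payload": base64.b64encode(b'{"user": "alice"}').decode("ascii"),
    "failureReason": "Schema evolution is disabled...",
}

# Decode the Base64 payload column to recover the original message bytes.
original = base64.b64decode(row["payload"])
print(original)  # b'{"user": "alice"}'
```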

Configuring the DLT Suffix

The default DLT suffix is _dlt. To customize it, set the dlt.suffix property in the compaction service configuration:
compactionScheduler:
  config:
    custom:
      dlt.suffix: "_deadletter"
With this configuration, the DLT table for topic user-events would be user-events_deadletter.

Monitoring DLT Activity

When the compaction writer closes, it logs a summary of DLT activity including:
  • Total number of records routed to DLT
  • Breakdown by failure reason (up to 100 distinct reasons tracked)
  • Up to 10 sample message IDs per failure reason
Example log output:
WARN Sent 42 record(s) to DLT for topic: persistent://public/default/events, partition: 0,
     failure reasons with messageIds: {"Schema evolution is disabled...": ["msgId1", "msgId2", ...]}
Use this information to diagnose schema mismatches, invalid data, or other data quality issues upstream.