Schema evolution is enabled by default (tableEvolveSchemaEnabled=true). When a producer changes the schema of a topic, the lakehouse table is automatically updated to match.
Recommendation: For topics that have Lakehouse integration enabled, set the Pulsar schema compatibility strategy to BACKWARD_TRANSITIVE. This guarantees that every reader (including downstream Iceberg / Delta consumers) can read data written with all previous schema versions, which aligns with how the lakehouse table is evolved. See the Pulsar schema compatibility documentation for details on BACKWARD vs BACKWARD_TRANSITIVE.
Apply with pulsar-admin at the namespace or topic level:
```bash
# Namespace level
bin/pulsar-admin namespaces set-schema-compatibility-strategy \
  --compatibility BACKWARD_TRANSITIVE \
  <tenant>/<namespace>

# Topic level
bin/pulsar-admin topicPolicies set-schema-compatibility-strategy \
  --strategy BACKWARD_TRANSITIVE \
  <tenant>/<namespace>/<topic-name>
```
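To confirm the strategy is in effect, you can read it back, for example at the namespace level:

```bash
bin/pulsar-admin namespaces get-schema-compatibility-strategy <tenant>/<namespace>
```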
How It Works
Schema evolution is triggered automatically during the compaction process when the compaction service encounters messages with a schema version newer than what the lakehouse table currently has.
Step 1: Detection
When processing messages, the compaction service extracts the schema version from each message. If the version is newer than the latest version recorded in the table’s schema mapping, evolution is triggered.
Step 2: Retrieve All Schema Versions
The compaction service retrieves all schema versions from the schema registry (Pulsar or Confluent) up to the version found on the message. This ensures that intermediate schema versions are not skipped.
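You can also fetch individual versions from the Pulsar schema registry yourself to see what the service will walk through; the topic name and version below are illustrative:

```bash
bin/pulsar-admin schemas get --version 2 persistent://public/default/user-events
```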
Step 3: Convert Schemas
Each source schema (Avro, JSON, ProtobufNative(Pulsar) or Protobuf(Kafka)) is converted to the target table format:
- Avro is converted directly to Iceberg Schema or Delta StructType
- JSON is first converted to Avro, then to the target format (Pulsar internally represents JSON schemas using Avro)
- ProtobufNative(Pulsar) is converted via the Protobuf descriptor to Avro, then to the target format
- Protobuf(Kafka) is converted via the Protobuf descriptor to Avro, then to the target format
Step 4: Apply Changes
Schema changes are applied to the table iteratively, version by version in ascending order. Each version is processed as a transaction.
For Iceberg tables, the following operations are supported:
- Add columns — new fields are always added as optional for backward compatibility (see the example below)
- Delete columns — the field is removed from the schema (or made optional in soft-delete mode)
- Type promotion — int to long, float to double, and other compatible promotions
- Nullability changes — required fields can be made optional (the reverse is not supported)
- Nested struct evolution — the same rules apply recursively to nested fields
For Delta tables:
- Column Mapping mode (NAME) is automatically enabled to support safe renames and deletes
- Each column receives a unique physical ID for safe schema evolution
- Schema changes are committed as Delta transactions
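To make Step 4 concrete, consider a hypothetical evolution in which version 2 of a producer's Avro schema adds an email field (record and field names are illustrative):

```json
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "eventType", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
```

When the compaction service applies this version, the Iceberg or Delta table gains a new optional email column, and rows written before the change read it as null.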
Step 5: Record Schema Mapping
After each successful evolution, a mapping from the source schema version to the table’s internal schema ID is recorded as a table property (streamnative.schema.mapping). This prevents re-processing the same version.
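If you query the table through Spark, one way to verify that the mapping was recorded is to list the table properties and look for the streamnative.schema.mapping entry; the catalog and table names below are illustrative and assume a catalog is already configured:

```bash
spark-sql -e "SHOW TBLPROPERTIES demo.my_namespace.user_events"
```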
Supported Operations
| Operation | Iceberg | Delta |
|---|---|---|
| Add optional columns | Supported | Supported |
| Delete columns | Supported | Supported |
| Type promotion (int -> long, float -> double) | Supported | Supported |
| Required -> Optional | Supported | Supported |
| Optional -> Required | Not supported | Not supported |
| Nested struct evolution | Supported | Supported |
| Type category changes (struct <-> list, primitive <-> map) | Not supported | Not supported |
Limitations
- Adding required fields: New fields added to existing tables are automatically converted to optional. Required fields can only be defined at table creation time.
- Type category changes: Changing a field from one type category to another (e.g., struct to list, primitive to map) is not supported.
- Backward incompatibility: If the table schema has been evolved beyond the message’s schema version, the message cannot be processed. Update the producer to use the latest schema.
- Soft-delete mode: Enable schema.evolution.soft-delete.enabled=true to make deleted fields optional instead of removing them entirely. This preserves backward compatibility for readers.
Configuration
| Property | Description | Default |
|---|---|---|
| tableEvolveSchemaEnabled | Enable automatic schema evolution | true |
| schema.evolution.soft-delete.enabled | Make deleted fields optional instead of removing | false |
| dlt.suffix | Suffix appended to Dead Letter Table name | _dlt |
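As a sketch, assuming these properties are set in the same custom block used later for dlt.suffix, enabling soft-delete in the compaction service configuration would look like:

```yaml
compactionScheduler:
  config:
    custom:
      tableEvolveSchemaEnabled: "true"
      schema.evolution.soft-delete.enabled: "true"
```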
Dead Letter Table (DLT)
When a message cannot be successfully processed into the lakehouse table, it is routed to a Dead Letter Table (DLT) instead of failing the entire compaction job. This allows the pipeline to continue processing valid messages while preserving failed records for investigation.
When Are Messages Routed to DLT?
Messages are routed to DLT in the following scenarios:
| Scenario | Example |
|---|---|
| Schema incompatibility | 1) Schema evolution is disabled and the record schema does not match the table schema; 2) New topic schema is incompatible with the Lakehouse table schema |
| Unsupported schema changes | Evolving a non-Variant field to Variant, or a Variant field to another type |
| Type conversion errors | Record cannot be serialized to the target lakehouse format (Iceberg/Delta/Parquet) |
| Null values | Null payload/record values from Kafka entries |
| Deserialization errors | Failures deserializing Pulsar or Kafka source data |
| Parsing failures | Schema parsing errors for Pulsar message format |
Note: Fatal errors (such as catalog connectivity issues, permission errors, or infrastructure failures) are not routed to DLT — they cause the compaction task to fail and retry.
DLT Table Naming
The DLT table name is derived from the original topic name by appending the configurable DLT suffix (default: _dlt).
Format:
<original-topic-local-name><dlt-suffix>
Example:
| Original Topic | DLT Table |
|---|---|
| persistent://my-tenant/my-namespace/user-events | user-events_dlt (in namespace my-tenant/my-namespace) |
| persistent://public/default/orders | orders_dlt (in namespace public/default) |
For Iceberg tables, the DLT table is created in the same namespace as the original table. For Delta tables, the DLT table path is <tenant>/<namespace>/<topic-name>_dlt.
DLT Schema
Both Iceberg and Delta DLT tables use a consistent 3-field schema:
| Field | Type | Nullable | Description |
|---|---|---|---|
| messageId | String | No | The original Pulsar or Kafka message ID |
| payload | String | Yes | Base64-encoded original message payload |
| failureReason | String | Yes | The error message explaining why the record failed |
This allows you to:
- Identify which messages failed and why
- Replay or reprocess the failed messages after fixing the underlying issue
- Debug schema or serialization problems
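For example, assuming you read the DLT table through Spark (catalog and table names are illustrative), a query like the following surfaces the failure reason and decodes the Base64 payload for inspection:

```bash
spark-sql -e "
  SELECT messageId,
         failureReason,
         CAST(unbase64(payload) AS STRING) AS original_payload
  FROM demo.my_namespace.user_events_dlt
  LIMIT 20"
```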
Configuring the DLT Suffix
The default DLT suffix is _dlt. To customize it, set the dlt.suffix property in the compaction service configuration:
```yaml
compactionScheduler:
  config:
    custom:
      dlt.suffix: "_deadletter"
```
With this configuration, the DLT table for topic user-events would be user-events_deadletter.
Monitoring DLT Activity
When the compaction writer closes, it logs a summary of DLT activity including:
- Total number of records routed to DLT
- Breakdown by failure reason (up to 100 distinct reasons tracked)
- Up to 10 sample message IDs per failure reason
Example log output:
```
WARN Sent 42 record(s) to DLT for topic: persistent://public/default/events, partition: 0,
failure reasons with messageIds: {"Schema evolution is disabled...": ["msgId1", "msgId2", ...]}
```
Use this information to diagnose schema mismatches, invalid data, or other data quality issues upstream.