Upsert mode enables deduplication of records based on a primary key. When multiple records with the same key arrive, only the latest value is retained in the lakehouse table. upsert.mode.enabled and identifier.fields are dynamic configuration keys.
Cluster-name prefix: All dynamic configuration keys must be prefixed with the cluster name (for example, <cluster-name>.upsert.mode.enabled). The cluster name is the value of clusterName in conf/broker.conf — see Finding the Cluster Name. The examples below use private-cloud as the cluster name; replace it with the name of your cluster.
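As a minimal sketch of the prefix rule, the helpers below read clusterName from a broker.conf-style file and join it with a configuration key. Both function names are hypothetical, and the sketch assumes the file uses plain key=value lines.

```shell
# Hypothetical helpers, not part of any StreamNative or Pulsar tooling.

# Print the value of clusterName from a broker.conf-style key=value file.
cluster_name_from_conf() {
  sed -n 's/^clusterName=//p' "$1" | head -n 1
}

# Join a cluster name and a dynamic configuration key with a dot.
prefixed_key() {
  printf '%s.%s\n' "$1" "$2"
}

# Example usage (assumes the file exists at this relative path):
# cluster=$(cluster_name_from_conf conf/broker.conf)
# prefixed_key "$cluster" upsert.mode.enabled
```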

Configuration Keys

| Key | Scope | Description | Default |
| --- | --- | --- | --- |
| <cluster-name>.upsert.mode.enabled | Cluster / Namespace / Topic | Enable upsert mode (true / false). More-specific scopes override broader ones. | false |
| <cluster-name>.identifier.fields | Topic only | Comma-separated list of fields used as the primary key. Setting this at the cluster or namespace level has no effect. | |
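The scope precedence for upsert.mode.enabled can be modeled with a small shell function. This is only an illustration of the rule "more-specific scopes override broader ones", not how the broker resolves the value internally; the function name is hypothetical and an empty string stands for "not set at this scope".

```shell
# Hypothetical model of scope precedence: topic > namespace > cluster > default.
# Arguments: the values set at topic, namespace, and cluster level, in that
# order. Pass an empty string for a scope where the key is not set.
effective_upsert_mode() {
  topic=$1 namespace=$2 cluster=$3
  printf '%s\n' "${topic:-${namespace:-${cluster:-false}}}"
}

effective_upsert_mode "" "false" "true"   # namespace value overrides cluster
effective_upsert_mode "true" "false" ""   # topic value overrides namespace
effective_upsert_mode "" "" ""            # nothing set, so the default applies
```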

How Upsert Works

StreamNative Ursa implements upsert using Iceberg equal-delete files (the Iceberg specification calls these equality delete files). For each upsert write, the compaction service emits an equal-delete entry that matches the previous record by the configured identifier fields, followed by the new record. Readers reconcile the delete and the new value at query time.
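The delete-then-insert semantics can be illustrated with a toy replay over CSV lines. This is a conceptual model only: it does not read or write Iceberg files, the function name is hypothetical, and it assumes the first CSV column is the single identifier field.

```shell
# Conceptual model only: replay upsert writes keyed on the first CSV column.
# Each incoming record removes the earlier record with the same key (the
# delete step) and then stores the new record (the insert step).
upsert_replay() {
  awk -F, '
    {
      if ($1 in row) {
        delete row[$1]        # drop the earlier version of this key
      } else {
        order[++n] = $1       # remember the order in which keys first appear
      }
      row[$1] = $0            # store the new version
    }
    END { for (i = 1; i <= n; i++) print row[order[i]] }
  '
}

printf '%s\n' 'u1,alice@old.example' 'u2,bob@example.com' 'u1,alice@new.example' |
  upsert_replay
```

After the replay, only the latest record for u1 remains alongside the single record for u2, which is the view a reader sees once deletes and new values are reconciled.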

Catalog Compatibility

Because upsert depends on equal-delete files, the underlying catalog and query engine must support reading Iceberg equal deletes. Catalogs differ in their delete-file support:
| Catalog | Equal Delete | Upsert Support |
| --- | --- | --- |
| Snowflake Open Catalog (Polaris) | No (position delete only) | Not supported |
| Snowflake Horizon Catalog | No (position delete only) | Not supported |
| Databricks Unity Catalog (Managed Iceberg) | No (position delete only) | Not supported |
| AWS S3Table | Yes | Supported |
| Google BigLake | Under confirmation | Under confirmation |
| Iceberg Hadoop catalog (no external catalog) | Yes | Supported |
If you need upsert with a catalog that only supports position deletes, the recommended approach is to use the Hadoop catalog or AWS S3Table for the affected tables.

Apply at Cluster Level

Cluster-level properties are stored in the sn/system namespace. Cluster-level upsert applies as the default for every namespace and topic that does not override it.
bin/pulsar-admin namespaces set-properties \
  -p private-cloud.cluster.upsert.mode.enabled=true \
  sn/system

Apply at Namespace Level

bin/pulsar-admin namespaces set-properties \
  -p private-cloud.upsert.mode.enabled=true \
  <tenant>/<namespace>

Apply at Topic Level

identifier.fields must be set on the topic; setting upsert.mode.enabled on the topic is also valid and overrides any namespace or cluster default.
bin/pulsar-admin topics update-properties \
  -p private-cloud.upsert.mode.enabled=true \
  -p private-cloud.identifier.fields=<field1>,<field2> \
  persistent://<tenant>/<namespace>/<topic>

Example

Enable upsert on persistent://public/default/users with userId and email as the primary key:
bin/pulsar-admin topics update-properties \
  -p private-cloud.upsert.mode.enabled=true \
  -p private-cloud.identifier.fields=userId,email \
  persistent://public/default/users
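When the same command is needed for several topics, a small wrapper can assemble it from its parts. The sketch below only prints the command so it can be reviewed before running; the function name and argument order are hypothetical.

```shell
# Hypothetical helper: print (do not run) the update-properties command.
# Usage: upsert_topic_cmd <cluster-name> <topic> <field> [<field>...]
# The body runs in a subshell so the IFS change does not leak to the caller.
upsert_topic_cmd() (
  cluster=$1
  topic=$2
  shift 2
  IFS=,            # join the remaining arguments with commas
  fields="$*"
  printf 'bin/pulsar-admin topics update-properties -p %s.upsert.mode.enabled=true -p %s.identifier.fields=%s %s\n' \
    "$cluster" "$cluster" "$fields" "$topic"
)

upsert_topic_cmd private-cloud persistent://public/default/users userId email
```

Printing instead of executing keeps the helper safe to try without a running cluster; pipe the output to sh only after checking the cluster name and field list.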

Requirements

  • Identifier fields must exist in the topic schema.
  • Identifier fields must be marked as required in the schema (not nullable).
  • The catalog and query engine must support reading Iceberg equal-delete files. See Catalog Compatibility.

Commit Behavior

  • Append-only writes (default) are batch-committed: multiple Parquet files are grouped and committed to the catalog in a single commit.
  • Upsert writes are committed one-by-one to ensure correct data ordering. This may slightly reduce throughput compared to append-only mode.

Limitations

  • When upsert is combined with a partition key, identifier fields only deduplicate within the same partition. For example, if the table has partition.key=region and identifier.fields=userId, two records with the same userId but different region values are both kept.
  • Upsert is supported only for External Tables (SDT).
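The partition-scoped deduplication described in the first limitation can be modeled with a toy replay. This is a conceptual sketch, not the service's implementation: the function name is hypothetical, and input lines are assumed to be region,userId,value with region as the partition key and userId as the identifier field.

```shell
# Conceptual model: deduplication applies to the pair (region, userId), so the
# same userId in two different regions produces two surviving rows.
upsert_within_partition() {
  awk -F, '
    {
      key = $1 FS $2                        # partition value plus identifier
      if (!(key in row)) order[++n] = key   # remember first-seen key order
      row[key] = $0                         # later record replaces earlier one
    }
    END { for (i = 1; i <= n; i++) print row[order[i]] }
  '
}

printf '%s\n' 'us,42,v1' 'eu,42,v1' 'us,42,v2' | upsert_within_partition
```

Here the two us records collapse to the latest one, while the eu record with the same userId is kept because it lives in a different partition.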