The Pulsar Spark Connector integrates Apache Pulsar with Apache Spark (a data processing engine). It allows Spark to read data from and write data to Pulsar using Spark Structured Streaming and Spark SQL, and it provides exactly-once source semantics and at-least-once sink semantics.
How it Works
This illustration shows how the Pulsar Spark source connector transfers data from a Pulsar topic to a Spark job.
As the illustration shows, each Pulsar topic partition is mapped to a PulsarSourceRDD partition. When a Spark job executes a new microbatch, the Pulsar Spark connector requests new data from Pulsar for each PulsarSourceRDD partition. In the Spark cluster, the data request tasks are assigned to available executors. All of the data available in each partition since the last consumption is read from the Pulsar topic partition. The Spark job continuously creates new microbatches at a user-defined trigger interval and processes the fetched data accordingly. This process repeats until the Spark job is canceled. Once the data is fetched, you can perform any operation on it, including shuffling.
After a microbatch, the offset is committed. Therefore, the next microbatch starts with newly incoming data since the last offset.
For failure recovery, the offsets of each partition are stored in Spark's checkpoint. Each time a Spark job is launched, it first tries to restore the reading offsets from the state store. If there are offsets saved for the job, the Spark job reads data from the saved offsets. Otherwise, the Spark job reads data from a user-defined position in the topic.
The whole life cycle of a structured streaming job is illustrated in the figure below. As time goes by, each interval yields a new microbatch of data that is then processed by the Spark job.
Limitation
Currently, the Spark connector provides at-least-once sink semantics. Therefore, when the Spark connector writes streaming or batch queries to Pulsar, some records may be duplicated. One way to remove duplicates is to introduce a primary (unique) key that can be used for deduplication when reading the data.
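As a hedged sketch of that deduplication approach, the following assumes a batch read whose payload carries a unique `id` field (the topic name, service URL, and `id` column are illustrative, not part of the connector's API):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("pulsar-dedup-example")
  .getOrCreate()

// Read records that may contain duplicates due to at-least-once delivery.
val df = spark.read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650") // placeholder URL
  .option("topic", "my-topic")                      // hypothetical topic
  .load()

// Deduplicate on the unique key carried in the payload.
val deduped = df.dropDuplicates("id")
```

For streaming queries, the same idea applies with `dropDuplicates` combined with a watermark so that Spark can bound the deduplication state.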
How to configure
You can set the following configurations for the Pulsar Spark connector.
Option | Value | Required | Default | Query type | Description
---|---|---|---|---|---
service.url | The Pulsar serviceUrl configuration. | Yes | None | Streaming and batch queries | The broker service URL of your Pulsar cluster, such as "pulsar://localhost:6650".
admin.url (Deprecated) | The Pulsar serviceHttpUrl configuration. | No | None | Streaming and batch queries | The HTTP service URL of your Pulsar cluster.
predefinedSubscription | A subscription name string | No | None | Streaming and batch queries | The pre-defined subscription name used by the Spark connector to track the Spark application's progress.
subscriptionPrefix | A subscription prefix string | No | None | Streaming and batch queries | The prefix used by the Spark connector to generate a random subscription for tracking the Spark application's progress.
topic | A topic name string | Yes | None | Streaming and batch queries | The topic to be consumed. This option is mutually exclusive with the topics and topicsPattern options. This option is available for the Pulsar Spark source.
topics | A comma-separated list of topics | Yes | None | Streaming and batch queries | The list of topics to be consumed. This option is mutually exclusive with the topic and topicsPattern options. This option is available for the Pulsar Spark source.
topicsPattern | A Java regex string | Yes | None | Streaming and batch queries | The pattern used to subscribe to one or more topics. This option is mutually exclusive with the topic and topics options. This option is available for the Pulsar Spark source.
pollTimeoutMs | A number string in milliseconds | No | "120000" | Streaming and batch queries | The timeout for reading messages from Pulsar, such as 6000.
waitingForNonExistedTopic | "true" or "false" | No | "false" | Streaming and batch queries | Whether the Spark job waits for a topic that does not exist yet to be created, instead of failing.

Schema of Pulsar Spark source
In addition, each row in the Pulsar Spark source has the following metadata fields:
- __key: Binary
- __topic: String
- __messageId: Binary
- __publishTime: Timestamp
- __eventTime: Timestamp
Example
In Pulsar, a topic with AVRO schemas looks like the following:
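The original example is not preserved in this copy of the document. As a hedged sketch, a topic carrying AVRO-encoded records might be populated like this with the Pulsar Java client (the `Foo`/`Bar` record types, topic name, and service URL are all illustrative):

```scala
import org.apache.pulsar.client.api.{PulsarClient, Schema}

// Hypothetical record types backing the topic's AVRO schema.
case class Bar(b: Boolean, s: String)
case class Foo(i: Int, f: Float, bar: Bar)

val client = PulsarClient.builder()
  .serviceUrl("pulsar://localhost:6650") // placeholder URL
  .build()

// Create a producer whose schema is derived from the Foo class.
val producer = client.newProducer(Schema.AVRO(classOf[Foo]))
  .topic("avro-topic") // hypothetical topic name
  .create()

producer.send(Foo(1, 1.0f, Bar(b = true, s = "a")))
producer.close()
client.close()
```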
When the data is exported to a Spark DataFrame/DataSet, it looks like the following:
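The exported schema is likewise missing here. As a hedged sketch, assuming an AVRO record with fields `i` (int), `f` (float), and a nested record `bar`, inspecting the DataFrame might look roughly like this (the exact output depends on the connector version):

```scala
// df is a DataFrame read from the AVRO-backed Pulsar topic.
df.printSchema()
// Roughly the following shape is expected: the payload fields first,
// followed by the connector's metadata fields.
// root
//  |-- i: integer (nullable = true)
//  |-- f: float (nullable = true)
//  |-- bar: struct (nullable = true)
//  |    |-- b: boolean (nullable = true)
//  |    |-- s: string (nullable = true)
//  |-- __key: binary (nullable = true)
//  |-- __topic: string (nullable = true)
//  |-- __messageId: binary (nullable = true)
//  |-- __publishTime: timestamp (nullable = true)
//  |-- __eventTime: timestamp (nullable = true)
```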
This example shows how Pulsar data with an AVRO schema is converted into a Spark DataFrame/DataSet.
Security configurations
If the Pulsar cluster requires authentication, you can set credentials in any of the following ways.

Scenario 1
When a Pulsar cluster is enabled with authentication and the Pulsar client and Pulsar Admin use the same credential, you can configure the Spark connector as below:
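The configuration example is missing from this copy. A hedged sketch using token authentication, assuming the connector forwards `pulsar.client.*` options to the underlying Pulsar client (the URL, topic, and token are placeholders):

```scala
// Single shared credential: only the client-side auth options are needed.
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("pulsar.client.authPluginClassName",
          "org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.client.authParams", "token:<your-token>") // placeholder token
  .option("topic", "my-topic")
  .load()
```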
Scenario 2
When a Pulsar cluster is enabled with TLS authentication and the Pulsar client and Pulsar Admin use different credentials, you can configure the Spark connector as below:
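The corresponding example is also missing. A hedged sketch for TLS authentication, assuming the connector accepts separate `pulsar.client.*` and `pulsar.admin.*` auth options (all URLs, topics, and certificate paths are placeholders):

```scala
// Separate credentials for the Pulsar client and Pulsar Admin.
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar+ssl://localhost:6651")
  .option("admin.url", "https://localhost:8443")
  .option("pulsar.client.authPluginClassName",
          "org.apache.pulsar.client.impl.auth.AuthenticationTls")
  .option("pulsar.client.authParams",
          "tlsCertFile:/path/to/client-cert.pem,tlsKeyFile:/path/to/client-key.pem")
  .option("pulsar.admin.authPluginClassName",
          "org.apache.pulsar.client.impl.auth.AuthenticationTls")
  .option("pulsar.admin.authParams",
          "tlsCertFile:/path/to/admin-cert.pem,tlsKeyFile:/path/to/admin-key.pem")
  .option("topic", "my-topic")
  .load()
```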
How to deploy
You can use either of the following methods to deploy the Pulsar Spark connector. In both cases, configure the connector before using it.
Client library
As with any Spark application, you can add the Pulsar Spark connector and its dependencies to your project build.
Example
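The example itself is missing from this copy. A hedged sketch of a Maven dependency; the group ID, artifact ID (including the Scala binary version suffix), and version are illustrative and should be checked against the connector's release page:

```xml
<!-- Illustrative coordinates; verify against the connector's releases. -->
<dependency>
  <groupId>io.streamnative.connectors</groupId>
  <artifactId>pulsar-spark-connector_2.12</artifactId>
  <version>{{connector-version}}</version>
</dependency>
```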
CLI
For experimenting on the spark-shell, you can add the connector and its dependencies directly with the --packages option.
Example
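The command-line example is missing here. A hedged sketch; the coordinates are illustrative and should be checked against the connector's release page:

```shell
# Add the connector via --packages (coordinates are placeholders).
spark-shell --packages io.streamnative.connectors:pulsar-spark-connector_2.12:{{connector-version}}
```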
When locating an artifact or library, the --packages option checks your local Maven repository first, then Maven Central and any remote repositories passed with the --repositories option.
The format for the coordinates should be groupId:artifactId:version.

Tip For more information about submitting applications with external dependencies, see the application submission guide.

How to use
This section describes how to create Pulsar Spark source and sink connectors to transmit data between the Pulsar cluster and the Spark cluster.

Read data from Pulsar
This section describes how to create a Pulsar Spark source for streaming and batch queries.

Prerequisites
Create a Pulsar Spark source for streaming queries
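The original code sample is not included in this copy. A hedged sketch of a streaming read; the service URL and topic names are placeholders:

```scala
// Subscribe to one or more topics as a streaming source.
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topics", "topic1,topic2") // or use "topic" / "topicsPattern"
  .load()

// Cast the key and payload to strings for downstream processing.
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
```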
Tip For more information on how to use other language bindings for Spark Structured Streaming, see Structured Streaming Programming Guide.

Create a Pulsar Spark source for batch queries
For batch processing, you can create a Dataset/DataFrame for a defined range of offsets.
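The batch example is missing here. A hedged sketch, assuming the connector supports `startingOffsets` and `endingOffsets` to bound the range (URL and topic are placeholders):

```scala
// Read a bounded range of a topic as a batch DataFrame.
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```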
Write data to Pulsar
The DataFrame written to Pulsar can have an arbitrary schema. Since each record in the DataFrame is transformed into one message sent to Pulsar, the fields of the DataFrame are divided into two groups: the __key and __eventTime fields (if present) are encoded as message metadata, while the remaining fields are encoded as the message payload.
Prerequisites
Create a Pulsar Spark sink for streaming queries
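The streaming sink example is not preserved in this copy. A hedged sketch; the service URL, topic, and checkpoint path are placeholders, and a checkpoint location is required for streaming writes:

```scala
// Write a streaming DataFrame to a Pulsar topic.
val query = df
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .option("checkpointLocation", "/path/to/checkpoint") // placeholder path
  .start()

query.awaitTermination()
```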
Create a Pulsar Spark sink for batch queries
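The batch sink example is likewise missing. A hedged sketch; the service URL and topic are placeholders:

```scala
// Write a batch DataFrame to a Pulsar topic.
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .save()
```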