pyspark.sql.streaming.DataStreamWriter.foreach

DataStreamWriter.foreach(f: Union[Callable[[pyspark.sql.types.Row], None], SupportsProcess]) → DataStreamWriter

Sets the output of the streaming query to be processed using the provided writer f. This is often used to write the output of a streaming query to arbitrary storage systems. The processing logic can be specified in two ways.

- A function that takes a row as input.
This is a simple way to express your processing logic. Note that this does not allow you to deduplicate generated data when failures cause reprocessing of some input data. That would require you to specify the processing logic in the next way.
- An object with a process method and optional open and close methods. The object can have the following methods.
  - open(partition_id, epoch_id): Optional method that initializes the processing (for example, open a connection, start a transaction, etc.). Additionally, you can use the partition_id and epoch_id to deduplicate regenerated data (discussed later).
  - process(row): Non-optional method that processes each Row.
  - close(error): Optional method that finalizes and cleans up (for example, close connection, commit transaction, etc.) after all rows have been processed.
The object will be used by Spark in the following way.
- A single copy of this object is responsible for all the data generated by a single task in a query. In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
- This object must be serializable, because each task will get a fresh serialized-deserialized copy of the provided object. Hence, it is strongly recommended that any initialization for writing data (e.g. opening a connection or starting a transaction) is done after the open(…) method has been called, which signifies that the task is ready to generate data (see the sketch after this list).
- The lifecycle of the methods is as follows.
  For each partition with partition_id:
    For each batch/epoch of streaming data with epoch_id:
      - Method open(partition_id, epoch_id) is called.
      - If open(...) returns true, then for each row in the partition and batch/epoch, method process(row) is called.
      - Method close(error) is called with the error (if any) seen while processing rows.
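To make these rules concrete, here is a minimal sketch of a writer that keeps __init__ serializable and opens its resource only inside open(), following the lifecycle above. The connect_to_sink helper and the sink's write/close calls are hypothetical placeholders for whatever storage client you actually use; they are not part of PySpark.

class ConnectionWriter:
    def __init__(self, url):
        # Only cheap, picklable state here: the instance is serialized and
        # shipped to each task before open() is ever called.
        self.url = url
        self.conn = None

    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch; open resources here.
        self.conn = connect_to_sink(self.url)  # hypothetical helper
        return True  # returning False skips process() for this partition/epoch

    def process(self, row):
        # Called for every row, only if open() returned True.
        self.conn.write(row.asDict())  # hypothetical sink API

    def close(self, error):
        # Called after the rows, with the error (if any) seen while processing.
        if self.conn is not None:
            self.conn.close()

# Usage: df.writeStream.foreach(ConnectionWriter("sink://host:port")).start()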
Important points to note:
- The partition_id and epoch_id can be used to deduplicate generated data when failures cause reprocessing of some input data. This depends on the execution mode of the query. If the streaming query is being executed in the micro-batch mode, then every partition represented by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit data and achieve exactly-once guarantees. However, if the streaming query is being executed in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication. (A sketch of this pattern appears at the end of the Examples section.)
- The close() method (if it exists) will be called if the open() method exists and returns successfully (irrespective of the return value), except if the Python process crashes in the middle.
New in version 2.4.0.
Changed in version 3.5.0: Supports Spark Connect.
Notes
This API is evolving.
Examples
>>> import time
>>> df = spark.readStream.format("rate").load()
Print every row using a function
>>> def print_row(row):
...     print(row)
...
>>> q = df.writeStream.foreach(print_row).start()
>>> time.sleep(3)
>>> q.stop()
Print every row using an object with a process() method
>>> class RowPrinter:
...     def open(self, partition_id, epoch_id):
...         print("Opened %d, %d" % (partition_id, epoch_id))
...         return True
...
...     def process(self, row):
...         print(row)
...
...     def close(self, error):
...         print("Closed with error: %s" % str(error))
...
>>> q = df.writeStream.foreach(RowPrinter()).start()
>>> time.sleep(3)
>>> q.stop()
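Deduplicate reprocessed data in micro-batch mode using (partition_id, epoch_id), as described in the notes above. This is a hedged sketch rather than a built-in recipe: already_committed and commit stand for whatever transactional bookkeeping your external sink provides, and are not PySpark APIs.

>>> class IdempotentWriter:
...     def open(self, partition_id, epoch_id):
...         # Skip this partition/epoch if the sink already committed it
...         # (hypothetical already_committed helper backed by the sink).
...         self.key = (partition_id, epoch_id)
...         self.rows = []
...         return not already_committed(self.key)
...
...     def process(self, row):
...         # Buffer rows so the whole partition/epoch is committed atomically.
...         self.rows.append(row)
...
...     def close(self, error):
...         if error is None:
...             commit(self.key, self.rows)  # hypothetical atomic commit
...
>>> q = df.writeStream.foreach(IdempotentWriter()).start()
>>> time.sleep(3)
>>> q.stop()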