[SPARK-49115][DOCS] Add docs for state metadata source for operators using schema format v2 such as transformWithState

### What changes were proposed in this pull request?
Add docs for state metadata source for operators using schema format v2 such as transformWithState

### Why are the changes needed?
Adding documentation for state metadata source and its use with newer operators such as `transformWithState`

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Existing tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47625 from anishshri-db/task/SPARK-49115.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
anishshri-db authored and HeartSaVioR committed Aug 7, 2024
1 parent 717a6da commit 435a01a
Showing 2 changed files with 32 additions and 4 deletions.
35 changes: 32 additions & 3 deletions docs/structured-streaming-state-data-source.md
@@ -108,7 +108,7 @@ Users are encouraged to query about the schema via df.schema() / df.printSchema(
The following options must be set for the source.

<table>
-<thead><tr><th>Option</th><th>value</th><th>meaning</th></tr></thead>
+<thead><tr><th>Option</th><th>Value</th><th>Meaning</th></tr></thead>
<tr>
<td>path</td>
<td>string</td>
@@ -119,7 +119,7 @@ The following options must be set for the source.
The following configurations are optional:

<table>
-<thead><tr><th>Option</th><th>value</th><th>default</th><th>meaning</th></tr></thead>
+<thead><tr><th>Option</th><th>Value</th><th>Default</th><th>Meaning</th></tr></thead>
<tr>
<td>batchId</td>
<td>numeric value</td>
@@ -264,13 +264,14 @@ The output schema will also be different from the normal output.
</tr>
</table>

-## State metadata source
+## State Metadata Source

Before querying state from an existing checkpoint via the state data source, users typically want to understand what the checkpoint contains, especially its stateful operators. This includes which operators and state store instances are available in the checkpoint, the available range of batch IDs, etc.

Structured Streaming provides a data source named "State metadata source" to provide the state-related metadata information from the checkpoint.

Note: The metadata is constructed when the streaming query runs with Spark 4.0+. An existing checkpoint that has only been run with a lower Spark version does not have the metadata and cannot be queried with this metadata source. To construct the metadata, run the streaming query against the existing checkpoint in Spark 4.0+ before querying.
+Users can optionally provide the batchId to get the operator metadata at a point in time.

### Creating a State metadata store for Batch Queries

@@ -310,6 +311,29 @@ Dataset<Row> df = spark

</div>

+The following options must be set for the source:
+
+<table>
+<thead><tr><th>Option</th><th>Value</th><th>Meaning</th></tr></thead>
+<tr>
+<td>path</td>
+<td>string</td>
+<td>Specify the root directory of the checkpoint location. You can either specify the path via option("path", `path`) or load(`path`).</td>
+</tr>
+</table>
+
+The following configurations are optional:
+
+<table>
+<thead><tr><th>Option</th><th>Value</th><th>Default</th><th>Meaning</th></tr></thead>
+<tr>
+<td>batchId</td>
+<td>numeric value</td>
+<td>Last committed batch if available, else 0</td>
+<td>Optional batchId used to retrieve operator metadata at that batch.</td>
+</tr>
+</table>
+
Each row in the source has the following schema:

<table>
@@ -344,6 +368,11 @@ Each row in the source has the following schema:
<td>int</td>
<td>The maximum batch ID available for querying state. The value could be invalid if the streaming query taking the checkpoint is running, as the query will commit further batches.</td>
</tr>
+<tr>
+<td>operatorProperties</td>
+<td>string</td>
+<td>List of properties used by the operator encoded as JSON. Output generated here is operator dependent.</td>
+</tr>
<tr>
<td>_numColsPrefixKey</td>
<td>int</td>
@@ -107,7 +107,6 @@ class StateMetadataTable extends Table with SupportsRead with SupportsMetadataCo

val checkpointLocation = options.get("path")

-// TODO: SPARK-49115 - add docs for new options for state metadata source
val batchIdOpt = Option(options.get("batchId")).map(_.toLong)
// if a batchId is provided, use it. Otherwise, use the last committed batch. If there is no
// committed batch, use batchId 0.
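
The fallback rule in the comment above, which matches the documented `batchId` default ("Last committed batch if available, else 0"), can be sketched as a small pure function. This is only an illustration of the rule as the comment states it, not Spark's implementation: the function name `resolve_batch_id` and its parameters are hypothetical, and how Spark actually looks up the last committed batch is not shown in this diff.

```python
from typing import Optional


def resolve_batch_id(requested: Optional[int], last_committed: Optional[int]) -> int:
    """Pick the batch ID to read operator metadata for.

    Hypothetical helper mirroring the comment in the diff:
    use the requested batchId if given, otherwise the last
    committed batch, otherwise fall back to batch 0.
    """
    if requested is not None:
        # An explicit batchId option always wins, even if it is 0.
        return requested
    if last_committed is not None:
        # No option given: default to the last committed batch.
        return last_committed
    # No option and no committed batch yet.
    return 0
```

Note the explicit `is not None` checks: a plain truthiness test would wrongly treat a requested batch ID of 0 as "not provided".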
