
[BUG] S3SingleDriverLogStore.listFrom takes a long time for large _delta_logs #1191

Closed
1 of 3 tasks
jkylling opened this issue Jun 10, 2022 · 8 comments

@jkylling
Contributor

Bug

Describe the problem

Reading from a Delta table with a large Delta log takes a long time when using `S3SingleDriverLogStore`. When calculating a snapshot of a Delta table, `S3SingleDriverLogStore` lists all files within the `_delta_log` key prefix. This can be expensive for Delta logs with many commits, which frequently happens when a streaming job is writing to the table.

To keep reads and writes fast, the Delta log protocol creates checkpoints of the Delta log, typically every 10th commit. To calculate a snapshot of a Delta table we look at the latest checkpoint, together with the commits which have happened since that checkpoint. This is done using the `listFrom(path: Path)` method of the `LogStore` interface. However, the existing implementation of `listFrom` in `S3SingleDriverLogStore` lists all keys within the `_delta_log` prefix and filters the result to keep only lexicographically greater keys:

`.filter(s -> s.getPath().getName().compareTo(resolvedPath.getName()) >= 0)`

Listing the entire directory can be time consuming when the table contains many commits.
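To make the cost difference concrete, here is a small in-memory sketch (not the actual `LogStore` code; all names and counts are illustrative) contrasting list-everything-then-filter with a server-side skip in the style of `startAfter`:

```scala
object ListFromSketch {
  // Toy model of a _delta_log key space: 100 zero-padded commit files plus one
  // checkpoint. Zero padding makes numeric order agree with lexicographic
  // order, which both S3 listings and listFrom rely on.
  val deltaLog: Seq[String] =
    ((0 until 100).map(v => f"$v%020d.json") :+ f"${90}%020d.checkpoint.parquet").sorted

  // Old behaviour: fetch the entire listing, then filter client-side.
  // Returns (keys transferred, keys kept).
  def listAllThenFilter(keys: Seq[String], start: String): (Int, Seq[String]) =
    (keys.size, keys.filter(_.compareTo(start) >= 0))

  // startAfter-style behaviour: the service skips keys before `start`, so only
  // the tail of the listing is ever transferred; dropWhile stands in for the
  // server-side skip. (The real startAfter parameter is exclusive; the sketch
  // glosses over that detail.)
  def listWithStartAfter(keys: Seq[String], start: String): (Int, Seq[String]) = {
    val tail = keys.dropWhile(_.compareTo(start) < 0)
    (tail.size, tail)
  }

  def main(args: Array[String]): Unit = {
    val start = f"${90}%020d.checkpoint.parquet"
    val (slow, a) = listAllThenFilter(deltaLog, start)
    val (fast, b) = listWithStartAfter(deltaLog, start)
    // Both strategies return the same keys; only the transfer cost differs.
    println(s"same result: ${a == b}, transferred $slow vs $fast keys")
  }
}
```

Both strategies agree on the result; the difference is that the first transfers every key under the prefix while the second transfers only the keys at or after the checkpoint.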

It is possible to avoid listing all keys under a given key prefix by using the `startAfter` parameter of the S3 ListObjectsV2 API. We have applied a patch with this fix in jkylling@ec998ee.

For the sample tables we tested the patch with, this brought the read time down from 20-30 seconds to around 5 seconds, with occasional reads taking 15 seconds. The 15-second reads seemed to be related to processing of new checkpoints, as we were streaming to the tables at the same time as we were reading.

I have not tested the other `LogStore` implementations, but from reading the code they appear to be affected by the same issue.

Steps to reproduce

  1. Create a Delta table.
  2. Read from the table and note the time.
  3. Fill the Delta log with files (either junk or real commits).
  4. Read from the table again and note the time.

The below tests can be used for the above steps. See also this commit.

```scala
  val bucket = "s3a://your-s3-bucket"

  test("1. create table") {
    spark
      .sparkContext
      .hadoopConfiguration
      .set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
      )

    spark.sql(s"CREATE TABLE delta_log_performance_test0 (value LONG) USING DELTA LOCATION '${bucket}_0'").collect()
    spark.sql(s"CREATE TABLE delta_log_performance_test1 (value LONG) USING DELTA LOCATION '${bucket}_1'").collect()
    spark.sql(s"CREATE TABLE delta_log_performance_test2 (value LONG) USING DELTA LOCATION '${bucket}_2'").collect()
  }

  test("2. fill bucket 2 _delta_log with garbage") {
    println(
      s"""
        |# In a shell run:
        |for i in {1..100000}; do touch .$$i.txt; done
        |aws s3 cp . ${bucket.replace("s3a://", "s3://")}_2/_delta_log/ --recursive
        |""".stripMargin)
  }

  test("3. time selects") {
    spark
      .sparkContext
      .hadoopConfiguration
      .set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
      )

    spark.sql(s"select * from delta.`${bucket}_0`") // warmup

    val start0 = System.currentTimeMillis()
    spark.sql(s"select * from delta.`${bucket}_1`")
    println(s"d1 = ${System.currentTimeMillis() - start0}")

    val start1 = System.currentTimeMillis()
    spark.sql(s"select * from delta.`${bucket}_2`")
    println(s"d2 = ${System.currentTimeMillis() - start1}")
  }
```

A sample run of the "time selects" tests gives

```
d1 = 1955
d2 = 12034
```

That is, reading identical tables takes 10 seconds longer when the delta log contains more files. The same happens if the delta log contains commits instead of junk files.

After applying the patch to S3SingleDriverLogStore we get

```
d1 = 1922
d2 = 1938
```

Observed results

Reading from a Delta table whose `_delta_log` contains many files takes a long time (20-30 seconds for our sample tables).

Expected results

Reading from a Delta table with a _delta_log with many files should take the same amount of time as reading from a _delta_log with few files. That is, the time to get a snapshot should not be proportional to the number of files in the _delta_log.

Further details

The issue can be partially mitigated by setting the `delta.stalenessLimit` option to a large value. However, I believe the write path would still be affected by this issue, as it seems to force an update of the snapshot before every write.

The same issue can be observed on a local file system by filling the _delta_log directory with junk (or several commits). This issue might affect all LogStores with naive implementations of listFrom.
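The local-filesystem version of the experiment can be sketched in a few lines (a hypothetical stand-in for what a naive `listFrom` does, not Delta code; the file counts are arbitrary):

```scala
import java.nio.file.{Files, Path}

object LocalListSketch {
  // Build a throwaway "_delta_log" directory: a few commit files buried in
  // junk entries, then do what a naive listFrom does: enumerate every entry
  // and filter afterwards. Returns (entries enumerated, entries kept).
  def run(): (Int, Int) = {
    val dir: Path = Files.createTempDirectory("_delta_log")
    (0 until 1000).foreach(i => Files.createFile(dir.resolve(s".junk$i.txt")))
    (0 until 10).foreach(v => Files.createFile(dir.resolve(f"$v%020d.json")))

    val all = dir.toFile.list().toList // touches all 1010 entries
    val wanted = all.filter(_.compareTo(f"${0}%020d.json") >= 0).sorted
    (all.size, wanted.size)
  }

  def main(args: Array[String]): Unit = {
    val (listed, kept) = run()
    println(s"enumerated $listed entries to recover $kept commit files")
  }
}
```

The junk names sort before the zero-padded commit names, so the filter discards them, but only after every entry has already been enumerated.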

Environment information

  • Delta Lake version: 1.3.0
  • Spark version: 3.2.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.

I would be happy to contribute to get this issue fixed. The suggested patch might need some work, as it digs quite deep into the AWS Hadoop API.

@jkylling jkylling added the bug Something isn't working label Jun 10, 2022
@scottsand-db
Collaborator

Hi @jkylling, thanks for this issue. We will look into this and get back to you.

@scottsand-db scottsand-db self-assigned this Jun 13, 2022
@scottsand-db
Collaborator

Hi @jkylling, this approach LGTM. Want to make an official PR?

@jkylling
Contributor Author

Hi @scottsand-db . Thanks for looking into this. I'll open a PR.

jkylling added a commit to jkylling/delta that referenced this issue Jun 16, 2022
The current implementation of `S3SingleDriverLogStore.listFrom` lists the entire content of the parent directory and filters the result. This can take a long time if the parent directory contains a lot of files. In practice, this can happen for _delta_log folders with a lot of commits.

We change the implementation to use the [startAfter](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/ListObjectsV2Request.html#startAfter--) parameter such that we only get keys lexicographically greater than or equal to the resolved path in the S3 list response. This will usually reduce the number of S3 list requests from `size of _delta_log / 1000` to 1.

This resolves delta-io#1191.
scottsand-db pushed a commit that referenced this issue Nov 14, 2022
## Description

The current implementation of `S3SingleDriverLogStore.listFrom` lists the entire content of the parent directory and filters the result. This can take a long time if the parent directory contains a lot of files. In practice, this can happen for _delta_log folders with a lot of commits.

We change the implementation to use the [startAfter](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/ListObjectsV2Request.html#startAfter--) parameter such that we only get keys lexicographically greater than or equal to the resolved path in the S3 list response. This will usually reduce the number of S3 list requests from `size of _delta_log / 1000` to 1.

This resolves #1191.

I've tested the patch briefly with the sample test described in #1191. The [previous iteration of this patch](jkylling@ec998ee) has been tested a bit more. Correctness has not been tested thoroughly.

## Does this PR introduce _any_ user-facing changes?

No

Closes #1210

Signed-off-by: Scott Sandre <scott.sandre@databricks.com>
GitOrigin-RevId: 2a0d1279655672cbdffd5604b7d7d781556888b9
@vkorukanti vkorukanti modified the milestones: 2.2.0, 2.3.0 Dec 5, 2022
@shenavaa

shenavaa commented Dec 16, 2022

This seems to enforce s3a. Does it break EMRFS or other Hadoop FileSystem implementations?

@scottsand-db
Collaborator

@shenavaa This only works for S3A file system, yup. If you are using EMRFS, then you shouldn't enable this feature.

@shenavaa

shenavaa commented Dec 20, 2022

@scottsand-db No option to disable and it's already in 2.3.0 milestone!

@scottsand-db
Collaborator

Closed by c156c98

@scottsand-db
Collaborator

scottsand-db commented Dec 20, 2022

@shenavaa the feature is disabled by default. You have to explicitly enable it using `delta.enableFastS3AListFrom`.
