
Failed to commit transaction: 15 when writing an Iterator of recordbatches #2708

Open
echai58 opened this issue Jul 26, 2024 · 3 comments

echai58 commented Jul 26, 2024

Environment

Delta-rs version: 0.17.3

Binding: python


Bug

What happened:
I'm seeing a Failed to commit transaction: 15 error when concurrently appending an iterator of RecordBatches. When I switch it to a pyarrow Table instead of an iterator, the error goes away; this is reproducible.

Looking through the write_deltalake code, it seems to convert every input to a RecordBatchReader, so I'm not sure where the difference is coming from.
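For context, this is roughly the shape of conversion I would expect to happen before the write (just a sketch of my understanding using plain pyarrow, not the actual delta-rs code; normalize_to_reader is only an illustrative name):

import pyarrow as pa

def normalize_to_reader(data, schema):
    # Sketch: a Table, an existing reader, or an iterator of RecordBatches
    # should all end up as a single (single-use) RecordBatchReader.
    if isinstance(data, pa.Table):
        return data.to_reader()
    if isinstance(data, pa.RecordBatchReader):
        return data
    return pa.RecordBatchReader.from_batches(schema, data)

So I'd expect the Table path and the iterator path to behave the same once they reach the writer.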

What you expected to happen:
Be able to concurrently append iterators of recordbatches.

How to reproduce it:

from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time
from deltalake import DeltaTable, write_deltalake
import pandas as pd
import pyarrow as pa

SCHEMA = pa.schema(
    [
        ("a", pa.int64()),
        ("b", pa.int64()),
        ("c", pa.int64()),
    ]
)


def random_table():
    random_data = pd.DataFrame.from_dict(
        {
            "a": [random.randint(1, 100) for _ in range(500)],
            "b": [random.randint(1, 100) for _ in range(500)],
            "c": [random.randint(1, 100) for _ in range(500)],
        }
    )
    time.sleep(random.random())
    # Yield a single RecordBatch (this path triggers the error) ...
    yield pa.Table.from_pandas(random_data, schema=SCHEMA).to_batches()[0]
    # ... or return a pyarrow Table instead (this path does not error):
    # return pa.Table.from_pandas(random_data, schema=SCHEMA)


def write_table(table):
    # Each call builds a fresh generator of RecordBatches for the append.
    write_deltalake(table, random_table(), schema=SCHEMA, mode="append")


def main():
    write_deltalake(
        "parallel_table",
        pa.RecordBatchReader.from_batches(SCHEMA, random_table()),
        mode="overwrite",
        schema=SCHEMA,
    )

    tables = [DeltaTable("parallel_table") for _ in range(50)]

    count = 0
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(write_table, table) for table in tables]

        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                count += 1
                print(f"{count} An error occurred: {e}")


if __name__ == "__main__":
    main()

Switching between the yield line (a RecordBatch) and the commented-out return line (a pyarrow Table) in random_table() shows the two scenarios.

echai58 added the bug label on Jul 26, 2024
ion-elgreco (Collaborator) commented:

A RecordBatchReader can only be consumed once

echai58 (Author) commented Jul 27, 2024

A RecordBatchReader can only be consumed once

Yes, I'm not sure how that's relevant though?
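
Each thread in my repro calls random_table() and gets its own fresh generator, so nothing is read twice. Just to illustrate the single-consumption point you're making (pyarrow only):

import pyarrow as pa

batch = pa.record_batch({"a": [1, 2, 3]})
reader = pa.RecordBatchReader.from_batches(batch.schema, [batch])

first = list(reader)   # one batch with 3 rows
second = list(reader)  # [] -- the reader is already exhausted

That's why I don't see how it applies here: each write gets its own iterator.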

ion-elgreco (Collaborator) commented Jul 27, 2024

I'm not sure why you're seeing this behaviour. Either way, both inputs end up in a reader before being written; perhaps one path is faster, so there's more write contention.
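
If it is just commit contention, retrying the append when the commit fails should work around it. A rough sketch, reusing random_table and write_deltalake from your script (I'm matching on the error message because the exact exception type can differ between versions):

import time

def append_with_retry(table, schema, max_attempts=5):
    # Retry when another writer wins the transaction and our commit fails
    # (sketch only; tune the matching and backoff as needed).
    for attempt in range(max_attempts):
        try:
            write_deltalake(table, random_table(), schema=schema, mode="append")
            return
        except Exception as e:
            if "Failed to commit transaction" not in str(e) or attempt == max_attempts - 1:
                raise
            time.sleep(0.1 * (attempt + 1))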
