What happened:
I'm seeing a "Failed to commit transaction: 15" error when trying to concurrently append an iterator of record batches. When I switch it to a pyarrow Table instead of an iterator, the error goes away; this is reproducible.
Looking through the write_deltalake code, it seems to convert everything to a RecordBatchReader, so I'm not sure where this is coming from.
What you expected to happen:
Be able to concurrently append iterators of record batches.
How to reproduce it:
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

from deltalake import DeltaTable, write_deltalake
import pandas as pd
import pyarrow as pa

SCHEMA = pa.schema(
    [
        ("a", pa.int64()),
        ("b", pa.int64()),
        ("c", pa.int64()),
    ]
)

def random_table():
    random_data = pd.DataFrame.from_dict(
        {
            "a": [random.randint(1, 100) for _ in range(500)],
            "b": [random.randint(1, 100) for _ in range(500)],
            "c": [random.randint(1, 100) for _ in range(500)],
        }
    )
    time.sleep(random.random())
    yield pa.Table.from_pandas(random_data, schema=SCHEMA).to_batches()[0]
    # return pa.Table.from_pandas(random_data, schema=SCHEMA)

def write_table(table):
    write_deltalake(table, random_table(), schema=SCHEMA, mode="append")

def main():
    write_deltalake(
        "parallel_table",
        pa.RecordBatchReader.from_batches(SCHEMA, random_table()),
        mode="overwrite",
        schema=SCHEMA,
    )
    tables = [DeltaTable("parallel_table") for _ in range(50)]
    count = 0
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(write_table, table) for table in tables]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                count += 1
                print(f"{count} An error occurred: {e}")

if __name__ == "__main__":
    main()
If you toggle the commented line in random_table (yielding a RecordBatch versus returning a pyarrow Table), you will see the two scenarios.
I'm not sure why you're seeing this behaviour; either way, both end up in a reader before being written. Perhaps one is faster and therefore produces more write contention.
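If write contention is indeed the trigger, one common workaround is to retry the append with jittered exponential backoff. A minimal sketch (the helper name, retry policy, and broad exception handling are illustrative, not part of the deltalake API):

```python
import random
import time

def append_with_retry(write_fn, max_attempts=5):
    """Call write_fn, retrying on failure with jittered backoff.

    write_fn is whatever performs the append, e.g. a closure over
    write_deltalake(table, data, mode="append").
    """
    for attempt in range(max_attempts):
        try:
            return write_fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # back off longer on each attempt, with jitter to de-sync writers
            time.sleep(random.uniform(0, 0.05) * (2 ** attempt))
```

In the repro above, write_table could call append_with_retry(lambda: write_deltalake(table, random_table(), schema=SCHEMA, mode="append")) instead of writing directly; note that a fresh generator must be created per attempt, since an exhausted one yields nothing.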
Environment
Delta-rs version: 0.17.3
Binding: python
Bug