From 2f2c6a9fd9e23c3af679622ee4bfb2a0527b3321 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sat, 3 Aug 2024 16:14:13 +0200 Subject: [PATCH] Python application On branch initial-implementation Changes to be committed: new file: .github/workflows/CI.yml modified: .gitignore modified: pyproject.toml new file: python/falsa/__init__.py new file: python/falsa/app.py new file: python/falsa/local_fs.py modified: src/lib.rs --- .github/workflows/CI.yml | 168 +++++++++++++++++++ .gitignore | 4 + pyproject.toml | 3 + python/falsa/__init__.py | 8 + python/falsa/app.py | 353 +++++++++++++++++++++++++++++++++++++++ python/falsa/local_fs.py | 166 ++++++++++++++++++ src/lib.rs | 6 +- 7 files changed, 705 insertions(+), 3 deletions(-) create mode 100644 .github/workflows/CI.yml create mode 100644 python/falsa/__init__.py create mode 100644 python/falsa/app.py create mode 100644 python/falsa/local_fs.py diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml new file mode 100644 index 0000000..9f0095a --- /dev/null +++ b/.github/workflows/CI.yml @@ -0,0 +1,168 @@ +# This file is autogenerated by maturin v1.6.0 +# To update, run +# +# maturin generate-ci github +# +name: CI + +on: + push: + branches: + - main + tags: + - '*' + pull_request: + workflow_dispatch: + +permissions: + contents: read + +jobs: + linux: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: ubuntu-latest + target: x86_64 + - runner: ubuntu-latest + target: x86 + - runner: ubuntu-latest + target: aarch64 + - runner: ubuntu-latest + target: armv7 + - runner: ubuntu-latest + target: s390x + - runner: ubuntu-latest + target: ppc64le + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + manylinux: auto + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-linux-${{ matrix.platform.target }} + path: dist + + musllinux: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: ubuntu-latest + target: x86_64 + - runner: ubuntu-latest + target: x86 + - runner: ubuntu-latest + target: aarch64 + - runner: ubuntu-latest + target: armv7 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + manylinux: musllinux_1_2 + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-musllinux-${{ matrix.platform.target }} + path: dist + + windows: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: windows-latest + target: x64 + - runner: windows-latest + target: x86 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + architecture: ${{ matrix.platform.target }} + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-windows-${{ matrix.platform.target }} + path: dist + + macos: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: macos-12 + target: x86_64 + - runner: macos-14 + target: aarch64 
+ steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + - name: Upload wheels + uses: actions/upload-artifact@v4 + with: + name: wheels-macos-${{ matrix.platform.target }} + path: dist + + sdist: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Build sdist + uses: PyO3/maturin-action@v1 + with: + command: sdist + args: --out dist + - name: Upload sdist + uses: actions/upload-artifact@v4 + with: + name: wheels-sdist + path: dist + +# release: +# name: Release +# runs-on: ubuntu-latest +# if: "startsWith(github.ref, 'refs/tags/')" +# needs: [linux, musllinux, windows, macos, sdist] +# steps: +# - uses: actions/download-artifact@v4 +# - name: Publish to PyPI +# uses: PyO3/maturin-action@v1 +# env: +# MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_API_TOKEN }} +# with: +# command: upload +# args: --non-interactive --skip-existing wheels-*/* diff --git a/.gitignore b/.gitignore index c8f0442..ce26567 100644 --- a/.gitignore +++ b/.gitignore @@ -70,3 +70,7 @@ docs/_build/ # Pyenv .python-version + + +# tmp generation +tmp \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0f3f986..bd02213 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,9 @@ dependencies = ["pyarrow", "typer", "deltalake"] [project.optional-dependencies] dev = ["ruff", "ipython"] +[project.scripts] +falsa = "falsa.app:entry_point" + [tool.ruff] line-length = 120 diff --git a/python/falsa/__init__.py b/python/falsa/__init__.py new file mode 100644 index 0000000..73fabc6 --- /dev/null +++ b/python/falsa/__init__.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class H2ODatasetSizes(int, Enum): + SMALL = 10_000_000 + MEDIUM = 100_000_000 + BIG = 1_000_000_000 + diff --git a/python/falsa/app.py b/python/falsa/app.py new file mode 100644 index 0000000..ed56fec --- /dev/null +++ b/python/falsa/app.py @@ -0,0 +1,353 @@ +import random +from enum import Enum +from pathlib import Path + +import pyarrow as pa +import typer +from deltalake import write_deltalake +from pyarrow import csv, parquet +from rich import print +from rich.progress import track +from typing_extensions import Annotated + +from falsa import H2ODatasetSizes +from falsa.local_fs import ( + NATIVE_I64_MAX_VALUE, + GroupByGenerator, + JoinBigGenerator, + JoinMediumGenerator, + JoinSmallGenerator, +) + +help_str = """ +[bold][green]H2O db-like-benchmark data generation.[/green][/bold]\n +[italic][red]This implementation is unofficial![/red][/italic] +For the official implementation please check https://github.com/duckdblabs/db-benchmark/tree/main/_data + +Available commands are: +- [green]groupby[/green]: generate GroupBy dataset; +- [green]join[/green]: generate three Join datasets (small, medium, big); + + +Author: github.com/SemyonSinchenko +Source code: https://github.com/mrpowers-io/falsa +""" + +app = typer.Typer( + rich_markup_mode="rich", + help=help_str, +) + + +class Size(str, Enum): + SMALL = "SMALL" + MEDIUM = "MEDIUM" + BIG = "BIG" + + def _to(self) -> H2ODatasetSizes: + # Workaround. 
Typer does not support IntEnum + if self is Size.SMALL: + return H2ODatasetSizes.SMALL + elif self is Size.MEDIUM: + return H2ODatasetSizes.MEDIUM + else: + return H2ODatasetSizes.BIG + + +class Format(str, Enum): + CSV = "CSV" + DELTA = "DELTA" + PARQUET = "PARQUET" + + def pprint(self): + print(f"An output format is [green]{self.value}[/green]") + if self is Format.DELTA: + print("\n[red]Warning![/red]Batch writes are not supported for Delta!") + print("The whole dataset will be materialized first!") + else: + print("\nBatch mode is supported.") + print("In case of memory problems you can try to reduce a [green]batch_size[/green].") + print() + + +# An amount of rows per dataset is n // divisor +_DIVISORS = { + "groupby": 1, + "join_big": 1, + "join_big_na": 1, + "join_small": 1_000_000, + "join_medium": 1_000, +} + + +def _pretty_sci(n: int) -> str: + # See https://github.com/duckdblabs/db-benchmark/blob/main/_data/groupby-datagen.R#L5 + # pretty_sci = function(x) { + # tmp<-strsplit(as.character(x), "+", fixed=TRUE)[[1L]] + # if(length(tmp)==1L) { + # paste0(substr(tmp, 1L, 1L), "e", nchar(tmp)-1L) + # } else if(length(tmp)==2L){ + # paste0(tmp[1L], as.character(as.integer(tmp[2L]))) + # } + # } + if n == 0: + return "NA" + + # format in scientific notation and remove + + formatted_num = f"{n:.0e}".replace("+", "") + e_value = int(formatted_num.split("e")[1]) + + if e_value >= 10: + return formatted_num + + elif e_value == 0: + return formatted_num.replace("00", "0") + + elif e_value < 10: + return formatted_num.replace("0", "") + + else: + raise ValueError("Unexpected value following e") + + +def _create_filename(ds_type: str, n: int, k: int, nas: int, fmt: Format) -> str: + if fmt is Format.DELTA: + suffix = "" + else: + suffix = "." + fmt.lower() + output_names = { + "groupby": "G1_{n}_{n}_{k}_{nas}{fmt}", + "join_big": "J1_{n}_{n}_NA{fmt}", + "join_big_na": "J1_{n}_{n}_{nas}{fmt}", + "join_small": "J1_{n}_{n_divided}_{nas}{fmt}", + "join_medium": "J1_{n}_{n_divided}_{nas}{fmt}", + } + + n_divisor = _DIVISORS[ds_type] + n_divided = n // n_divisor + return output_names[ds_type].format(n=_pretty_sci(n), n_divided=_pretty_sci(n_divided), k=k, nas=nas, fmt=suffix) + + +def _clear_prev_if_exists(fp: Path, fmt: Format) -> None: + if fp.exists(): + # All is file, delta is directory + # Delta delete dir by itself. + if fmt is not Format.DELTA: + fp.unlink() + + +@app.command(help="Create H2O GroupBy Dataset") +def groupby( + path_prefix: Annotated[str, typer.Option(help="An output folder for generated data")], + size: Annotated[Size, typer.Option(help="Dataset size: 0.5Gb, 5Gb, 50Gb")] = Size.SMALL, + k: Annotated[int, typer.Option(help="An amount of keys (groups)")] = 100, + nas: Annotated[int, typer.Option(min=0, max=100, help="A percentage of NULLS")] = 0, + seed: Annotated[int, typer.Option(min=0, help="A seed of the generation")] = 42, + batch_size: Annotated[ + int, typer.Option(min=0, help="A batch-size (in rows)") + ] = 5_000_000, + data_format: Annotated[ + Format, + typer.Option(help="An output format for generated data. 
DELTA requires materialization of the whole data!"), + ] = Format.CSV, +): + gb = GroupByGenerator(size._to(), k, nas, seed, batch_size) + data_filename = _create_filename("groupby", size._to().value, k, nas, data_format) + output_dir = Path(path_prefix) + if not output_dir.exists(): + output_dir.mkdir(parents=True) + output_filepath = output_dir.joinpath(data_filename) + _clear_prev_if_exists(output_filepath, data_format) + + print(f"{size._to().value} rows will be saved into: [green]{output_filepath.absolute().__str__()}[/green]\n") + + schema = pa.schema( + [ + ("id1", pa.utf8()), + ("id2", pa.utf8()), + ("id3", pa.utf8()), + ("id4", pa.int64()), + ("id5", pa.int64()), + ("id6", pa.int64()), + ("v1", pa.int64(), False), + ("v2", pa.int64(), False), + ("v3", pa.float64(), False), + ] + ) + + print("An output data [green]schema[/green] is the following:") + print(schema) + print() + + data_format.pprint() + print() + if data_format is Format.CSV: + writer = csv.CSVWriter(sink=output_filepath, schema=schema) + for batch in track(gb.iter_batches(), total=len(gb.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.PARQUET: + writer = parquet.ParquetWriter(where=output_filepath, schema=schema) + for batch in track(gb.iter_batches(), total=len(gb.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.DELTA: + write_deltalake(output_filepath, data=gb.iter_batches(), schema=schema) + + +@app.command(help="Create three H2O join datasets") +def join( + path_prefix: Annotated[str, typer.Option(help="An output folder for generated data")], + size: Annotated[Size, typer.Option(help="Dataset size: 0.5Gb, 5Gb, 50Gb")] = Size.SMALL, + k: Annotated[int, typer.Option(help="An amount of keys (groups)")] = 10, + nas: Annotated[int, typer.Option(min=0, max=100, help="A percentage of NULLS")] = 0, + seed: Annotated[int, typer.Option(min=0, help="A seed of the generation")] = 42, + batch_size: Annotated[ + int, typer.Option(min=0, help="A batch-size (in rows)") + ] = 5_000_000, + data_format: Annotated[ + Format, + typer.Option(help="An output format for generated data. 
DELTA requires materialization of the whole data!"), + ] = Format.CSV, +): + random.seed(seed) + keys_seed = random.randint(0, NATIVE_I64_MAX_VALUE) + generation_seed = random.randint(0, NATIVE_I64_MAX_VALUE) + n_small = size._to() // _DIVISORS["join_small"] + n_medium = size._to() // _DIVISORS["join_medium"] + n_big = size._to() // _DIVISORS["join_big"] + join_small = JoinSmallGenerator(size._to(), n_small, k, generation_seed, keys_seed, min([batch_size, n_small])) + join_medium = JoinMediumGenerator(size._to(), n_medium, k, generation_seed, keys_seed, min([batch_size, n_medium])) + join_big = JoinBigGenerator(size._to(), n_big, k, nas, generation_seed, keys_seed, min([batch_size, n_big])) + + data_filename_small = _create_filename("join_small", size._to().value, k, nas, data_format) + data_filename_medium = _create_filename("join_medium", size._to().value, k, nas, data_format) + data_filename_big = _create_filename("join_big", size._to().value, k, nas, data_format) + + output_dir = Path(path_prefix) + if not output_dir.exists(): + output_dir.mkdir(parents=True) + output_small = output_dir.joinpath(data_filename_small) + output_medium = output_dir.joinpath(data_filename_medium) + output_big = output_dir.joinpath(data_filename_big) + + _clear_prev_if_exists(output_small, data_format) + _clear_prev_if_exists(output_medium, data_format) + _clear_prev_if_exists(output_big, data_format) + + print( + f"{size._to().value // _DIVISORS['join_small']} rows will be saved into: [green]{output_small.absolute().__str__()}[/green]\n" + ) + print( + f"{size._to().value // _DIVISORS['join_medium']} rows will be saved into: [green]{output_medium.absolute().__str__()}[/green]\n" + ) + print( + f"{size._to().value // _DIVISORS['join_big']} rows will be saved into: [green]{output_big.absolute().__str__()}[/green]\n" + ) + + schema_small = pa.schema( + [ + ("id1", pa.int64(), False), + ("id4", pa.utf8(), False), + ("v2", pa.float64(), False), + ] + ) + schema_medium = pa.schema( + [ + ("id1", pa.int64(), False), + ("id2", pa.int64(), False), + ("id4", pa.utf8(), False), + ("id5", pa.utf8(), False), + ("v2", pa.float64(), False), + ] + ) + schema_big = pa.schema( + [ + ("id1", pa.int64()), + ("id2", pa.int64()), + ("id3", pa.int64()), + ("id4", pa.utf8(), False), + ("id5", pa.utf8(), False), + ("id6", pa.utf8(), False), + ("v2", pa.float64()), + ] + ) + data_format.pprint() + print() + + print("An [bold]SMALL[/bold] data [green]schema[/green] is the following:") + print(schema_small) + print() + + if data_format is Format.CSV: + writer = csv.CSVWriter(sink=output_small, schema=schema_small) + for batch in track(join_small.iter_batches(), total=len(join_small.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.PARQUET: + writer = parquet.ParquetWriter(where=output_small, schema=schema_small) + for batch in track(join_small.iter_batches(), total=len(join_small.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.DELTA: + write_deltalake(output_small, data=join_small.iter_batches(), schema=schema_small) + + print() + print("An [bold]MEDIUM[/bold] data [green]schema[/green] is the following:") + print(schema_medium) + print() + + if data_format is Format.CSV: + writer = csv.CSVWriter(sink=output_medium, schema=schema_medium) + for batch in track(join_medium.iter_batches(), total=len(join_medium.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.PARQUET: + writer = parquet.ParquetWriter(where=output_medium, 
schema=schema_medium) + for batch in track(join_medium.iter_batches(), total=len(join_medium.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.DELTA: + write_deltalake(output_medium, data=join_medium.iter_batches(), schema=schema_medium) + + print() + print("An [bold]BIG[/bold] data [green]schema[/green] is the following:") + print(schema_big) + print() + + if data_format is Format.CSV: + writer = csv.CSVWriter(sink=output_big, schema=schema_big) + for batch in track(join_big.iter_batches(), total=len(join_big.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.PARQUET: + writer = parquet.ParquetWriter(where=output_big, schema=schema_big) + for batch in track(join_big.iter_batches(), total=len(join_big.batches)): + writer.write_batch(batch) + + writer.close() + + if data_format is Format.DELTA: + write_deltalake(output_big, data=join_big.iter_batches(), schema=schema_big) + + +def entry_point() -> None: + app() + + +if __name__ == "__main__": + entry_point() diff --git a/python/falsa/local_fs.py b/python/falsa/local_fs.py new file mode 100644 index 0000000..d297b5e --- /dev/null +++ b/python/falsa/local_fs.py @@ -0,0 +1,166 @@ +import random +from typing import Iterator + +import pyarrow as pa + +from falsa import H2ODatasetSizes + +from .native import ( + generate_groupby, + generate_join_dataset_big, + generate_join_dataset_medium, + generate_join_dataset_small, +) + +NATIVE_I64_MAX_VALUE = 9_223_372_036_854_775_806 + + +def _validate_int64(num: int, prefix: str) -> None: + # We are passing values from Python as i64 and converted to u64/usize inside; + # Better to catch it in Python instead of facing panic in native-part. + if num < 0: + raise ValueError(f"Negative values are not supported but got {prefix}={num}") + if num > NATIVE_I64_MAX_VALUE: + raise ValueError(f"Values are passed to native as int64; MAX={NATIVE_I64_MAX_VALUE} but got {prefix}={num}") + + +class GroupByGenerator: + """A simple wrapper on top of native generator. + + The class takes care of random seeds generation, input validation + and calculation of the size of all batches. + """ + + def __init__( + self, size: H2ODatasetSizes | int, k: int, nas: int = 0, seed: int = 42, batch_size: int = 5_000_000 + ) -> None: + _validate_int64(size, "size") + if (nas < 0) or (nas > 100): + raise ValueError(f"nas should be in [0, 100], but got {nas}") + if (k < 0) or (k > size): + raise ValueError(f"k should be positive and less than {size} but got {k}") + if (batch_size <= 0) or (batch_size > size): + raise ValueError(f"batch size should be positive and less than {size} but got {batch_size}") + self.n: int = size + self.k = k + self.nas = nas + + num_batches = self.n // batch_size + batches = [batch_size for _ in range(num_batches)] + # A corner case when we need to add one more batch + if self.n % batch_size != 0: + batches.append(self.n % batch_size) + + # Generate a random seed per batch + random.seed(seed) + self.batches = [{"size": bs, "seed": random.randint(0, NATIVE_I64_MAX_VALUE)} for bs in batches] + + def iter_batches(self) -> Iterator[pa.RecordBatch]: + for batch in self.batches: + yield generate_groupby(self.n, self.k, self.nas, batch["seed"], batch["size"]) + + +class JoinSmallGenerator: + """A simple wrapper on top of native generator. + + The class takes care of random seeds generation, input validation + and calculation of the size of all batches. 
+ """ + + def __init__( + self, size: H2ODatasetSizes | int, n_rows: int, k: int, seed: int = 42, keys_seed: int = 142, batch_size: int = 5_000_000 + ) -> None: + _validate_int64(size, "size") + if (k < 0) or (k > size): + raise ValueError(f"k should be positive and less than {size} but got {k}") + if (batch_size <= 0) or (batch_size > size): + raise ValueError(f"batch size should be positive and less than {size} but got {batch_size}") + self.n: int = size + self.n_rows = n_rows + self.k = k + self.keys_seed = keys_seed + + num_batches = self.n_rows // batch_size + batches = [batch_size for _ in range(num_batches)] + # A corner case when we need to add one more batch + if self.n_rows % batch_size != 0: + batches.append(self.n_rows % batch_size) + + # Generate a random seed per batch + random.seed(seed) + self.batches = [{"size": bs, "seed": random.randint(0, NATIVE_I64_MAX_VALUE)} for bs in batches] + + def iter_batches(self) -> Iterator[pa.RecordBatch]: + for batch in self.batches: + yield generate_join_dataset_small(self.n, batch["seed"], self.keys_seed, batch["size"]) + + +class JoinMediumGenerator: + """A simple wrapper on top of native generator. + + The class takes care of random seeds generation, input validation + and calculation of the size of all batches. + """ + + def __init__( + self, size: H2ODatasetSizes | int, n_rows: int, k: int, seed: int = 42, keys_seed: int = 142, batch_size: int = 5_000_000 + ) -> None: + _validate_int64(size, "size") + if (k < 0) or (k > size): + raise ValueError(f"k should be positive and less than {size} but got {k}") + if (batch_size <= 0) or (batch_size > size): + raise ValueError(f"batch size should be positive and less than {size} but got {batch_size}") + self.n: int = size + self.n_rows = n_rows + self.k = k + self.keys_seed = keys_seed + + num_batches = self.n_rows // batch_size + batches = [batch_size for _ in range(num_batches)] + # a corner case when we need to add one more batch + if self.n_rows % batch_size != 0: + batches.append(self.n_rows % batch_size) + + # Generate a random seed per batch + random.seed(seed) + self.batches = [{"size": bs, "seed": random.randint(0, NATIVE_I64_MAX_VALUE)} for bs in batches] + + def iter_batches(self) -> Iterator[pa.RecordBatch]: + for batch in self.batches: + yield generate_join_dataset_medium(self.n, batch["seed"], self.keys_seed, batch["size"]) + + +class JoinBigGenerator: + """A simple wrapper on top of native generator. + + The class takes care of random seeds generation, input validation + and calculation of the size of all batches. 
+ """ + + def __init__( + self, size: H2ODatasetSizes | int, n_rows: int, k: int, nas: int, seed: int = 42, keys_seed: int = 142, batch_size: int = 5_000_000 + ) -> None: + _validate_int64(size, "size") + if (k < 0) or (k > size): + raise ValueError(f"k should be positive and less than {size} but got {k}") + if (batch_size <= 0) or (batch_size > size): + raise ValueError(f"batch size should be positive and less than {size} but got {batch_size}") + self.n: int = size + self.n_rows = n_rows + self.nas = nas + self.k = k + self.keys_seed = keys_seed + + num_batches = self.n // batch_size + batches = [batch_size for _ in range(num_batches)] + # A corner case when we need to add one more batch + if self.n % batch_size != 0: + batches.append(self.n % batch_size) + + # Generate a random seed per batch + random.seed(seed) + self.batches = [{"size": bs, "seed": random.randint(0, NATIVE_I64_MAX_VALUE)} for bs in batches] + + def iter_batches(self) -> Iterator[pa.RecordBatch]: + for batch in self.batches: + yield generate_join_dataset_big(self.n, self.nas, batch["seed"], self.keys_seed, batch["size"]) diff --git a/src/lib.rs b/src/lib.rs index 8c77a2b..4ce8f83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -282,11 +282,11 @@ fn generate_join_dataset_medium( let mut k2x = k2 .get(0..(n as usize * 9 / 10 / 1_000_000)) - .expect("internal indexing error with k1") + .expect("internal indexing error with k2") .to_vec(); let mut k2r = k2 .get((n as usize / 1_000_000)..(n as usize * 11 / 10 / 1_000_000)) - .expect("internal indexing error with k1") + .expect("internal indexing error with k2") .to_vec(); k2x.append(&mut k2r); @@ -455,7 +455,7 @@ fn generate_join_dataset_big( Field::new("id3", DataType::Int64, true), Field::new("id4", DataType::Utf8, false), Field::new("id5", DataType::Utf8, false), - Field::new("id6", DataType::Utf8, true), + Field::new("id6", DataType::Utf8, false), Field::new("v2", DataType::Float64, true), ]);
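
Usage sketch: the new [project.scripts] entry should expose the Typer app as a `falsa` console command, and the generators in falsa/local_fs.py can also be driven directly from Python. The flag spellings and the snippet below are assumptions derived from Typer's default parameter-to-option mapping and from the signatures added in this patch; they are illustrative, not taken verbatim from the patch.

# CLI, via the `falsa = "falsa.app:entry_point"` script entry
# (assumed Typer-derived flags: --path-prefix, --size, --nas, --data-format):
#   falsa groupby --path-prefix ./tmp --size SMALL --nas 5 --data-format PARQUET
#   falsa join --path-prefix ./tmp --size MEDIUM --data-format CSV

# Programmatic use, assuming the native extension is built (e.g. with `maturin develop`):
from falsa import H2ODatasetSizes
from falsa.local_fs import GroupByGenerator

# 10_000_000 rows, 100 groups, 5% NULLs, produced 1_000_000 rows per batch.
gen = GroupByGenerator(H2ODatasetSizes.SMALL, k=100, nas=5, seed=42, batch_size=1_000_000)
for batch in gen.iter_batches():
    print(batch.num_rows)  # each batch is a pyarrow.RecordBatch from the native generator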