Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add .root to .sqlite converter #514

Closed
wants to merge 7 commits into from

Conversation

kaareendrup
Copy link
Collaborator

.root to .sqlite conversion

This is an initial attempt at implementing a converter class (and subsequent extractor-classes) for .root to .sqlite conversion. The implementation follows the structure of the current .sqlite converter, with some exceptions and caveats as described in the following.

The following are notable exceptions to the I3 to .sqlite converter:

No multiprocessing

Since the computational bottleneck in case of .root files is submitting dataframes to the .sqlite-databases, there is no multiprocessing. Batchwise conversion is implemented to avoid having the entire database in memory at once.

No pattern batching

This should be relatively easy to implement, but I didn't understand it, so I chose to not bother

No inheritance from current classes

Perhaps the most important one. Since the current base classes have some I3 specific funtionality (like attaching gcd-files to lists of I3 files), the new classes don't inherit from them. This could perhaps be basis for a restructuring of the converter and extractor classes, if the goal is to provide more general functionality for different experiments.

There are probably many other things that I have missed or misunderstood, but I thought it would be good to join the forces of those who have worked hard on the implementation of the current classes with those who have an eye for root functionality.

Use

The classes use the uproot package and awkward pandas (which would maybe need to be added to the requirements). A simple conversion example (for ESSnuSB) could look like:

from graphnet.data.sqlite.root_dataconverter import rootSQLiteDataConverter
from graphnet.data.extractors.rootfeatureextractor import (
    rootFeatureExtractorESSnuSB,
    rootTruthExtractorESSnuSB,
    rootfiTQunExtractorESSnuSB
)

CONVERTER_CLASS = {
    "sqlite": rootSQLiteDataConverter,
}

def main_essnusb(backend: str) -> None:
    """Convert root files to intermediate `backend` format."""
    # Check(s)
    assert backend in CONVERTER_CLASS

    inputs = ["/path/to/files"]
    outdir = "/path/to/db"
    name = "db_name"

    converter: rootSQLiteDataConverter = CONVERTER_CLASS[backend](
        [   
            rootFeatureExtractorESSnuSB('pulsemap_name', 'pulsemap_key'),
            rootTruthExtractorESSnuSB('truth_name', 'truth_key'),
            rootfiTQunExtractorESSnuSB('reco_name', 'reco_key'),
        ],
        outdir,
        name,
        nb_files_to_batch=20,
    )
    converter(inputs)

main_essnusb("sqlite")

I'm unsure about how well this will generalize to different tree structures, but the combination of different main_keys and branch_keys should give a lot of flexibility.

Copy link
Collaborator

@RasmusOrsoe RasmusOrsoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @kaareendrup!

Let me start by apologizing for the wait; it has been quite hectic since the workshop! Second; thanks for sharing this code.

The DataConverter was written to extract i3 -> [parquet, SQLite] so there is indeed some i3 specific code in there. However, much of it is completely independent of IceCube software, and I think we can come up with a nice way to structure this. I will give this a think and return with some comments on how next week. In the mean time:

You mention that the computational bottleneck is in the submission of dataframes to SQLite format and that multiprocessing therefore isn't relevant for root files. If you have everything in one root file, I can see how this can be the case. But do you not have many root files that needs to be converted? The multiprocessing is currently implemented such that many files are converted in parallel instead of sequentially, which should give you a linear speedup in n_workers.

Could you please install pre-commit hooks and update the code such that the checks pass? More info here

@kaareendrup
Copy link
Collaborator Author

Hi @RasmusOrsoe !

No problem, we are all busy, and despite my good intentions, it will likely also take a while before I implement your suggestions. I shall definitely take a look at the pre-commit hooks. I was only unsure about whether or not it was worthwhile at this point.

Regarding multiprocessing, my experience has been that the overhead when combining the temporary .sqlite-files is greater than/comparable to that of processing the .root-files, so no speed was gained when processing the .root-files in parallel. But I of course don't know if this will always be the case, so I'm fine with implementing multiprocessing just in case.

@RasmusOrsoe
Copy link
Collaborator

@kaareendrup I have given this some more thought, and come up with an idea draft. Below is pseudo-code that lays out the refactor of DataConverter and associated submodules. The general idea is to use inheritance of abstract classes to minimize code repetition and the amount of lines that needs to be implemented to accommodate your converter.

Starting with DataConverter: As it exists currently, it is a class that assumes i3 file format as input and has no assumption on output file format; it forces the user to implement a "back-end" specific version that writes output to sqlite or parquet. I.e. it follows the semi-open scheme i3 -> Any. In the following pseudo-code, I propose to refactor DataConverter such that it provides structure and functionality that is independent of both input and output file formats. i.e. Any -> Any.

class DataConverter(ABC, Logger):
    """Base class for converting files to intermediate file format."""

    @property
    @abstractmethod
    def file_suffix(self) -> str:
        """Suffix to use on output files."""

    def __init__(
        self,
        extractors: List[Extractor],
        outdir: str,
        *,
        nb_files_to_batch: Optional[int] = None,
        sequential_batch_pattern: Optional[str] = None,
        input_file_batch_pattern: Optional[str] = None,
        workers: int = 1,
        index_column: str = "event_no",
    ):
        """Construct DataConverter.

        When using `input_file_batch_pattern`, regular expressions are used to
        group files according to their names. All files that match a certain
        pattern up to wildcards are grouped into the same output file. This
        output file has the same name as the input files that are group into it,
        with wildcards replaced with "x". Periods (.) and wildcards (*) have a
        special meaning: Periods are interpreted as literal periods, and not as
        matching any character (as in standard regex); and wildcards are
        interpreted as ".*" in standard regex.

        For instance, the pattern "[A-Z]{1}_[0-9]{5}*.i3.zst" will find all I3
        files whose names contain:
         - one capital letter, followed by
         - an underscore, followed by
         - five numbers, followed by
         - any string of characters ending in ".i3.zst"

        This means that, e.g., the files:
         - upgrade_genie_step4_141020_A_000000.i3.zst
         - upgrade_genie_step4_141020_A_000001.i3.zst
         - ...
         - upgrade_genie_step4_141020_A_000008.i3.zst
         - upgrade_genie_step4_141020_A_000009.i3.zst
        would be grouped into the output file named
        "upgrade_genie_step4_141020_A_00000x.<suffix>" but the file
         - upgrade_genie_step4_141020_A_000010.i3.zst
        would end up in a separate group, named
        "upgrade_genie_step4_141020_A_00001x.<suffix>".
        """
        # Check(s)
        if not isinstance(extractors, (list, tuple)):
            extractors = [extractors]

        assert (
            len(extractors) > 0
        ), "Please specify at least one argument of type I3Extractor"

        for extractor in extractors:
            assert isinstance(
                extractor, Extractor
            ), f"{type(extractor)} is not a subclass of I3Extractor"

        # Infer saving strategy
        save_strategy = self._infer_save_strategy(
            nb_files_to_batch,
            sequential_batch_pattern,
            input_file_batch_pattern,
        )

        # Member variables
        self._outdir = outdir
        self._save_strategy = save_strategy
        self._nb_files_to_batch = nb_files_to_batch
        self._sequential_batch_pattern = sequential_batch_pattern
        self._input_file_batch_pattern = input_file_batch_pattern
        self._workers = workers

        # Create I3Extractors
        self._extractors = ExtractorCollection(*extractors)

        # Create shorthand of names of all pulsemaps queried
        self._table_names = [extractor.name for extractor in self._extractors]
        self._pulsemaps = [
            extractor.name
            for extractor in self._extractors
            if isinstance(extractor, I3FeatureExtractor)
        ]

        # Placeholders for keeping track of sequential event indices and output files
        self._index_column = index_column
        self._index = 0
        self._output_files: List[str] = []

        # Base class constructor
        super().__init__(name=__name__, class_name=self.__class__.__name__)

    @final
    def __call__(self, directories: Union[str, List[str]]) -> None:
        """Convert I3-files in `directories.

        Args:
            directories: One or more directories, the I3 files within which
                should be converted to an intermediate file format.
        """
        # Find all files in the specified directories and prepare them as FileSets.
        filesets = self._prepare_files(directories)
        if len(filesets) == 0:
            self.error(f"No files found in {directories}.")
            return

        # Save a record of the found I3 files in the output directory.
        self._save_filenames(filesets)

        # Shuffle I3 files to get a more uniform load on worker nodes.
        i3_files, gcd_files = pairwise_shuffle(i3_files, gcd_files)

        # Process the files
        self.execute(filesets)

    @abstractmethod
    def _prepare_files(directories: Union[str, List[str]]) -> FileSet:
        """Search directories and create FileSets that are to be converted.

        Returns:
            FileSets
        """


    @final
    def execute(self, filesets: List[FileSet]) -> None:
        """General method for processing a set of files.

        The files are converted individually according to the inheriting class/
        intermediate file format.

        Args:
            filesets: A FileSet.
        """
        # Make sure output directory exists.
        self.info(f"Saving results to {self._outdir}")
        os.makedirs(self._outdir, exist_ok=True)

        # Iterate over batches of files.
        try:
            if self._save_strategy == "sequential_batched":
                # Define batches
                assert self._nb_files_to_batch is not None
                assert self._sequential_batch_pattern is not None
                batches = np.array_split(
                    np.asarray(filesets),
                    int(np.ceil(len(filesets) / self._nb_files_to_batch)),
                )
                batches = [
                    (
                        group.tolist(),
                        self._sequential_batch_pattern.format(ix_batch),
                    )
                    for ix_batch, group in enumerate(batches)
                ]
                self.info(
                    f"Will batch {len(filesets)} input files into {len(batches)} groups."
                )

                # Iterate over batches
                pool = self._iterate_over_batches_of_files(batches)

            elif self._save_strategy == "pattern_batched":
                # Define batches
                groups: Dict[str, List[FileSet]] = OrderedDict()
                for fileset in sorted(filesets, key=lambda f: f.input_file):
                    group = re.sub(
                        self._sub_from,
                        self._sub_to,
                        os.path.basename(fileset.input_file),
                    )
                    if group not in groups:
                        groups[group] = list()
                    groups[group].append(fileset)

                self.info(
                    f"Will batch {len(filesets)} input files into {len(groups)} groups"
                )
                if len(groups) <= 20:
                    for group, group_filesets in groups.items():
                        self.info(
                            f"> {group}: {len(group_filesets):3d} file(s)"
                        )

                batches = [
                    (list(group_filesets), group)
                    for group, group_filesets in groups.items()
                ]

                # Iterate over batches
                pool = self._iterate_over_batches_of_files(batches)

            elif self._save_strategy == "1:1":
                pool = self._iterate_over_individual_files(filesets)

            else:
                assert False, "Shouldn't reach here."

            self._update_shared_variables(pool)

        except KeyboardInterrupt:
            self.warning("[ctrl+c] Exciting gracefully.")

    @abstractmethod
    def save_data(self, data: List[OrderedDict], output_file: str) -> None:
        """Implementation-specific method for saving data to file.

        Args:
            data: List of extracted features.
            output_file: Name of output file.
        """

    @abstractmethod
    def merge_files(
        self, output_file: str, input_files: Optional[List[str]] = None
    ) -> None:
        """Implementation-specific method for merging output files.

        Args:
            output_file: Name of the output file containing the merged results.
            input_files: Intermediate files to be merged, according to the
                specific implementation. Default to None, meaning that all
                files output by the current instance are merged.

        Raises:
            NotImplementedError: If the method has not been implemented for the
                backend in question.
        """

    # Internal methods
    def _iterate_over_individual_files(
        self, args: List[FileSet]
    ) -> Optional[multiprocessing.pool.Pool]:
        # Get appropriate mapping function
        map_fn, pool = self.get_map_function(len(args))

        # Iterate over files
        for _ in map_fn(
            self._process_file, tqdm(args, unit="file(s)", colour="green")
        ):
            self.debug(
                "Saving with 1:1 strategy on the individual worker processes"
            )

        return pool

    def _iterate_over_batches_of_files(
        self, args: List[Tuple[List[FileSet], str]]
    ) -> Optional[multiprocessing.pool.Pool]:
        """Iterate over a batch of files and save results on worker process."""
        # Get appropriate mapping function
        map_fn, pool = self.get_map_function(len(args), unit="batch(es)")

        # Iterate over batches of files
        for _ in map_fn(
            self._process_batch, tqdm(args, unit="batch(es)", colour="green")
        ):
            self.debug("Saving with batched strategy")

        return pool

    def _update_shared_variables(
        self, pool: Optional[multiprocessing.pool.Pool]
    ) -> None:
        """Update `self._index` and `self._output_files`.

        If `pool` is set, it means that multiprocessing was used. In this case,
        the worker processes will not have been able to write directly to
        `self._index` and `self._output_files`, and we need to get them synced
        up.
        """
        if pool:
            # Extract information from shared variables to member variables.
            index, output_files = pool._initargs  # type: ignore
            self._index += index.value
            self._output_files.extend(list(sorted(output_files[:])))

    @cache_output_files
    def _process_file(
        self,
        fileset: FileSet,
    ) -> str:

        # Process individual files
        data = self._extract_data(fileset)

        # Save data
        output_file = self._get_output_file(fileset.i3_file)
        self.save_data(data, output_file)

        return output_file

    @cache_output_files
    def _process_batch(self, args: Tuple[List[FileSet], str]) -> str:
        # Unpack arguments
        filesets, output_file_name = args

        # Process individual files
        data = list(
            itertools.chain.from_iterable(map(self._extract_data, filesets))
        )

        # Save batched data
        output_file = self._get_output_file(output_file_name)
        self.save_data(data, output_file)

        return output_file
    
    @abstractmethod
    def _extract_data(self, fileset: FileSet) -> List[OrderedDict]:
        """Extract data from single file.

        If the saving strategy is 1:1 (i.e., each file is converted to a
        corresponding intermediate file) the data is saved to such a file, and
        no data is return from the method.

        The above distincting is to allow worker processes to save files rather
        than sending it back to the main process.

        Args:
            fileset: A FileSet.

        Returns:
            Extracted data.
        """
       

    def get_map_function(
        self, nb_files: int, unit: str = "I3 file(s)"
    ) -> Tuple[Any, Optional[multiprocessing.pool.Pool]]:
        """Identify map function to use (pure python or multiprocess)."""
        # Choose relevant map-function given the requested number of workers.
        workers = min(self._workers, nb_files)
        if workers > 1:
            self.info(
                f"Starting pool of {workers} workers to process {nb_files} {unit}"
            )

            manager = Manager()
            index = Value("i", 0)
            output_files = manager.list()

            pool = Pool(
                processes=workers,
                initializer=init_global_index,
                initargs=(index, output_files),
            )
            map_fn = pool.imap

        else:
            self.info(
                f"Processing {nb_files} {unit} in main thread (not multiprocessing)"
            )
            map_fn = map  # type: ignore
            pool = None

        return map_fn, pool

    def _infer_save_strategy(
        self,
        nb_files_to_batch: Optional[int] = None,
        sequential_batch_pattern: Optional[str] = None,
        input_file_batch_pattern: Optional[str] = None,
    ) -> str:
        if input_file_batch_pattern is not None:
            save_strategy = "pattern_batched"

            assert (
                "*" in input_file_batch_pattern
            ), "Argument `input_file_batch_pattern` should contain at least one wildcard (*)"

            fields = [
                "(" + field + ")"
                for field in input_file_batch_pattern.replace(
                    ".", r"\."
                ).split("*")
            ]
            nb_fields = len(fields)
            self._sub_from = ".*".join(fields)
            self._sub_to = "x".join([f"\\{ix + 1}" for ix in range(nb_fields)])

            if sequential_batch_pattern is not None:
                self.warning("Argument `sequential_batch_pattern` ignored.")
            if nb_files_to_batch is not None:
                self.warning("Argument `nb_files_to_batch` ignored.")

        elif (nb_files_to_batch is not None) or (
            sequential_batch_pattern is not None
        ):
            save_strategy = "sequential_batched"

            assert (nb_files_to_batch is not None) and (
                sequential_batch_pattern is not None
            ), "Please specify both `nb_files_to_batch` and `sequential_batch_pattern` for sequential batching."

        else:
            save_strategy = "1:1"

        return save_strategy

    def _save_filenames(self, input_files: List[str]) -> None:
        """Save file names in CSV format."""
        self.debug("Saving input file names to config CSV.")
        config_dir = os.path.join(self._outdir, "config")
        os.makedirs(config_dir, exist_ok=True)
        df_files = pd.DataFrame(data=input_files, columns=["filename"])
        df_files.to_csv(os.path.join(config_dir, "input_files.csv"))

    @abstractmethod
    def _get_output_file(self, input_file: str) -> str:
        """Return output file path associated with input file path.

        Args:
            input_file: input file path

        Returns:
            output file path
        """

This puts core functionality that is independent of both input and output file formats, such as batch patterns, boiler plate processing and parallelism into one flexible class. Implementing an i3-specific or root-specific version just requires overwriting a couple of abstract methods. Here's the i3-specific version:

class I3DataConverter(DataConverter):

    def __init__(self, gcd_rescue: str, *kwargs):
        super(DataConverter, self).__init__(*kwargs)
        # Member Variables
        self._gcd_rescue = gcd_rescue
        #Checks
        for extractor in self._extractors:
            assert isinstance(
                extractor, I3Extractor
            ), f"{type(extractor)} is not a subclass of I3Extractor" 

    def _prepare_files(self, directories: Union[str, List[str]]) -> FileSet:
        """Search directories and create FileSets that are to be converted.

        Returns:
            FileSets
        """
        # Find all I3 and GCD files in the specified directories.
        i3_files, gcd_files = find_i3_files(directories, self._gcd_rescue)
        if len(i3_files) == 0:
            self.error(f"No files found in {directories}.")
            return

        # Shuffle I3 files to get a more uniform load on worker nodes.
        i3_files, gcd_files = pairwise_shuffle(i3_files, gcd_files)

        # Process the files
        filesets = [
            I3FileSet(i3_file, gcd_file)
            for i3_file, gcd_file in zip(i3_files, gcd_files)
        ]
        return filesets
    
    def _skip_frame(self, frame: "icetray.I3Frame") -> bool:
        """Check if frame should be skipped.

        Args:
            frame: I3Frame to check.

        Returns:
            True if frame is a null split frame, else False.
        """
        if frame["I3EventHeader"].sub_event_stream == "NullSplit":
            return True
        return False

    
    def _extract_data(self, fileset: FileSet) -> List[OrderedDict]:
        """Extract data from single I3 file.

        If the saving strategy is 1:1 (i.e., each I3 file is converted to a
        corresponding intermediate file) the data is saved to such a file, and
        no data is return from the method.

        The above distincting is to allow worker processes to save files rather
        than sending it back to the main process.

        Args:
            fileset: Path to I3 file and corresponding GCD file.

        Returns:
            Extracted data.
        """
        # Infer whether method is being run using multiprocessing
        try:
            global_index  # type: ignore[name-defined]
            multi_processing = True
        except NameError:
            multi_processing = False

        self._extractors.set_files(fileset.i3_file, fileset.gcd_file)
        i3_file_io = dataio.I3File(fileset.i3_file, "r")
        data = list()
        while i3_file_io.more():
            try:
                frame = i3_file_io.pop_physics()
            except Exception as e:
                if "I3" in str(e):
                    continue
            if self._skip_frame(frame):
                continue

            # Try to extract data from I3Frame
            results = self._extractors(frame)

            data_dict = OrderedDict(zip(self._table_names, results))

            # If an I3GenericExtractor is used, we want each automatically
            # parsed key to be stored as a separate table.
            for extractor in self._extractors:
                if isinstance(extractor, I3GenericExtractor):
                    data_dict.update(data_dict.pop(extractor._name))

            # Get new, unique index and increment value
            if multi_processing:
                with global_index.get_lock():  # type: ignore[name-defined]
                    index = global_index.value  # type: ignore[name-defined]
                    global_index.value += 1  # type: ignore[name-defined]
            else:
                index = self._index
                self._index += 1

            # Attach index to all tables
            for table in data_dict.keys():
                data_dict[table][self._index_column] = index

            data.append(data_dict)

        return data 

Similarly, we can move input file independent functionality from I3Extractor into a generic Extractor class, which will allow for a similar refactor:


class Extractor(ABC, Logger):
    """Base class for extracting information.

    All classes inheriting from `Extractor` should implement the `__call__` and `set_files`
    methods.
    """

    def __init__(self, name: str):
        """Construct Extractor.

        Args:
            name: Name of the `Extractor` instance. Used to keep track of the
                provenance of different data, and to name tables to which this
                data is saved.
        """
        # Member variable(s)
        self._input_file: str = ""
        self._name: str = name
        # Base class constructor
        super().__init__(name=__name__, class_name=self.__class__.__name__)

    @abstractmethod
    def set_files(self, input_file: Union[str, Tuple[str]]) -> None:
        """Store references to the files processed by the Extractor."""

    @abstractmethod
    def __call__(self, frame: "icetray.I3Frame") -> dict:
        """Extract information from frame."""
        pass

    @property
    def name(self) -> str:
        """Get the name of the `I3Extractor` instance."""
        return self._name

class I3Extractor(Extractor):
    """Base class for extracting information from physics I3-frames.

    All classes inheriting from `I3Extractor` should implement the `__call__`
    method, and can be applied directly on `icetray.I3Frame` objects to return
    extracted, pure-python data.
    """

    def set_files(self, i3_file: str, gcd_file: str) -> None:
        """Store references to the I3- and GCD-files being processed."""
        # @TODO: Is it necessary to set the `i3_file`? It is only used in one
        #        place in `I3TruthExtractor`, and there only in a way that might
        #        be solved another way.
        self._i3_file = i3_file
        self._gcd_file = gcd_file
        self._load_gcd_data()

    def _load_gcd_data(self) -> None:
        """Load the geospatial information contained in the GCD-file."""
        # If no GCD file is provided, search the I3 file for frames containing
        # geometry (G) and calibration (C) information.
        gcd_file = dataio.I3File(self._gcd_file or self._i3_file)

        try:
            g_frame = gcd_file.pop_frame(icetray.I3Frame.Geometry)
        except RuntimeError:
            self.error(
                "No GCD file was provided and no G-frame was found. Exiting."
            )
            raise
        else:
            self._gcd_dict = g_frame["I3Geometry"].omgeo

        try:
            c_frame = gcd_file.pop_frame(icetray.I3Frame.Calibration)
        except RuntimeError:
            self.warning("No GCD file was provided and no C-frame was found.")
        else:
            self._calibration = c_frame["I3Calibration"]

    @abstractmethod
    def __call__(self, frame: "icetray.I3Frame") -> dict:
        """Extract information from frame."""
        pass

We could also go further and make our current back-end specific implementations of DataConverter (SQLite, Parquet) intrinsic to this refactored version of DataConverter, such that it becomes a class that follows Any -> [SQLite, Parquet], but let's cross that bridge later. @kaareendrup could you take a look at the proposed structure above and let me know if this suits your usecase?

@RasmusOrsoe
Copy link
Collaborator

@kaareendrup we've recently updated the data conversion code, which would make this very easy. Is this still relevant of ESS?

@RasmusOrsoe
Copy link
Collaborator

@kaareendrup closing this due to inactivity. Feel free to open this again if you want to add this contribution in the future

@RasmusOrsoe RasmusOrsoe closed this Aug 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants