From 52054f2d7302a56d05ccf7607fa7dc7741e99441 Mon Sep 17 00:00:00 2001 From: Lukas Tenbrink Date: Thu, 4 Jul 2024 16:34:20 +0200 Subject: [PATCH] Streamline file reading for single segment records, record headers and annotation files. These methods all now stack between the public functions, handling mainly i/o, and the parsing functions. --- wfdb/io/_signal.py | 194 ++++++++---------------------------------- wfdb/io/annotation.py | 86 ++++++++----------- wfdb/io/download.py | 59 ------------- wfdb/io/record.py | 87 +++++++++---------- 4 files changed, 110 insertions(+), 316 deletions(-) diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index 4ee09225..64c0e458 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -4,7 +4,8 @@ import numpy as np -from wfdb.io import download, _coreio, util +from wfdb.io import download +from wfdb.io import util MAX_I32 = 2147483647 @@ -1050,9 +1051,7 @@ def smooth_frames(self, sigtype="physical"): def _rd_segment( - file_name, - dir_name, - pn_dir, + buf, fmt, n_sig, sig_len, @@ -1066,7 +1065,6 @@ def _rd_segment( ignore_skew, no_file=False, sig_data=None, - sig_stream=None, return_res=64, ): """ @@ -1075,14 +1073,6 @@ def _rd_segment( Parameters ---------- - file_name : list - The names of the dat files to be read. - dir_name : str - The full directory where the dat file(s) are located, if the dat - file(s) are local. - pn_dir : str - The PhysioNet directory where the dat file(s) are located, if - the dat file(s) are remote. fmt : list The formats of the dat files. n_sig : int @@ -1198,9 +1188,7 @@ def _rd_segment( for fn in w_file_name: # Get the list of all signals contained in the dat file datsignals = _rd_dat_signals( - file_name=fn, - dir_name=dir_name, - pn_dir=pn_dir, + buf, fmt=w_fmt[fn], n_sig=len(datchannel[fn]), sig_len=sig_len, @@ -1212,7 +1200,6 @@ def _rd_segment( sampto=sampto, no_file=no_file, sig_data=sig_data, - sig_stream=sig_stream, ) # Copy over the wanted signals @@ -1223,9 +1210,7 @@ def _rd_segment( def _rd_dat_signals( - file_name, - dir_name, - pn_dir, + buf, fmt, n_sig, sig_len, @@ -1237,21 +1222,12 @@ def _rd_dat_signals( sampto, no_file=False, sig_data=None, - sig_stream=None, ): """ Read all signals from a WFDB dat file. Parameters ---------- - file_name : str - The name of the dat file. - dir_name : str - The full directory where the dat file(s) are located, if the dat - file(s) are local. - pn_dir : str - The PhysioNet directory where the dat file(s) are located, if - the dat file(s) are remote. fmt : str The format of the dat file. n_sig : int @@ -1327,32 +1303,17 @@ def _rd_dat_signals( if no_file: data_to_read = sig_data elif fmt in COMPRESSED_FMTS: - if sig_stream is not None: - data_to_read = _rd_compressed_stream( - fp=sig_stream, - fmt=fmt, - sample_offset=byte_offset, - n_sig=n_sig, - samps_per_frame=samps_per_frame, - start_frame=sampfrom, - end_frame=sampto, - ) - else: - data_to_read = _rd_compressed_file( - file_name=file_name, - dir_name=dir_name, - pn_dir=pn_dir, - fmt=fmt, - sample_offset=byte_offset, - n_sig=n_sig, - samps_per_frame=samps_per_frame, - start_frame=sampfrom, - end_frame=sampto, - ) - else: - data_to_read = _rd_dat_file( - file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples, sig_stream + data_to_read = _rd_compressed_stream( + buf, + fmt=fmt, + sample_offset=byte_offset, + n_sig=n_sig, + samps_per_frame=samps_per_frame, + start_frame=sampfrom, + end_frame=sampto, ) + else: + data_to_read = _rd_dat_stream(buf, fmt, start_byte, n_read_samples) if extra_flat_samples: if fmt in UNALIGNED_FMTS: @@ -1591,7 +1552,7 @@ def _required_byte_num(mode, fmt, n_samp): return int(n_bytes) -def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_stream): +def _rd_dat_stream(buf, fmt, start_byte, n_samp): """ Read data from a dat file, either local or remote, into a 1d numpy array. @@ -1602,14 +1563,6 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_strea Parameters ---------- - file_name : str - The name of the dat file. - dir_name : str - The full directory where the dat file(s) are located, if the dat - file(s) are local. - pn_dir : str - The PhysioNet directory where the dat file(s) are located, if - the dat file(s) are remote. fmt : str The format of the dat file. start_byte : int @@ -1649,27 +1602,11 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_strea element_count = n_samp byte_count = n_samp * BYTES_PER_SAMPLE[fmt] - # Memory Stream - if sig_stream is not None: - sig_stream.seek(start_byte) - sig_data = np.frombuffer( - sig_stream.read(), dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count - ) - # Local dat file - elif pn_dir is None: - with open(os.path.join(dir_name, file_name), "rb") as fp: - fp.seek(start_byte) - sig_data = np.fromfile( - fp, dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count - ) - # Stream dat file from Physionet - else: - dtype_in = np.dtype(DATA_LOAD_TYPES[fmt]) - sig_data = download._stream_dat( - file_name, pn_dir, byte_count, start_byte, dtype_in - ) + buf.seek(start_byte) + return np.frombuffer( + buf.read(byte_count), dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count + ) - return sig_data def _blocks_to_samples(sig_data, n_samp, fmt): """ @@ -1790,7 +1727,7 @@ def _blocks_to_samples(sig_data, n_samp, fmt): def _rd_compressed_stream( - fp, + buf, fmt, sample_offset, n_sig, @@ -1798,12 +1735,20 @@ def _rd_compressed_stream( start_frame, end_frame, ): - signature = fp.read(4) + import soundfile + + if any(spf != samps_per_frame[0] for spf in samps_per_frame): + raise ValueError( + "All channels in a FLAC signal file must have the same " + "sampling rate and samples per frame" + ) + + signature = buf.read(4) if signature != b"fLaC": - raise ValueError(f"{fp.name} is not a FLAC file") - fp.seek(0) + raise ValueError(f"{buf.name} is not a FLAC file") + buf.seek(0) - with soundfile.SoundFile(fp) as sf: + with soundfile.SoundFile(buf) as sf: # Determine the actual resolution of the FLAC stream and the # data type will use when reading it. Note that soundfile # doesn't support int8. @@ -1817,18 +1762,18 @@ def _rd_compressed_stream( format_bits = 24 read_dtype = "int32" else: - raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})") + raise ValueError(f"unknown subtype in {buf.name} ({sf.subtype})") max_bits = int(fmt) - 500 if format_bits > max_bits: raise ValueError( - f"wrong resolution in {fp.name} " + f"wrong resolution in {buf.name} " f"({format_bits}, expected <= {max_bits})" ) if sf.channels != n_sig: raise ValueError( - f"wrong number of channels in {fp.name} " + f"wrong number of channels in {buf.name} " f"({sf.channels}, expected {n_sig})" ) @@ -1906,73 +1851,6 @@ def _rd_compressed_stream( return sig_data.reshape(-1) -def _rd_compressed_file( - file_name, - dir_name, - pn_dir, - fmt, - sample_offset, - n_sig, - samps_per_frame, - start_frame, - end_frame, -): - """ - Read data from a compressed file into a 1D numpy array. - - Parameters - ---------- - file_name : str - The name of the signal file. - dir_name : str - The full directory where the signal file is located, if local. - This argument is ignored if `pn_dir` is not None. - pn_dir : str or None - The PhysioNet database directory where the signal file is located. - fmt : str - The format code of the signal file. - sample_offset : int - The sample number in the signal file corresponding to sample 0 of - the WFDB record. - n_sig : int - The number of signals in the file. - samps_per_frame : list - The number of samples per frame for each signal in the file. - start_frame : int - The starting frame number to read. - end_frame : int - The ending frame number to read. - - Returns - ------- - signal : ndarray - The data read from the signal file. This is a one-dimensional - array in the same order the samples would be stored in a binary - signal file; `signal[(i*n_sig+j)*samps_per_frame[0]+k]` is sample - number `i*samps_per_frame[0]+k` of signal `j`. - - Notes - ----- - Converting the output array into "dat file order" here is inefficient, - but necessary to match the behavior of _rd_dat_file. It would be - better to reorganize _rd_dat_signals to make the reshaping unnecessary. - - """ - import soundfile - - if any(spf != samps_per_frame[0] for spf in samps_per_frame): - raise ValueError( - "All channels in a FLAC signal file must have the same " - "sampling rate and samples per frame" - ) - - if pn_dir is None: - file_name = os.path.join(dir_name, file_name) - - with _coreio._open_file(pn_dir, file_name, "rb") as fp: - return _rd_compressed_stream(fp, fmt, sample_offset, n_sig, samps_per_frame, start_frame, end_frame) - - def _skew_sig( sig, skew, n_sig, read_len, fmt, nan_replace, samps_per_frame=None ): diff --git a/wfdb/io/annotation.py b/wfdb/io/annotation.py index ae86cc9f..fdf63e71 100644 --- a/wfdb/io/annotation.py +++ b/wfdb/io/annotation.py @@ -8,6 +8,7 @@ from wfdb.io import download from wfdb.io import _header +from wfdb.io import _coreio from wfdb.io import record @@ -1883,7 +1884,6 @@ def rdann( pn_dir=None, return_label_elements=["symbol"], summarize_labels=False, - ann_stream=None, ): """ Read a WFDB annotation file record_name.extension and return an @@ -1947,11 +1947,38 @@ def rdann( sampfrom, sampto, return_label_elements ) + with _coreio._open_file( + pn_dir=pn_dir, + file_name=record_name + "." + extension, + mode="rb", + ) as fp: + annotation = _rdann(fp, sampfrom, sampto, shift_samps, return_label_elements, summarize_labels) + + # Try to get fs from the header file if it is not contained in the + # annotation file + if annotation.fs is None: + try: + rec = record.rdheader(record_name, pn_dir) + annotation.fs = rec.fs + except: + pass + + annotation.record_name = os.path.split(record_name)[1] + annotation.extension = extension + + return annotation + + +def _rdann( + buf, + sampfrom=0, + sampto=None, + shift_samps=False, + return_label_elements=["symbol"], + summarize_labels=False, +): # Read the file in byte pairs - if ann_stream is not None: - filebytes = np.frombuffer(ann_stream.read(), " str: - """ - Stream the text of a remote header file. - - Parameters - ---------- - file_name : str - The name of the headerr file to be read. - pn_dir : str - The PhysioNet database directory from which to find the - required header file. eg. For file '100.hea' in - 'http://physionet.org/content/mitdb', pn_dir='mitdb'. - - Returns - ------- - N/A : str - The text contained in the header file - - """ - # Full url of header location - url = posixpath.join(config.db_index_url, pn_dir, file_name) - - # Get the content of the remote file - with _url.openurl(url, "rb") as f: - content = f.read() - - return content.decode("iso-8859-1") - - def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype): """ Stream data from a remote dat file into a 1d numpy array. @@ -148,36 +119,6 @@ def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype): return sig_data -def _stream_annotation(file_name, pn_dir): - """ - Stream an entire remote annotation file from Physionet. - - Parameters - ---------- - file_name : str - The name of the annotation file to be read. - pn_dir : str - The PhysioNet directory where the annotation file is located. - - Returns - ------- - ann_data : ndarray - The resulting data stream in numpy array format. - - """ - # Full url of annotation file - url = posixpath.join(config.db_index_url, pn_dir, file_name) - - # Get the content - with _url.openurl(url, "rb") as f: - content = f.read() - - # Convert to numpy array - ann_data = np.fromstring(content, dtype=np.dtype(">> ecg_record = wfdb.rdheader('100', pn_dir='mitdb') """ - dir_name, base_record_name = os.path.split(record_name) - dir_name = os.path.abspath(dir_name) + # Construct the download path using the database version + if (pn_dir is not None) and ("." not in pn_dir): + dir_list = pn_dir.split("/") + pn_dir = posixpath.join( + dir_list[0], download.get_version(dir_list[0]), *dir_list[1:] + ) - if hea_stream is not None: - header_content = hea_stream.read().decode('ascii') - else: - # Construct the download path using the database version - if (pn_dir is not None) and ("." not in pn_dir): - dir_list = pn_dir.split("/") - pn_dir = posixpath.join( - dir_list[0], download.get_version(dir_list[0]), *dir_list[1:] - ) + # Read the local or remote header file. + file_name = f"{base_record_name}.hea" + with _coreio._open_file( + pn_dir=pn_dir, + file_name=file_name, + mode="rb", + ) as fp: + return _rdheader(fp, rd_segments) - # Read the local or remote header file. - file_name = f"{base_record_name}.hea" - if pn_dir is None: - with open( - os.path.join(dir_name, file_name), - "r", - encoding="ascii", - errors="ignore", - ) as f: - header_content = f.read() - else: - header_content = download._stream_header(file_name, pn_dir) + +def _rdheader(buf, rd_segments=False): + header_content = buf.read().decode('iso-8859-1') # Separate comment and non-comment lines header_lines, comment_lines = header.parse_header_content(header_content) @@ -1928,8 +1923,6 @@ def rdrecord( force_channels=True, channel_names=None, warn_empty=False, - hea_stream=None, - sig_stream=None, ): """ Read a WFDB record and return the signal and record descriptors as @@ -2031,7 +2024,7 @@ def rdrecord( dir_list[0], download.get_version(dir_list[0]), *dir_list[1:] ) - record = rdheader(record_name, pn_dir=pn_dir, rd_segments=False, hea_stream=hea_stream) + record = rdheader(record_name, pn_dir=pn_dir, rd_segments=False) # Set defaults for sampto and channels input variables if sampto is None: @@ -2115,26 +2108,26 @@ def rdrecord( no_file = False sig_data = None - record.e_d_signal = _signal._rd_segment( - file_name=record.file_name, - dir_name=dir_name, - pn_dir=pn_dir, - fmt=record.fmt, - n_sig=record.n_sig, - sig_len=record.sig_len, - byte_offset=record.byte_offset, - samps_per_frame=record.samps_per_frame, - skew=record.skew, - init_value=record.init_value, - sampfrom=sampfrom, - sampto=sampto, - channels=channels, - ignore_skew=ignore_skew, - no_file=no_file, - sig_data=sig_data, - sig_stream=sig_stream, - return_res=return_res, - ) + file_name = os.path.join(dir_name, file_name) + + with _coreio._open_file(pn_dir, file_name, "rb") as fp: + record.e_d_signal = _signal._rd_segment( + fp, + fmt=record.fmt, + n_sig=record.n_sig, + sig_len=record.sig_len, + byte_offset=record.byte_offset, + samps_per_frame=record.samps_per_frame, + skew=record.skew, + init_value=record.init_value, + sampfrom=sampfrom, + sampto=sampto, + channels=channels, + ignore_skew=ignore_skew, + no_file=no_file, + sig_data=sig_data, + return_res=return_res, + ) # Only 1 sample/frame, or frames are smoothed. Return uniform numpy array if smooth_frames: