From 682a5ff2379c36a57ffea610aa7f113489e72e95 Mon Sep 17 00:00:00 2001 From: Anupam Prasad Vedurmudi Date: Mon, 15 Mar 2021 15:04:31 +0100 Subject: [PATCH] feat(multiwave_generator): multiwave generator added to metrological_streams.py --- agentMET4FOF/metrological_agents.py | 12 +---- agentMET4FOF/metrological_streams.py | 75 +++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 12 deletions(-) diff --git a/agentMET4FOF/metrological_agents.py b/agentMET4FOF/metrological_agents.py index c5b34748..812ec384 100644 --- a/agentMET4FOF/metrological_agents.py +++ b/agentMET4FOF/metrological_agents.py @@ -3,24 +3,14 @@ import plotly.graph_objs as go from time_series_buffer import TimeSeriesBuffer from time_series_metadata.scheme import MetaData -import numpy as np from agentMET4FOF.agents import AgentMET4FOF class MetrologicalAgent(AgentMET4FOF): - # dict like { - # : { - # "buffer": TimeSeriesBuffer(maxlen=buffer_size), - # "metadata": MetaData(**kwargs).metadata, - # } + _input_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, Dict]]] _input_data_maxlen: int - # dict like { - # : { - # "buffer" : TimeSeriesBuffer(maxlen=buffer_size), - # "metadata" : MetaData(**kwargs) - # } _output_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, MetaData]]] _output_data_maxlen: int diff --git a/agentMET4FOF/metrological_streams.py b/agentMET4FOF/metrological_streams.py index 5aa07a41..c03429a3 100644 --- a/agentMET4FOF/metrological_streams.py +++ b/agentMET4FOF/metrological_streams.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd +from scipy.stats import norm from agentMET4FOF.streams import DataStreamMET4FOF @@ -116,7 +117,7 @@ def set_generator_function( if uncertainty_generator is None: warnings.warn( "No uncertainty generator function specified. Setting to default (" - "zero)." + "constant)." ) self._generator_function_unc = self._default_uncertainty_generator else: @@ -256,3 +257,75 @@ def _sine_wave_function(self, time, sine_freq): amplitude = np.sin(np.multiply(2 * np.pi * sine_freq, time)) amplitude += np.random.normal(0, self.value_unc, amplitude.shape) return amplitude + + +class MetrologicalMultiWaveGenerator(MetrologicalDataStreamMET4FOF): + """ + Class to generate data as a sum of cosine wave and additional Gaussian noise. + Values with associated uncertainty are returned. + + Parameters + ---------- + sfreq: float + sampling frequency which determines the time step when next_sample is called. + intercept: float + constant intercept of the signal + freq_arr: np.ndarray of float + array with frequencies of components included in the signal + ampl_arr: np.ndarray of float + array with amplitudes of components included in the signal + phase_ini_arr: np.ndarray of float + array with initial phases of components included in the signal + """ + + def __init__( + self, + sfreq: int = 500, + freq_arr: np.array = np.array([50]), + ampl_arr: np.array = np.array([1]), + phase_ini_arr: np.array = np.array([0]), + intercept: float = 0, + device_id: str = "DataGenerator", + time_name: str = "time", + time_unit: str = "s", + quantity_names: Union[str, Tuple[str, ...]] = ("Length", "Mass"), + quantity_units: Union[str, Tuple[str, ...]] = ("m", "kg"), + misc: Optional[Any] = " Generator for a linear sum of cosines", + value_unc: Union[float, Iterable[float]] = 0.1, + time_unc: Union[float, Iterable[float]] = 0, + noisy: bool = True + ): + super(MetrologicalMultiWaveGenerator, self).__init__( + value_unc=value_unc, time_unc=time_unc + ) + self.set_metadata( + device_id=device_id, + time_name=time_name, + time_unit=time_unit, + quantity_names=quantity_names, + quantity_units=quantity_units, + misc=misc + ) + self.value_unc = value_unc + self.time_unc = time_unc + self.set_generator_function( + generator_function=self._multi_wave_function, + sfreq=sfreq, + intercept=intercept, + freq_arr=freq_arr, + ampl_arr=ampl_arr, + phase_ini_arr=phase_ini_arr, + noisy=noisy + ) + + def _multi_wave_function(self, time, intercept, freq_arr, ampl_arr, + phase_ini_arr, noisy): + + value_arr = intercept + if noisy: + value_arr += self.value_unc / 2 * norm.rvs(size=time.shape) + + for ampl, freq, phase_ini in zip(freq_arr, ampl_arr, phase_ini_arr): + value_arr = value_arr + ampl * np.cos(2 * np.pi * freq * time + phase_ini) + + return value_arr