Skip to content

Commit

Permalink
feat(multiwave): Added multiwave generator to metrological_streams.py
Browse files Browse the repository at this point in the history
  • Loading branch information
anupam-prasad committed Mar 11, 2021
1 parent cac3757 commit 035d7c9
Showing 1 changed file with 99 additions and 26 deletions.
125 changes: 99 additions & 26 deletions agentMET4FOF/metrological_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import numpy as np
import pandas as pd
from scipy.stats import norm

from agentMET4FOF.streams import DataStreamMET4FOF

Expand Down Expand Up @@ -47,9 +48,9 @@ class MetrologicalDataStreamMET4FOF(DataStreamMET4FOF):
"""

def __init__(
self,
value_unc: Union[float, Iterable[float]] = 0.0,
time_unc: Union[float, Iterable[float]] = 0.0,
self,
value_unc: Union[float, Iterable[float]] = 0.0,
time_unc: Union[float, Iterable[float]] = 0.0,
):
"""Initialize a MetrologicalDataStreamMET4FOF object
Expand All @@ -67,11 +68,11 @@ def __init__(
self._time_unc: Union[float, Iterable[float]] = time_unc

def set_generator_function(
self,
generator_function: Callable = None,
uncertainty_generator: Callable = None,
sfreq: int = None,
**kwargs: Optional[Any]
self,
generator_function: Callable = None,
uncertainty_generator: Callable = None,
sfreq: int = None,
**kwargs: Optional[Any]
) -> Callable:
"""
Set value and uncertainty generators based on user-defined functions. By
Expand Down Expand Up @@ -124,9 +125,9 @@ def set_generator_function(
return self._generator_function_unc

def _default_uncertainty_generator(
self,
time: Union[List, pd.DataFrame, np.ndarray],
values: Union[List, pd.DataFrame, np.ndarray],
self,
time: Union[List, pd.DataFrame, np.ndarray],
values: Union[List, pd.DataFrame, np.ndarray],
) -> Tuple[np.ndarray, np.ndarray]:
"""Default (standard) uncertainty generator function
Expand Down Expand Up @@ -154,10 +155,10 @@ def _next_sample_generator(self, batch_size: int = 1) -> np.ndarray:
time uncertainty ``ut`` and measurement uncertainty ``uv`` to sample
"""
_time: np.ndarray = (
np.arange(self._sample_idx, self._sample_idx + batch_size, 1.0).reshape(
-1, 1
)
/ self.sfreq
np.arange(self._sample_idx, self._sample_idx + batch_size, 1.0).reshape(
-1, 1
)
/ self.sfreq
)
self._sample_idx += batch_size

Expand Down Expand Up @@ -219,17 +220,17 @@ class MetrologicalSineGenerator(MetrologicalDataStreamMET4FOF):
"""

def __init__(
self,
sfreq: int = 500,
sine_freq: float = 50,
device_id: str = "SineGenerator",
time_name: str = "time",
time_unit: str = "s",
quantity_names: Union[str, Tuple[str, ...]] = "Voltage",
quantity_units: Union[str, Tuple[str, ...]] = "V",
misc: Optional[Any] = "Simple sine wave generator",
value_unc: Union[float, Iterable[float]] = 0.1,
time_unc: Union[float, Iterable[float]] = 0,
self,
sfreq: int = 500,
sine_freq: float = 50,
device_id: str = "SineGenerator",
time_name: str = "time",
time_unit: str = "s",
quantity_names: Union[str, Tuple[str, ...]] = "Voltage",
quantity_units: Union[str, Tuple[str, ...]] = "V",
misc: Optional[Any] = "Simple sine wave generator",
value_unc: Union[float, Iterable[float]] = 0.1,
time_unc: Union[float, Iterable[float]] = 0,
):
super(MetrologicalSineGenerator, self).__init__(
value_unc=value_unc, time_unc=time_unc
Expand All @@ -256,3 +257,75 @@ def _sine_wave_function(self, time, sine_freq):
amplitude = np.sin(np.multiply(2 * np.pi * sine_freq, time))
amplitude += np.random.normal(0, self.value_unc, amplitude.shape)
return amplitude


class MetrologicalMultiWaveGenerator(MetrologicalDataStreamMET4FOF):
"""
Class to generate data as a sum of cosine wave and additional Gaussian noise.
Values with associated uncertainty are returned.
Parameters
----------
sfreq: float
sampling frequency which determines the time step when next_sample is called.
intercept: float
constant intercept of the signal
freq_arr: np.ndarray of float
array with frequencies of components included in the signal
ampl_arr: np.ndarray of float
array with amplitudes of components included in the signal
phase_ini_arr: np.ndarray of float
array with initial phases of components included in the signal
exp_unc_abs: float
absolute expanded uncertainty of each data point of the signal
"""

def __init__(
self,
sfreq: int = 500,
freq_arr: np.array = np.array([50]),
ampl_arr: np.array = np.array([1]),
phase_ini_arr: np.array = np.array([0]),
intercept: float = 0,
exp_unc_abs: float = 0.1,
device_id: str = "DataGenerator",
time_name: str = "time",
time_unit: str = "s",
quantity_names: Union[str, Tuple[str, ...]] = ("Length", "Mass"),
quantity_units: Union[str, Tuple[str, ...]] = ("m", "kg"),
misc: Optional[Any] = "data generator",
value_unc: Union[float, Iterable[float]] = 0.1,
time_unc: Union[float, Iterable[float]] = 0,
):
super(MetrologicalMultiWaveGenerator, self).__init__(
value_unc=value_unc, time_unc=time_unc
)
self.set_metadata(
device_id=device_id,
time_name=time_name,
time_unit=time_unit,
quantity_names=quantity_names,
quantity_units=quantity_units,
misc=misc
)
self.set_generator_function(
generator_function=self._multi_wave_function,
sfreq=sfreq,
intercept=intercept,
freq_arr=freq_arr,
ampl_arr=ampl_arr,
phase_ini_arr=phase_ini_arr,
exp_unc_abs=exp_unc_abs
)

def _multi_wave_function(self, time_arr, intercept, freq_arr, ampl_arr,
phase_ini_arr, exp_unc_abs=0.1):

value_arr = intercept + exp_unc_abs / 2 * norm.rvs(size=time_arr.shape)

for ampl, freq, phase_ini in zip(freq_arr, ampl_arr, phase_ini_arr):
value_arr = value_arr + ampl * np.cos(2 * np.pi * freq * time_arr + phase_ini)

value_expunc_arr = exp_unc_abs * np.ones_like(value_arr)

return value_arr, value_expunc_arr

0 comments on commit 035d7c9

Please sign in to comment.