From b6ca636fc7eeae693819bd82893328d3e838fdd0 Mon Sep 17 00:00:00 2001 From: Bjoern Ludwig Date: Fri, 30 Jul 2021 15:07:34 +0200 Subject: [PATCH] feat(SineWithJitter): introduce a streaming data stream of a sine signal with jitter and the corresponding agent --- agentMET4FOF/agents/signal_agents.py | 61 +++++++++++++- agentMET4FOF/streams/signal_streams.py | 80 ++++++++++++++++++- .../noise_jitter/generate_sine_with_jitter.py | 10 ++- 3 files changed, 147 insertions(+), 4 deletions(-) diff --git a/agentMET4FOF/agents/signal_agents.py b/agentMET4FOF/agents/signal_agents.py index 060b5f64..809e263c 100644 --- a/agentMET4FOF/agents/signal_agents.py +++ b/agentMET4FOF/agents/signal_agents.py @@ -4,7 +4,11 @@ import pandas as pd from .base_agents import AgentMET4FOF -from ..streams.signal_streams import SineGenerator, StaticSineWithJitterGenerator +from ..streams.signal_streams import ( + SineGenerator, + SineWithJitterGenerator, + StaticSineWithJitterGenerator, +) __all__ = ["SineGeneratorAgent", "StaticSineWithJitterGeneratorAgent", "NoiseAgent"] @@ -92,6 +96,7 @@ def agent_loop(self): class NoiseAgent(AgentMET4FOF): """An agent adding white noise to the incoming signal""" + _noise_std: float @property @@ -155,3 +160,57 @@ def _compute_noisy_signal_from_clean_signal( data_in_message["quantities"] ) self.send_output(fully_assembled_resulting_data) + + +class SineWithJitterGeneratorAgent(SineGeneratorAgent): + """An agent streaming a sine signal + + Takes samples from the :py:mod:`SineWithJitterGenerator` and pushes them sample by + sample to connected agents via its output channel. + """ + + def init_parameters( + self, + sfreq: Optional[int] = 10, + sine_freq: Optional[float] = np.reciprocal(2 * np.pi), + amplitude: Optional[float] = 1.0, + initial_phase: Optional[float] = 0.0, + jitter_std: Optional[float] = 0.02, + ): + """Initialize the input data + + Initialize the input data stream as an instance of the + :class:`SineWithJitterGenerator` class. + + Parameters + ---------- + sfreq : int, optional + sampling frequency which determines the time step when :meth:`.next_sample` + is called, defaults to 10 + sine_freq : float, optional + frequency of the generated sine wave, defaults to :math:`\frac{1}{2 \pi}` + amplitude : float, optional + amplitude of the generated sine wave, defaults to 1.0 + initial_phase : float, optional + initial phase (at t=0) of the generated sine wave, defaults to 0.0 + jitter_std : float, optional + the standard deviation of the distribution to randomly draw jitter from, + defaults to 0.02 + """ + self._sine_stream = SineWithJitterGenerator( + sfreq=sfreq, + sine_freq=sine_freq, + amplitude=amplitude, + initial_phase=initial_phase, + jitter_std=jitter_std, + ) + + def agent_loop(self): + """Model the agent's behaviour + + On state *Running* the agent will extract sample by sample the input data + streams content and push it via invoking :meth:`AgentMET4FOF.send_output`. + """ + if self.current_state == "Running": + sine_data = self._sine_stream.next_sample() + self.send_output(sine_data) diff --git a/agentMET4FOF/streams/signal_streams.py b/agentMET4FOF/streams/signal_streams.py index 20192f8b..5caeb348 100644 --- a/agentMET4FOF/streams/signal_streams.py +++ b/agentMET4FOF/streams/signal_streams.py @@ -1,10 +1,15 @@ -from typing import Optional +from typing import Dict, Optional import numpy as np from .base_streams import DataStreamMET4FOF -__all__ = ["SineGenerator", "CosineGenerator", "StaticSineWithJitterGenerator"] +__all__ = [ + "SineGenerator", + "CosineGenerator", + "SineWithJitterGenerator", + "StaticSineWithJitterGenerator", +] class SineGenerator(DataStreamMET4FOF): @@ -151,3 +156,74 @@ def __init__(self, num_cycles=1000, jitter_std=0.02): timestamps_with_jitter = np.random.normal(loc=timestamps, scale=jitter_std) signal_values_at_timestamps = np.sin(timestamps_with_jitter) self.set_data_source(quantities=signal_values_at_timestamps, time=timestamps) + + +class SineWithJitterGenerator(SineGenerator): + r"""Represents a streamed sine signal with jitter + + Parameters + ---------- + sfreq : int, optional + sampling frequency which determines the time step when :meth:`.next_sample` + is called, defaults to 10 + sine_freq : float, optional + frequency of wave function, defaults to :math:`\frac{1}{2 \pi}` + amplitude : float, optional + amplitude of the wave function, defaults to 1.0 + initial_phase : float, optional + initial phase of the wave function, defaults to 0.0 + jitter_std : float, optional + the standard deviation of the distribution to randomly draw jitter from, + defaults to 0.02 + """ + + _jitter_std: float + + @property + def jitter_std(self): + """The standard deviation of the distribution to randomly draw jitter from""" + return self._jitter_std + + def __init__( + self, + sfreq: Optional[int] = 10, + sine_freq: Optional[float] = np.reciprocal(2 * np.pi), + amplitude: Optional[float] = 1.0, + initial_phase: Optional[float] = 0.0, + jitter_std: Optional[float] = 0.02, + ): + self._jitter_std = jitter_std + super().__init__( + sfreq=sfreq, + sine_freq=sine_freq, + amplitude=amplitude, + initial_phase=initial_phase, + ) + + def _next_sample_generator( + self, batch_size: Optional[int] = 1 + ) -> Dict[str, np.ndarray]: + """Generate the next batch of samples from the sine function with jitter + + Parameters + ---------- + batch_size : int, optional + number of batches to get from data stream, defaults to 1 + + Returns + ------- + Dict[str, np.ndarray] + latest samples of the sine signal with jitter in the form:: + + dict like { + "quantities":