Skip to content

Commit

Permalink
feat(SineWithJitter): introduce a streaming data stream of a sine sig…
Browse files Browse the repository at this point in the history
…nal with jitter and the corresponding agent
  • Loading branch information
BjoernLudwigPTB committed Jul 30, 2021
1 parent ef407fc commit b6ca636
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 4 deletions.
61 changes: 60 additions & 1 deletion agentMET4FOF/agents/signal_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import pandas as pd

from .base_agents import AgentMET4FOF
from ..streams.signal_streams import SineGenerator, StaticSineWithJitterGenerator
from ..streams.signal_streams import (
SineGenerator,
SineWithJitterGenerator,
StaticSineWithJitterGenerator,
)

__all__ = ["SineGeneratorAgent", "StaticSineWithJitterGeneratorAgent", "NoiseAgent"]

Expand Down Expand Up @@ -92,6 +96,7 @@ def agent_loop(self):

class NoiseAgent(AgentMET4FOF):
"""An agent adding white noise to the incoming signal"""

_noise_std: float

@property
Expand Down Expand Up @@ -155,3 +160,57 @@ def _compute_noisy_signal_from_clean_signal(
data_in_message["quantities"]
)
self.send_output(fully_assembled_resulting_data)


class SineWithJitterGeneratorAgent(SineGeneratorAgent):
"""An agent streaming a sine signal
Takes samples from the :py:mod:`SineWithJitterGenerator` and pushes them sample by
sample to connected agents via its output channel.
"""

def init_parameters(
self,
sfreq: Optional[int] = 10,
sine_freq: Optional[float] = np.reciprocal(2 * np.pi),
amplitude: Optional[float] = 1.0,
initial_phase: Optional[float] = 0.0,
jitter_std: Optional[float] = 0.02,
):
"""Initialize the input data
Initialize the input data stream as an instance of the
:class:`SineWithJitterGenerator` class.
Parameters
----------
sfreq : int, optional
sampling frequency which determines the time step when :meth:`.next_sample`
is called, defaults to 10
sine_freq : float, optional
frequency of the generated sine wave, defaults to :math:`\frac{1}{2 \pi}`
amplitude : float, optional
amplitude of the generated sine wave, defaults to 1.0
initial_phase : float, optional
initial phase (at t=0) of the generated sine wave, defaults to 0.0
jitter_std : float, optional
the standard deviation of the distribution to randomly draw jitter from,
defaults to 0.02
"""
self._sine_stream = SineWithJitterGenerator(
sfreq=sfreq,
sine_freq=sine_freq,
amplitude=amplitude,
initial_phase=initial_phase,
jitter_std=jitter_std,
)

def agent_loop(self):
"""Model the agent's behaviour
On state *Running* the agent will extract sample by sample the input data
streams content and push it via invoking :meth:`AgentMET4FOF.send_output`.
"""
if self.current_state == "Running":
sine_data = self._sine_stream.next_sample()
self.send_output(sine_data)
80 changes: 78 additions & 2 deletions agentMET4FOF/streams/signal_streams.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from typing import Optional
from typing import Dict, Optional

import numpy as np

from .base_streams import DataStreamMET4FOF

__all__ = ["SineGenerator", "CosineGenerator", "StaticSineWithJitterGenerator"]
__all__ = [
"SineGenerator",
"CosineGenerator",
"SineWithJitterGenerator",
"StaticSineWithJitterGenerator",
]


class SineGenerator(DataStreamMET4FOF):
Expand Down Expand Up @@ -151,3 +156,74 @@ def __init__(self, num_cycles=1000, jitter_std=0.02):
timestamps_with_jitter = np.random.normal(loc=timestamps, scale=jitter_std)
signal_values_at_timestamps = np.sin(timestamps_with_jitter)
self.set_data_source(quantities=signal_values_at_timestamps, time=timestamps)


class SineWithJitterGenerator(SineGenerator):
r"""Represents a streamed sine signal with jitter
Parameters
----------
sfreq : int, optional
sampling frequency which determines the time step when :meth:`.next_sample`
is called, defaults to 10
sine_freq : float, optional
frequency of wave function, defaults to :math:`\frac{1}{2 \pi}`
amplitude : float, optional
amplitude of the wave function, defaults to 1.0
initial_phase : float, optional
initial phase of the wave function, defaults to 0.0
jitter_std : float, optional
the standard deviation of the distribution to randomly draw jitter from,
defaults to 0.02
"""

_jitter_std: float

@property
def jitter_std(self):
"""The standard deviation of the distribution to randomly draw jitter from"""
return self._jitter_std

def __init__(
self,
sfreq: Optional[int] = 10,
sine_freq: Optional[float] = np.reciprocal(2 * np.pi),
amplitude: Optional[float] = 1.0,
initial_phase: Optional[float] = 0.0,
jitter_std: Optional[float] = 0.02,
):
self._jitter_std = jitter_std
super().__init__(
sfreq=sfreq,
sine_freq=sine_freq,
amplitude=amplitude,
initial_phase=initial_phase,
)

def _next_sample_generator(
self, batch_size: Optional[int] = 1
) -> Dict[str, np.ndarray]:
"""Generate the next batch of samples from the sine function with jitter
Parameters
----------
batch_size : int, optional
number of batches to get from data stream, defaults to 1
Returns
-------
Dict[str, np.ndarray]
latest samples of the sine signal with jitter in the form::
dict like {
"quantities": <time series data as np.ndarray>,
"time": <time stamps as np.ndarray>
}
"""
sine_signal_with_time_stamps = super()._next_sample_generator()

sine_signal_with_time_stamps["time"] = np.random.normal(
loc=sine_signal_with_time_stamps["time"], scale=self.jitter_std
)

return sine_signal_with_time_stamps
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from agentMET4FOF.agents import MonitorAgent
from agentMET4FOF.agents.signal_agents import (
SineGeneratorAgent,
SineWithJitterGeneratorAgent,
StaticSineWithJitterGeneratorAgent,
)
from agentMET4FOF.network import AgentNetwork
Expand All @@ -18,15 +19,22 @@ def demonstrate_sine_with_jitter_agent_use():
sfreq=10,
sine_freq=np.reciprocal(2 * np.pi),
)
static_jitter_agent = agent_network.add_agent(
name="Static Sine signal with jitter",
agentType=StaticSineWithJitterGeneratorAgent,
)
static_jitter_agent.init_parameters(jitter_std=0.05)
jitter_agent = agent_network.add_agent(
name="Sine signal with jitter", agentType=StaticSineWithJitterGeneratorAgent
name="Streaming Sine signal with jitter",
agentType=SineWithJitterGeneratorAgent,
)
jitter_agent.init_parameters(jitter_std=0.05)
monitor_agent = agent_network.add_agent(
name="Compare clean signal and signal with jitter", agentType=MonitorAgent
)

agent_network.bind_agents(sine_agent, monitor_agent)
agent_network.bind_agents(static_jitter_agent, monitor_agent)
agent_network.bind_agents(jitter_agent, monitor_agent)

agent_network.set_running_state()
Expand Down

0 comments on commit b6ca636

Please sign in to comment.