Skip to content

Commit

Permalink
Merge branch 'develop' into feature/multiwave_generator
Browse files Browse the repository at this point in the history
  • Loading branch information
anupam-prasad committed Apr 23, 2021
2 parents 3334762 + ca556b4 commit e8c4a2c
Show file tree
Hide file tree
Showing 10 changed files with 21,736 additions and 111 deletions.
268 changes: 176 additions & 92 deletions agentMET4FOF/agents.py

Large diffs are not rendered by default.

138 changes: 123 additions & 15 deletions agentMET4FOF/metrological_agents.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, Union
from typing import Dict, List, Tuple, Union

import numpy as np
import pandas as pd
import plotly.graph_objs as go
from time_series_buffer import TimeSeriesBuffer
from time_series_metadata.scheme import MetaData
Expand All @@ -10,27 +12,30 @@
)



class MetrologicalAgent(AgentMET4FOF):

_input_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, Dict]]]
"""Input dictionary of all incoming data including metadata
dict like {
<from>: {
"buffer": TimeSeriesBuffer(maxlen=buffer_size),
"metadata": MetaData(**kwargs).metadata,
}

"""Input dictionary of all incoming data including metadata::
dict like {
<from>: {
"buffer": TimeSeriesBuffer(maxlen=buffer_size),
"metadata": MetaData(**kwargs).metadata,
}
"""
_input_data_maxlen: int

_output_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, MetaData]]]
"""Output dictionary of all outgoing data including metadata
dict like {
<from>: {
"buffer": TimeSeriesBuffer(maxlen=buffer_size),
"metadata": MetaData(**kwargs).metadata,
}

"""Output dictionary of all outgoing data including metadata::
dict like {
<from>: {
"buffer": TimeSeriesBuffer(maxlen=buffer_size),
"metadata": MetaData(**kwargs).metadata,
}
"""
_output_data_maxlen: int

Expand Down Expand Up @@ -243,3 +248,106 @@ def agent_loop(self):
if self.current_state == "Running":
self.set_output_data(channel="default", data=self._stream.next_sample())
super().agent_loop()

class MetrologicalAgentBuffer(AgentBuffer):
"""Buffer class which is instantiated in every metrological agent to store data
This buffer is necessary to handle multiple inputs coming from agents.
We can access the buffer like a dict with exposed functions such as .values(),
.keys() and .items(). The actual dict object is stored in the attribute
:attr:`buffer <agentMET4FOF.agents.AgentBuffer.buffer>`. The list in
:attr:`supported_datatypes <agentMET4FOF.agents.AgentBuffer.supported_datatypes>`
contains one more element
for metrological agents, namely :class:`TimeSeriesBuffer
<time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>`.
"""
def __init__(self, buffer_size: int = 1000):
"""Initialise a new agent buffer object
Parameters
----------
buffer_size: int
Length of buffer allowed.
"""
super(MetrologicalAgentBuffer, self).__init__(buffer_size)
self.supported_datatypes.append(TimeSeriesBuffer)

def convert_single_to_tsbuffer(self, single_data: Union[List, Tuple, np.ndarray]):
"""Convert common data in agentMET4FOF to :class:`TimeSeriesBuffer
<time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>`
Parameters
----------
single_data : iterable of iterables (list, tuple, np.ndarrray) with shape (N, M)
* M==2 (pairs): assumed to be like (time, value)
* M==3 (triple): assumed to be like (time, value, value_unc)
* M==4 (4-tuple): assumed to be like (time, time_unc, value, value_unc)
Returns
-------
TimeSeriesBuffer
the new :class:`TimeSeriesBuffer
<time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object
"""
ts = TimeSeriesBuffer(maxlen=self.buffer_size)
ts.add(single_data)
return ts

def update(
self,
agent_from: str,
data: Union[Dict, List, Tuple, np.ndarray],
) -> TimeSeriesBuffer:
"""Overrides data in the buffer dict keyed by `agent_from` with value `data`
Parameters
----------
agent_from : str
Name of agent sender
data : dict or iterable of iterables (list, tuple, np.ndarray) with shape (N, M
the data to be stored in the metrological buffer
Returns
-------
TimeSeriesBuffer
the updated :class:`TimeSeriesBuffer
<time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object
"""
# handle if data type nested in dict
if isinstance(data, dict):
# check for each value datatype
for key, value in data.items():
data[key] = self.convert_single_to_tsbuffer(value)
else:
data = self.convert_single_to_tsbuffer(data)
self.buffer.update({agent_from: data})
return self.buffer

def _concatenate(
self,
iterable: TimeSeriesBuffer,
data: Union[np.ndarray, list, pd.DataFrame],
concat_axis: int = 0
) -> TimeSeriesBuffer:
"""Concatenate the given ``TimeSeriesBuffer`` with ``data``
Add ``data`` to the :class:`TimeSeriesBuffer
<time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object.
Parameters
----------
iterable : TimeSeriesBuffer
The current buffer to be concatenated with.
data : np.ndarray, DataFrame, list
New incoming data
Returns
-------
TimeSeriesBuffer
the original buffer with the data appended
"""
iterable.add(data)
return iterable
2 changes: 1 addition & 1 deletion agentMET4FOF_tutorials/buffering/basic_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#We show how we can read a single data from a sine generator, and store it in the agent buffer.
#When the buffer is filled up (set buffer_size to 5 entries), we send out the buffer content to the MonitorAgent, and empty the buffer to receive new data.

from agentMET4FOF.agents import AgentMET4FOF, AgentNetwork, MonitorAgent, AgentBuffer
from agentMET4FOF.agents import AgentMET4FOF, AgentNetwork, MonitorAgent
from agentMET4FOF.streams import SineGenerator

class BufferSineGeneratorAgent(AgentMET4FOF):
Expand Down
112 changes: 112 additions & 0 deletions agentMET4FOF_tutorials/buffering/metrological_buffering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from agentMET4FOF.agents import AgentNetwork
from agentMET4FOF.metrological_agents import (
MetrologicalAgent,
MetrologicalAgentBuffer,
MetrologicalMonitorAgent,
)
from agentMET4FOF.metrological_streams import (
MetrologicalDataStreamMET4FOF,
MetrologicalSineGenerator,
)


class MetrologicalSineGeneratorAgent(MetrologicalAgent):
"""An agent streaming a sine signal
Takes samples from an instance of :py:class:`MetrologicalSineGenerator` and pushes
them sample by sample to connected agents via its output channel.
"""

# The datatype of the stream will be MetrologicalSineGenerator.
_stream: MetrologicalDataStreamMET4FOF

def init_parameters(
self,
signal: MetrologicalDataStreamMET4FOF = MetrologicalSineGenerator(),
**kwargs
):
"""Initialize the input data stream
Parameters
----------
signal : MetrologicalDataStreamMET4FOF
the underlying signal for the generator
"""
self._stream = signal
super().init_parameters()
self.set_output_data(channel="default", metadata=self._stream.metadata)

def init_buffer(self, buffer_size):
"""
A method to initialise the buffer. By overriding this method, user can provide
a custom buffer, instead of the regular AgentBuffer. This can be used,
for example, to provide a MetrologicalAgentBuffer in the metrological agents.
"""
buffer = MetrologicalAgentBuffer(buffer_size)
return buffer

def agent_loop(self):
"""Model the agent's behaviour
On state *Running* the agent will extract sample by sample the input
datastream's content and push it into its output buffer.
"""
if self.current_state == "Running":
metrological_sine_data = self._stream.next_sample()

# Equivalent to self.buffer_store but without logging.
self.buffer.store(agent_from=self.name, data=[metrological_sine_data])

# The actual dictionary is stored in self.buffer.buffer
self.log_info(str((self.buffer.buffer)))

# Check if buffer is filled up, then send out computed mean on the buffer
if self.buffer.buffer_filled(self.name):

# Access buffer content by accessing its key, it works like a
# dictionary. This is a TimeSeriesBuffer object.
time_series_buffer = self.buffer[self.name]

# np.ndarray of shape (self.buffer_size, 4)
buffer_content = time_series_buffer.pop(self.buffer_size)

# send out metrological data
self.set_output_data(channel="default", data=buffer_content)
super().agent_loop()

# clear buffer
self.buffer.clear(self.name)


def demonstrate_metrological_stream():

# start agent network server
agent_network = AgentNetwork(dashboard_modules=True, backend="mesa")

# Initialize signal generating class outside of agent framework.
signal = MetrologicalSineGenerator()

# Initialize metrologically enabled agent taking name from signal source metadata.
source_name = signal.metadata.metadata["device_id"]
source_agent = agent_network.add_agent(
name=source_name, agentType=MetrologicalSineGeneratorAgent, buffer_size=5
)
source_agent.init_parameters(signal)

# Initialize metrologically enabled plotting agent.
monitor_agent = agent_network.add_agent(
"MonitorAgent", agentType=MetrologicalMonitorAgent
)

# Bind agents.
source_agent.bind_output(monitor_agent)

# Set all agents states to "Running".
agent_network.set_running_state()

# Allow for shutting down the network after execution.
return agent_network


if __name__ == "__main__":
demonstrate_metrological_stream()
3 changes: 2 additions & 1 deletion agentMET4FOF_tutorials/buffering/moving_average.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def agent_loop(self):
if self.current_state == "Running":
sine_data = self._sine_stream.next_sample() # dictionary
sine_data["quantities"] = sine_data["quantities"] + np.random.rand(*sine_data["quantities"].shape)
self.send_output(sine_data["quantities"])
self.send_output(sine_data)

class RollingMeanAgent(AgentMET4FOF):
"""
Expand Down Expand Up @@ -65,6 +65,7 @@ def demonstrate_generator_agent_use():

# Initialize agents by adding them to the agent network.
gen_agent = agent_network.add_agent(agentType=NoisySineGeneratorAgent)
# the buffer size controls the window size of the moving average filter
fast_rolling_mean_agent = agent_network.add_agent(agentType=RollingMeanAgent, buffer_size=5)
slow_rolling_mean_agent = agent_network.add_agent(agentType=RollingMeanAgent, buffer_size=10)

Expand Down
4 changes: 2 additions & 2 deletions agentMET4FOF_tutorials/plotting/basic_memory_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def agent_loop(self):
sine_data = self._sine_stream.next_sample() # dictionary

if self.nested_output:
self.send_output({"Sensor1":sine_data["x"]*self.scaler,"Sensor2":sine_data["x"]*self.scaler+1.1})
self.send_output({"Sensor1":sine_data["quantities"]*self.scaler,"Sensor2":sine_data["quantities"]*self.scaler+1.1})
else:
self.send_output(sine_data["x"]*self.scaler)
self.send_output(sine_data["quantities"]*self.scaler)



Expand Down
19 changes: 19 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,25 @@
# writing this).


# This should make SciPy documentation available inside our docs.
intersphinx_mapping = {
"NumPy": ("https://numpy.org/doc/stable/", None),
"Pandas": ("http://pandas.pydata.org/pandas-docs/dev", None),
"SciPy": ("https://docs.scipy.org/doc/scipy/reference", None),
"PyDynamic": (
"https://pydynamic.readthedocs.io/en/latest/",
None,
),
"time-series-metadata": (
"https://time-series-metadata.readthedocs.io/en/latest/",
None,
),
"time-series-buffer": (
"https://time-series-buffer.readthedocs.io/en/latest/",
None,
),
}

nbsphinx_allow_errors = True

################################################################################
Expand Down
Loading

0 comments on commit e8c4a2c

Please sign in to comment.