diff --git a/agentMET4FOF/exceptions.py b/agentMET4FOF/exceptions.py new file mode 100644 index 00000000..2e274542 --- /dev/null +++ b/agentMET4FOF/exceptions.py @@ -0,0 +1,19 @@ +class SensorsNotLinearlyIndependentError(Exception): + """ + Custom exception to handle the case when sensor results are not linearly independent + """ + pass + + +class SystemMatrixNotReducibleError(Exception): + """ + Custom exception to handle the case when the system matrix *A* is not reducible + """ + pass + + +class ColumnNotZeroError(Exception): + """ + Custom exception to handle the case when a redundant column has not been reduced to zero + """ + pass \ No newline at end of file diff --git a/agentMET4FOF/metrological_agents.py b/agentMET4FOF/metrological_agents.py index d42a4a1b..c83599be 100644 --- a/agentMET4FOF/metrological_agents.py +++ b/agentMET4FOF/metrological_agents.py @@ -5,15 +5,19 @@ import plotly.graph_objs as go from time_series_buffer import TimeSeriesBuffer from time_series_metadata.scheme import MetaData +from itertools import combinations +from scipy.special import comb +from scipy.stats import chi2 -from agentMET4FOF.agents import AgentBuffer, AgentMET4FOF +from .agents import AgentBuffer, AgentMET4FOF from .metrological_streams import ( MetrologicalDataStreamMET4FOF, MetrologicalSineGenerator, ) +from .exceptions import * -class MetrologicalAgent(AgentMET4FOF): +class MetrologicalAgent(AgentMET4FOF): _input_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, Dict]]] """Input dictionary of all incoming data including metadata:: @@ -126,7 +130,6 @@ def init_parameters(self, *args, **kwargs): self.plots = {} self.custom_plot_parameters = {} - def on_received_message(self, message): """ Handles incoming data from 'default' and 'plot' channels. @@ -142,7 +145,8 @@ def on_received_message(self, message): if self.plot_filter != []: message['data'] = {key: message['data'][key] for key in self.plot_filter} message['metadata'] = {key: message['metadata'][key] for key in self.plot_filter} - self.buffer_store(agent_from=message["from"], data={"data": message["data"], "metadata": message["metadata"]}) + self.buffer_store(agent_from=message["from"], + data={"data": message["data"], "metadata": message["metadata"]}) elif message['channel'] == 'plot': self.update_plot_memory(message) return 0 @@ -219,6 +223,7 @@ class MetrologicalAgentBuffer(AgentBuffer): for metrological agents, namely :class:`TimeSeriesBuffer `. """ + def __init__(self, buffer_size: int = 1000): """Initialise a new agent buffer object @@ -284,10 +289,10 @@ def update( return self.buffer def _concatenate( - self, - iterable: TimeSeriesBuffer, - data: Union[np.ndarray, list, pd.DataFrame], - concat_axis: int = 0 + self, + iterable: TimeSeriesBuffer, + data: Union[np.ndarray, list, pd.DataFrame], + concat_axis: int = 0 ) -> TimeSeriesBuffer: """Concatenate the given ``TimeSeriesBuffer`` with ``data`` @@ -309,6 +314,7 @@ def _concatenate( iterable.add(data) return iterable + class MetrologicalGeneratorAgent(MetrologicalAgent): """An agent streaming a specified signal @@ -320,9 +326,9 @@ class MetrologicalGeneratorAgent(MetrologicalAgent): _stream: MetrologicalDataStreamMET4FOF def init_parameters( - self, - signal: MetrologicalDataStreamMET4FOF = MetrologicalSineGenerator(), - **kwargs + self, + signal: MetrologicalDataStreamMET4FOF = MetrologicalSineGenerator(), + **kwargs ): """Initialize the input data stream @@ -347,4 +353,833 @@ def agent_loop(self): """ if self.current_state == "Running": self.set_output_data(channel="default", data=self._stream.next_sample()) - super().agent_loop() \ No newline at end of file + super().agent_loop() + + +class RedundancyAgent(MetrologicalAgent): + """ + This is the main Redundancy Agent class. Main calculation types are :py:func:`lcs` and :py:func:`lcss`, as defined + in the module :mod:`redundancy1`. + """ + + def init_parameters( + self, + input_data_maxlen: int = 25, + output_data_maxlen: int = 25, + sensor_key_list: list = None, + n_pr: int = 1, + problim: float = .9, + calc_type: str = 'lcs' + ): + """ + Initialize the redundancy agent as an instance of the :py:mod:`MetrologicalAgent` class. + + + Parameters + ---------- + n_pr : int, optional + size of the batch of data that is handled at a time by the Redundancy Agent. Defaults to 1 + problim : float, optional + limit probability used for consistency evaluation. Defaults to .9 + calc_type : str, optional + calculation type: 'lcs' or 'lcss'. Defaults to 'lcs' + sensor_key_list : list of str + list containing the names of the sensors that should feed data to the Redundancy Agent. Defaults to None + + Parent class parameters + ---------- + input_data_maxlen: int + + output_data_maxlen: int + + """ + + if sensor_key_list is None: + sensor_key_list = [] + super().init_parameters(input_data_maxlen=25, output_data_maxlen=25) + self.metadata = MetaData( + device_id="RedAgent01", + time_name="time", + time_unit="s", + quantity_names="m", + quantity_units="kg", + misc="nothing") + + self.calc_type = calc_type + self.sensor_key_list = sensor_key_list + self.n_pr = n_pr + self.problim = problim + + self.set_output_data(channel="default", metadata=self.metadata) + + def init_lcss_parameters(self, fsam, f1, f2, ampl_ratio, phi1, phi2): + """ + Additional parameters used for this particular example in combination with the :py:func:`lcss` method. + It provides the prior knowledge needed to make the information contained in the data redundant. + This method sets up the vector **a** and matrix *A* for the system **y** = **a** + *A* * **x**. + + Parameters + ---------- + fsam : float + sampling frequency + f1 : float + first frequency of interest in signal + f2 : float + second frequency of interest in signal + ampl_ratio : float + ratio of the amplitudes of the two frequency components + phi1 : float + initial phase of first frequency component + phi2 : float + initial phase of second frequency component + """ + # set-up vector a_arr and matrix a_arr2d for redundancy method + id_mat = np.identity(self.n_pr) + id_fft = np.fft.fft(id_mat) + + n_pr_floor = self.n_pr // 2 + bfft = id_fft[:n_pr_floor, :].real / n_pr_floor + cfft = -id_fft[:n_pr_floor, :].imag / n_pr_floor + + c1 = 1 / np.cos(phi1) + c2 = 1 / np.sin(-phi1) + c3 = ampl_ratio / np.cos(phi2) + c4 = ampl_ratio / np.sin(-phi2) + + ind_freq1 = f1 * self.n_pr // fsam + ind_freq2 = f2 * self.n_pr // fsam + + a_row1 = c1 * bfft[ind_freq1, :].reshape((1, -1)) + a_row2 = c2 * cfft[ind_freq1, :].reshape((1, -1)) + a_row3 = c3 * bfft[ind_freq2, :].reshape((1, -1)) + a_row4 = c4 * cfft[ind_freq2, :].reshape((1, -1)) + + self.a_arr2d = np.concatenate((a_row1, a_row2, a_row3, a_row4), axis=0) + """a_arr : np.ndarray of float""" + self.a_arr = np.zeros(shape=(4, 1)) + """a_arr2d : np.ndarray of float""" + + def agent_loop(self): + """ + Model the agent's behaviour + On state *Running* the agent will extract sample by sample the input data + streams content and push it via invoking :py:func:`AgentMET4FOF.send_output`. + """ + if self.current_state == "Running": + # sometimes the buffer does not contain values for all sensors + # sensor_key_list = ["Sensor1", "Sensor2"] + key_list = [key for key in self.sensor_key_list if key in self.buffer.keys()] + n_sensors = len(key_list) + if n_sensors != len(self.sensor_key_list): # expected number of sensors + print('Not all sensors were present in the buffer. Not evaluating the data.') + return 0 + + for key in key_list: + if self.buffer[key].shape[0] < self.n_pr: + print('Buffer size is ', self.buffer[key].shape[0], ', which is less than ', self.n_pr, '.') + print('Not enough data for redundancy agent evaluation.') + return 0 + + buff = self.buffer.popleft(self.n_pr) # take n_pr entries out from the buffer + + t_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) + ut_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) + x_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) + ux_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) + i_sensor = 0 + # for key in buff.keys(): # arbitrary order + + for key in key_list: + data_arr = buff[key] + t_data_arr2d[:, i_sensor] = data_arr[:, 0] + ut_data_arr2d[:, i_sensor] = data_arr[:, 1] + x_data_arr2d[:, i_sensor] = data_arr[:, 2] + ux_data_arr2d[:, i_sensor] = data_arr[:, 3] + i_sensor = i_sensor + 1 + + if self.calc_type == "lcs": + data = np.full(shape=(self.n_pr, 4), fill_value=np.nan) + for i_pnt in range(self.n_pr): + y_arr = np.array(x_data_arr2d[i_pnt, :]) + y_arr = y_arr.reshape((n_sensors, 1)) + vy_arr2d = np.zeros(shape=(n_sensors, n_sensors)) + for i_sensor in range(n_sensors): + vy_arr2d[i_sensor, i_sensor] = np.square(ux_data_arr2d[i_pnt, i_sensor]) + + n_sols, ybest, uybest, chi2obs, indkeep = self.calc_lcs(y_arr, vy_arr2d, self.problim) + if n_sols == 1: # time stamp is value of first sensor + if isinstance(ybest, np.ndarray): + ybest = ybest[0] + data[i_pnt, :] = np.array([t_data_arr2d[i_pnt, 0], ut_data_arr2d[i_pnt, 0], ybest, uybest]) + else: # only return the first solution + data[i_pnt, :] = np.array([t_data_arr2d[i_pnt, 0], ut_data_arr2d[i_pnt, 0], ybest[0], uybest[0]]) + elif self.calc_type == "lcss": + # lcss applied to one data vector (required input) + # Sum the signals to get one signal + x_data_arr = np.sum(x_data_arr2d, axis=1) + x_data_arr = x_data_arr.reshape((len(x_data_arr), 1)) + ux2_data_arr = np.sum(np.square(ux_data_arr2d), axis=1) + vx_arr2d = np.zeros((self.n_pr, self.n_pr)) + for i_val in range(self.n_pr): + vx_arr2d[i_val, i_val] = ux2_data_arr[i_val] + + n_sols, ybest, uybest, chi2obs, indkeep = \ + self.calc_lcss(self.a_arr, self.a_arr2d, x_data_arr, vx_arr2d, self.problim) + print('calc lcss finished') + print('n_sols: ', n_sols) + print('ybest: ', ybest) + print('uybest: ', uybest) + if n_sols == 1: # time stamp is latest value + if isinstance(ybest, np.ndarray): + ybest = ybest[0] + data = np.array([t_data_arr2d[-1, 0], ut_data_arr2d[-1, 0], ybest, uybest]) + else: # only return the first solution + data = np.array([t_data_arr2d[-1, 0], ut_data_arr2d[-1, 0], ybest[0], uybest[0]]) + + # Send the data + if len(data.shape) == 1: + data = data.reshape((1, len(data))) + + self.set_output_data(channel="default", data=data) + super().agent_loop() + + def calc_consistent_estimates_no_corr(y_arr2d, uy_arr2d, prob_lim): + """ + Calculation of consistent estimate for n_sets of estimates y_ij (contained in + y_arr2d) of a quantity Y, where each set contains n_estims estimates. + The uncertainties are assumed to be independent and given in uy_arr2d. + The consistency test is using limit probability limit prob_lim. + For each set of estimates, the best estimate, uncertainty, + observed chi-2 value and a flag if the + provided estimates were consistent given the model are given as output. + + Parameters + ---------- + y_arr2d: np.ndarray of size (n_rows, n_estimates) + each row contains m=n_estimates independent estimates of a measurand + uy_arr2d: np.ndarray of size (n_rows, n_estimates) + each row contains the standard uncertainty u(y_ij) of y_ij = y_arr2d[i,j] + prob_lim: limit probability used in consistency test. Typically 0.95. + + Returns + ------- + isconsist_arr: bool array of shape (n_rows) + indicates for each row if the n_estimates are consistent or not + ybest_arr: np.ndarray of shape (n_rows) + contains the best estimate for each row of individual estimates + uybest_arr: np.ndarray of shape (n_rows) + contains the uncertainty associated with each best estimate for each row of *y_arr2d* + chi2obs_arr: observed chi-squared value for each row + + """ + + if len(y_arr2d.shape) > 1: + n_sets = y_arr2d.shape[0] + else: + n_sets = 1 + + n_estims = y_arr2d.shape[-1] # last dimension is number of estimates + chi2_lim = chi2.ppf(1 - prob_lim, n_estims - 1) + uy2inv_arr2d = 1 / np.power(uy_arr2d, 2) + uy2best_arr = 1 / np.sum(uy2inv_arr2d, -1) + uybest_arr = np.sqrt(uy2best_arr) + ybest_arr = np.sum(y_arr2d * uy2inv_arr2d, -1) * uy2best_arr + + if n_sets > 1: + ybest_arr = ybest_arr.reshape(n_sets, 1) # make a column vector of ybest_arr + + chi2obs_arr = np.sum(np.power((y_arr2d - np.broadcast_to(ybest_arr, (n_sets, n_estims))) / uy_arr2d, 2), -1) + isconsist_arr = (chi2obs_arr <= chi2_lim) + + return isconsist_arr, ybest_arr, uybest_arr, chi2obs_arr + + def print_output_single(self, isconsist, ybest, uybest, chi2obs): + """ + Function to print the output of a single row of the calculate_best_estimate function. + + Parameters + ---------- + isconsist: bool + Indicates if provided estimates were consistent + ybest: float + best estimate + uybest: float + uncertainty of best estimate + chi2obs: float + observed value of chi-squared + """ + print('\tThe observed chi-2 value is %3.3f.' % chi2obs) + + if isconsist: + print("\tThe provided estimates (input) were consistent.") + else: + print("\tThe provided estimates (input) were not consistent.") + + print(f"\tThe best estimate is {ybest:3.3f} with uncertainty {uybest:3.3f}.\n") + + def print_output_cbe(self, isconsist_arr, ybest_arr, uybest_arr, chi2obs_arr): + """ + Function to print the full output of calc_best_estimate. + + Parameters + ---------- + isconsist_arr: bool array of shape (n_rows) + indicates for each row if the n_estimates are consistent or not + ybest_arr: np.ndarray of shape (n_rows) + contains the best estimate for each row of individual estimates + uybest_arr: np.ndarray of shape (n_rows) + contains the uncertainty associated with each best estimate for each row of *y_arr2d* + chi2obs_arr: observed chi-squared value for each row + + Returns + ------- + + """ + if len(ybest_arr.shape) == 0: + self.print_output_single(isconsist_arr, ybest_arr, uybest_arr, chi2obs_arr) + else: + n_sets = ybest_arr.shape[0] + print(f'There are {n_sets:.0f} sets with estimates of the measurand.') + for i_set in range(n_sets): + print(f'The result of set {i_set:.0f} is:') + self.print_output_single(isconsist_arr.item(i_set), ybest_arr.item(i_set), uybest_arr.item(i_set), + chi2obs_arr.item(i_set)) + + def calc_best_estimate(self, y_arr, vy_arr2d, problim): + """Calculate the best estimate for a set of estimates with associated uncertainty matrix, + and determine if the set of estimates are consistent using a provided limit probability. + + Parameters + ---------- + y_arr: np.ndarray of shape (n) + vector of estimates of a measurand Y + vy_arr2d: np.ndarray of shape (n, n) + uncertainty matrix associated with y_arr + problim: float + probability limit used for assessing the consistency of the estimates. Typically, problim equals 0.95. + + Returns + ------- + isconsist: bool + indicator whether provided estimates are consistent in view of *problim* + ybest: float + best estimate of measurand + uybest: float + uncertainty associated with *ybest* + chi2obs: float + observed value of chi-squared, used for consistency evaluation + """ + + print(f'cbe y_arr = {y_arr}') + n_estims = len(y_arr) + + if n_estims == 1: + isconsist = True + ybest = y_arr[0] + uybest = np.sqrt(vy_arr2d[0, 0]) + chi2obs = 0.0 + else: + e_arr = np.ones(n_estims) + vyinve_arr = np.linalg.solve(vy_arr2d, e_arr) + uy2 = 1 / np.dot(e_arr, vyinve_arr) + uybest = np.sqrt(uy2) + ybest = np.dot(vyinve_arr, y_arr) * uy2 + yred_arr = y_arr - ybest + chi2obs = np.dot(yred_arr.transpose(), np.linalg.solve(vy_arr2d, yred_arr)) # check need for transpose + chi2lim = chi2.ppf(problim, n_estims - 1) + isconsist = (chi2obs <= chi2lim) + + return isconsist, ybest, uybest, chi2obs + + def calc_best_estimate(self, y_arr, vy_arr2d, problim): + """Calculate the best estimate for a set of estimates with associated uncertainty matrix, + and determine if the set of estimates are consistent using a provided limit probability. + + Parameters + ---------- + y_arr: np.ndarray of shape (n) + vector of estimates of a measurand Y + vy_arr2d: np.ndarray of shape (n, n) + uncertainty matrix associated with y_arr + problim: float + probability limit used for assessing the consistency of the estimates. Typically, problim equals 0.95. + + Returns + ------- + isconsist: bool + indicator whether provided estimates are consistent in view of *problim* + ybest: float + best estimate of measurand + uybest: float + uncertainty associated with *ybest* + chi2obs: float + observed value of chi-squared, used for consistency evaluation + """ + + print(f'cbe y_arr = {y_arr}') + n_estims = len(y_arr) + + if n_estims == 1: + isconsist = True + ybest = y_arr[0] + uybest = np.sqrt(vy_arr2d[0, 0]) + chi2obs = 0.0 + else: + e_arr = np.ones(n_estims) + vyinve_arr = np.linalg.solve(vy_arr2d, e_arr) + uy2 = 1 / np.dot(e_arr, vyinve_arr) + uybest = np.sqrt(uy2) + ybest = np.dot(vyinve_arr, y_arr) * uy2 + yred_arr = y_arr - ybest + chi2obs = np.dot(yred_arr.transpose(), np.linalg.solve(vy_arr2d, yred_arr)) # check need for transpose + chi2lim = chi2.ppf(problim, n_estims - 1) + isconsist = (chi2obs <= chi2lim) + + return isconsist, ybest, uybest, chi2obs + + def get_combination(self, val_arr, n_keep, indcomb): + subsets = combinations(val_arr, n_keep) + i_subset = -1 + + for subset in subsets: + i_subset += 1 + if i_subset == indcomb: + return np.array(list(subset)) + + def calc_lcs(self, y_arr, vy_arr2d, problim): + """ + Function to calculate the best estimate of a measurand based on individual estimates of the + measurand with associated uncertainty matrix. + + Parameters + ---------- + y_arr: np.ndarray of shape (n) + vector with estimates of the measurand + vy_arr2d: np.ndarray of shape (n, n) + uncertainty matrix of the vector y_arr + problim: float + limit probability used in the consistency evaluation. Typically 0.95. + """ + isconsist, ybest, uybest, chi2obs = self.calc_best_estimate(y_arr, vy_arr2d, problim) + n_estims = len(y_arr) + estim_arr = np.arange(n_estims) + n_remove = 0 + + if isconsist: # set the other return variables + n_sols = 1 + indkeep = estim_arr + + while not isconsist: + n_remove += 1 + subsets = combinations(estim_arr, n_estims - n_remove) + n_subsets = comb(n_estims, n_remove, exact=True) + isconsist_arr = np.full(n_subsets, np.nan) + ybest_arr = np.full(n_subsets, np.nan) + uybest_arr = np.full(n_subsets, np.nan) + chi2obs_arr = np.full(n_subsets, np.nan) + i_subset = -1 + + for subset in subsets: + i_subset += 1 + sublist = list(subset) + yred_arr = y_arr[sublist] + vyred_arr2d = vy_arr2d[np.ix_(sublist, sublist)] + isconsist_arr[i_subset], ybest_arr[i_subset], uybest_arr[i_subset], chi2obs_arr[i_subset] = \ + self.calc_best_estimate(yred_arr, vyred_arr2d, problim) + + # Find smallest chi2obs value amongst all subsets. If multiple possibilities exist, return them all + indmin = np.argmin(chi2obs_arr) + + if isconsist_arr[indmin]: + # consistent solution found (otherwise isconsist remains false and the while loop continues) + isconsist = True + chi2obs = chi2obs_arr[indmin] # minimum chi2obs value + indmin = np.where(chi2obs_arr == chi2obs)[0] # list with all indices with minimum chi2obs value + n_sols = len(indmin) + + if n_sols == 1: + ybest = ybest_arr[indmin[0]] + uybest = uybest_arr[indmin[0]] + indkeep = self.get_combination(estim_arr, n_estims - n_remove, indmin) # indices of kept estimates + else: # multiple solutions exist, the return types become arrays + ybest = np.full(n_sols, np.nan) + uybest = np.full(n_sols, np.nan) + indkeep = np.full((n_sols, n_estims - n_remove), np.nan) + + for i_sol in range(n_sols): + ybest[i_sol] = ybest_arr[indmin[i_sol]] + uybest[i_sol] = uybest_arr[indmin[i_sol]] + indkeep[i_sol] = self.get_combination(estim_arr, n_estims - n_remove, indmin[i_sol]) + + return n_sols, ybest, uybest, chi2obs, indkeep + + def on_received_message(self, message): + """ + Handles incoming data from 'default' channels. + Stores 'default' data into an internal buffer + + Parameters + ---------- + message : dict + Only acceptable channel value is 'default'. + """ + if message['channel'] == 'default': + self.buffer_store(agent_from=message["from"], data=message["data"]) + return 0 + + def print_output_lcs(self, n_sols, ybest, uybest, chi2obs, indkeep, y_arr): + """ + Method to print the output of the method :func:`calc_lcs`. + + Parameters + ---------- + n_sols: int + number of best solutions + ybest: float or np.ndarray of shape (n_sols) + best estimate or vector of best estimates + uybest: float or np.ndarray of shape (n_sols) + standard uncertainty of best estimate or vector with standard uncertainty of best estimates + chi2obs: float + observed chi-squared value of all best solutions + indkeep: np.ndarary of shape (n) or (n_sols, n) + indices of retained estimates of y_arr for the calculation of the best estimate ybest + y_arr: np.ndarray of shape (n) + individual estimates of measurand + """ + n_estims = len(y_arr) + n_keep = indkeep.shape[-1] # number of retained estimates in the best solution(s) + + if n_sols == 1: + print( + f'calc_lcs found a unique solution with chi2obs = {chi2obs:4.4f} using {n_keep:.0f} of the provided {n_estims:.0f} estimates.') + print(f'\ty = {ybest:4.4f}, u(y) = {uybest:4.4f}') + print(f'\tIndices and values of retained provided estimates:', end=' ') + + for ind in indkeep[:-1]: + indint = int(ind) + print(f'y[{indint:.0f}]= {y_arr[indint]:2.2f}', end=', ') + + indint = int(indkeep[-1]) + print(f'y[{indint:.0f}]= {y_arr[indint]:2.2f}.\n') + else: + print( + f'calc_lcs found {n_sols:.0f} equally good solutions with chi2obs = {chi2obs:4.4f} using {n_keep:.0f} of the provided {n_estims:.0f} estimates.') + + for i_sol in range(n_sols): + print(f'\tSolution {i_sol:.0f} is:') + print(f'\ty = {ybest[i_sol]:4.4f}, u(y) = {uybest[i_sol]:4.4f}') + print('\tIndices and values of retained provided estimates:', end=' ') + + for ind in indkeep[i_sol][:-1]: + indint = int(ind) + print('y[%d]= %2.2f' % (indint, y_arr[indint]), end=', ') + + indint = int(indkeep[i_sol][-1]) + print('y[%d]= %2.2f.\n' % (indint, y_arr[indint])) + + return + + # Function that returns the index of a row of A that can be written as a linear combination of the others. + # This row does not contribute any new information to the system. + def ind_reduce_a(self, a_arr2d, epszero): + if a_arr2d.shape[0] <= np.linalg.matrix_rank(a_arr2d): + raise SystemMatrixNotReducibleError('A cannot be reduced!') + # Remove one row from A that is a linear combination of the other rows. + # Find a solution of A' * b = 0. + u, s, vh = np.linalg.svd(np.transpose(a_arr2d)) + # singVals = diag(S)%; + b = vh[-1, :] + indrem = np.where(abs(b) > epszero)[0] # remove a row corresponding to a non-zero entry in b. + + if len(indrem) == 0: + raise ValueError('b is a zero vector!') + + indrem = indrem[-1] # return the last row that can be taken out + # print('ReduceA: Identified row %d to be removed from a and A.\n', indRem); + return indrem + + # Reduced the system if matrix Vx is not of full rank. + # This might be ambiguous, as constant sensor values or offsets have to be estimated and are not known. + def reduce_vx(self, x_arr, vx_arr2d, a_arr, a_arr2d, epszero): + if vx_arr2d.shape[0] <= np.linalg.matrix_rank(vx_arr2d, epszero): + print('Vx cannot be reduced any further!') + return + # Remove one sensor from Vx, A and x that is a linear combination of the other sensors. + # Find a solution of Vx * b = 0. This + u, s, vh = np.linalg.svd(vx_arr2d) + b = vh[-1, :] # bottom row of vh is orthogonal to Vx + + if abs(np.dot(b, x_arr)) > epszero: + raise SensorsNotLinearlyIndependentError( + 'Sensors in x should be linearly independent with b^T * x = 0, but this is not the case!') + + indrem = np.where(abs(b) > epszero)[0] # remove a sensor corresponding to a non-zero entry in b. + if len(indrem) == 0: + raise ValueError('b is the zero vector!') + + indrem = indrem[-1] # take out the last sensor + indsenskeep = np.concatenate(np.arange(indrem), np.arange(indrem, vx_arr2d.shape[0])) + vxred_arr2d = vx_arr2d[indsenskeep, indsenskeep] + xred_arr = x_arr(indsenskeep) + # Update A by removing the sensor and updating the system of equations + + ared_arr2d = a_arr2d - a_arr2d[:, indrem] / b[indrem] * np.transpose(b) + if max(abs(ared_arr2d[:, indrem])) > epszero: + print(ared_arr2d) + raise ColumnNotZeroError(f'Column {indrem} should be zero by now!') + + ared_arr2d = a_arr2d[:, indsenskeep] # remove the zero column from A + ared_arr = a_arr + np.dot(a_arr2d - ared_arr2d, + x_arr) # adapt vector a_arr such that the vector of estimates y = a + A*x remains the same + + return xred_arr, vxred_arr2d, ared_arr, ared_arr2d + + def calc_best_est_lin_sys(self, a_arr, a_arr2d, x_arr, vx_arr2d, problim): + """ + Function to calculate the best estimate of a linear system **y** = **a** + A * **x** + and determines if the inputs are consistent in view of *problim*. + + Parameters + ---------- + a_arr: np.ndarray of shape (n_estimates) + vector **a** of linear system **y** = **a** + A * **x** + a_arr2d: np.ndarray of shape (n_estimates, n_sensors) + matrix A of linear system **y** = **a** + A * **x** + x_arr: np.ndarray of shape (n_sensors) + vector with sensor values + vector **x** of linear system **y** = **a** + A * **x** + vx_arr2d: np.ndarray of shape (n_sensors, n_sensors) + uncertainty matrix associated with vector x_arr + problim: float + probability limit used for consistency evaluation. Typically 0.95. + + Returns + ------- + isconsist: bool + indicator whether provided estimates are consistent in view of *problim* + ybest: float + best estimate + uybest: float + standard uncertainty of best estimate + chi2obs: float + observed chi-squared value + """ + print('start calc_best_est_lin_sys') + epszero = 1e-10 # some small constant used for some checks + + # The main procedure only works when vy_arr2d has full rank. Therefore first a_arr, a_arr2d and vx_arr2d need to be + # reduced such that vy_arr2d will have full rank. + xred_arr = x_arr + vxred_arr2d = vx_arr2d + ared_arr = a_arr + ared_arr2d = a_arr2d + + # print('cbels: x_arr = ', x_arr) + # print('cbels: a_arr2d = ', a_arr2d) + + # Reduce the system if the covariance matrix vx_arr2d is rank deficient. + while np.linalg.matrix_rank(vxred_arr2d) < vxred_arr2d.shape[0]: + print('Reducing Vx. No of rows = ', vxred_arr2d.shape[0], ', rank = ', np.linalg.matrix_rank(vxred_arr2d)) + [xred_arr, vxred_arr2d, ared_arr, ared_arr2d] = self.reduce_vx(xred_arr, vxred_arr2d, ared_arr, ared_arr2d, + epszero) + + # Reduce the system if a_arr2d has more rows than its rank. + while ared_arr2d.shape[0] > np.linalg.matrix_rank(ared_arr2d): + print('Reducing A. No of rows = ', ared_arr2d.shape[0], ', rank = ', np.linalg.matrix_rank(ared_arr2d)) + print(f'ared_arr2d: {ared_arr2d}') + ind_rem = self.ind_reduce_a(ared_arr2d, epszero) + n_rows = ared_arr2d.shape[0] + indrowskeep = np.concatenate((np.arange(0, ind_rem), np.arange(ind_rem + 1, n_rows))) + ared_arr = ared_arr[indrowskeep] + ared_arr2d = ared_arr2d[indrowskeep,] + + # calculate y vector and Vy matrix + print('ared_arr2d: ', ared_arr2d) + print('ared_arr: ', ared_arr.shape) + print('ared_arr2d: ', ared_arr2d.shape) + print('xred_arr: ', xred_arr.shape) + y_arr = ared_arr + np.dot(ared_arr2d, xred_arr) + vy_arr2d = np.matmul(np.matmul(ared_arr2d, vxred_arr2d), np.transpose(ared_arr2d)) + + # try to calculate a consistent solution with these y and Vy. + isconsist, ybest, uybest, chi2obs = self.calc_best_estimate(y_arr, vy_arr2d, problim) + # print('y_arr = ', y_arr) + # print('chi2obs = ', chi2obs) + return isconsist, ybest, uybest, chi2obs + + # function to calculate lcss + def calc_lcss(self, a_arr, a_arr2d, x_arr, vx_arr2d, problim): + """ + Calculation of the largest consistent subset of sensor values and the implied best estimate. + + Parameters + ---------- + x_arr + vx_arr2d + a_arr + a_arr2d + problim + a_arr: np.ndarray of shape (n_estimates) + vector **a** of linear system **y** = **a** + A * **x** + a_arr2d: np.ndarray of shape (n_estimates, n_sensors) + matrix A of linear system **y** = **a** + A * **x** + x_arr: np.ndarray of shape (n_sensors) + vector with sensor values + vector **x** of linear system **y** = **a** + A * **x** + vx_arr2d: np.ndarray of shape (n_sensors, n_sensors) + uncertainty matrix associated with vector x_arr + problim: float + probability limit used for consistency evaluation. Typically 0.95. + + Returns + ------- + isconsist: bool + indicator whether provided estimates are consistent in view of *problim* + ybest: float + best estimate + uybest: float + standard uncertainty of best estimate + chi2obs: float + observed chi-squared value + Returns + ------- + + """ + print('start calc_lcss') + epszero = 1e-7 # epsilon for rank check + eps_chi2 = 1e-7 # epsilon for chi2 equivalence check + + isconsist, ybest, uybest, chi2obs = self.calc_best_est_lin_sys(a_arr, a_arr2d, x_arr, vx_arr2d, problim) + n_sens = len(x_arr) + sens_arr = np.arange(n_sens) + n_remove = 0 + if isconsist: # set the other return variables + n_sols = 1 + indkeep = sens_arr + + # no consistent solution, remove sensors 1 by 1, 2 by 2, etc. + while not isconsist: + n_remove += 1 + subsets = combinations(sens_arr, n_sens - n_remove) + n_subsets = comb(n_sens, n_remove, exact=True) + isconsist_arr = np.full(n_subsets, np.nan) + ybest_arr = np.full(n_subsets, np.nan) + uybest_arr = np.full(n_subsets, np.nan) + chi2obs_arr = np.full(n_subsets, np.nan) + i_subset = -1 + for subset in subsets: + i_subset += 1 + sublistsenskeep = list(subset) + xred_arr = x_arr[sublistsenskeep] + vxred_arr2d = vx_arr2d[:, sublistsenskeep] + vxred_arr2d = vxred_arr2d[sublistsenskeep, :] + boolremove_arr = np.full(n_sens, True) + boolremove_arr[sublistsenskeep] = False + if np.linalg.matrix_rank( + np.concatenate((a_arr2d[:, boolremove_arr], np.ones((a_arr2d.shape[0], 1))), axis=1), + epszero) == \ + np.linalg.matrix_rank(a_arr2d[:, boolremove_arr], epszero): + # there is no vector c such that c' * ones = 1 and c' * ai = 0 at the same time. + # Thus this combination of sensors cannot be removed as a group from the matrix A. + isconsist_arr[i_subset] = False + continue # continue with next subset + + ared_arr2d = np.concatenate( + (a_arr2d[:, boolremove_arr], a_arr2d[:, sublistsenskeep]), + axis=1) # move the columns corresponding to sensors to be taken out to the front + q, r = np.linalg.qr(ared_arr2d) + q1 = q[:, + n_remove:] # these (n_sens-n_remove) columns are orthogonal to the first n_remove columns of ared_arr2d + s = np.sum(q1, axis=0) # column sums might be zero which is a problem for normalization to unit sum + indzero = np.where(np.abs(s) < epszero)[0] + if len(indzero) > 0: + indnonzero = np.full(n_sens - n_remove, True) # all True array + indnonzero[indzero] = False # positions that are zero are false + indnonzero = np.where(indnonzero) # conversion to indices instead of boolean array + if len(indnonzero) == 0: + print("ERROR: All columns have zero sum!") + b = q1[:, indnonzero[0]] # b is column vector with no zero column sum + for i_zero in range(len(indzero)): + q1[:, indzero[i_zero]] = q1[:, indzero[i_zero]] + b # add b to prevent zero column sum + q1 = q1 / np.sum(q1, + axis=0) # unit column sums, in order not to introduce a bias in the estimate of the measurand + + ared_arr2d = np.matmul(np.transpose(q1), ared_arr2d) + ared_arr = np.matmul(np.transpose(q1), a_arr) + # The columns of matrix ared_arr2d are still in the wrong order compared to the order of the sensors + ared2_arr2d = np.full_like(ared_arr2d, np.nan) + ared2_arr2d[:, boolremove_arr] = ared_arr2d[:, :n_remove] # columns 0, 1, ..., (n_remove-1) + ared2_arr2d[:, np.invert(boolremove_arr)] = ared_arr2d[:, + n_remove:] # columns n_remove, ..., (n_sens-1) + ared_arr2d = ared2_arr2d + + if np.linalg.norm(ared_arr2d[:, boolremove_arr]) > epszero: + raise ColumnNotZeroError(f'These columns of A should be zero by now!') + + ared_arr2d = ared_arr2d[:, + sublistsenskeep] # np.invert(boolremove_arr)] # reduce the matrix A by removing the appropriate columns of A, which are zero anyway. + + isconsist_arr[i_subset], ybest_arr[i_subset], uybest_arr[i_subset], chi2obs_arr[i_subset] = \ + self.calc_best_est_lin_sys(ared_arr, ared_arr2d, xred_arr, vxred_arr2d, problim) + + # After analyzing all subset, find the smallest chi2obs value amongst all subsets. + # If multiple possibilities exist, return them all + indmin = np.argmin(chi2obs_arr) + if isconsist_arr[indmin]: + # consistent solution found (otherwise isconsist remains false and the while loop continues) + isconsist = True + chi2obs = chi2obs_arr[indmin] # minimum chi2obs value + indmin = np.where(np.abs(chi2obs_arr - chi2obs) < eps_chi2)[ + 0] # list with all indices with minimum chi2obs value + n_sols = len(indmin) + if n_sols == 1: + ybest = ybest_arr[indmin[0]] + uybest = uybest_arr[indmin[0]] + indkeep = self.get_combination(sens_arr, n_sens - n_remove, indmin) # indices of kept estimates + else: # multiple solutions exist, the return types become arrays + ybest = np.full(n_sols, np.nan) + uybest = np.full(n_sols, np.nan) + indkeep = np.full((n_sols, n_sens - n_remove), np.nan) + for i_sol in range(n_sols): + ybest[i_sol] = ybest_arr[indmin[i_sol]] + uybest[i_sol] = uybest_arr[indmin[i_sol]] + indkeep[i_sol] = self.get_combination(sens_arr, n_sens - n_remove, indmin[i_sol]) + return n_sols, ybest, uybest, chi2obs, indkeep + + def print_input_lcss(self, x_arr, vx_arr2d, a_arr, a_arr2d, problim): + print(f"""INPUT of lcss function call: + Vector a of linear system: a_arr = {a_arr} + Matrix A of linear system: a_arr2d = {a_arr2d} + Vector x with sensor values: x_arr = {x_arr} + Covariance matrix Vx of sensor values: vx_arr2d = {vx_arr2d} + Limit probability for chi-squared test: p = {problim}""") + + def print_output_lcss(self, n_sols, ybest, uybest, chi2obs, indkeep, x_arr, a_arr2d): + n_sensors = len(x_arr) + n_eq = a_arr2d.shape[0] + n_keep = indkeep.shape[-1] # number of retained estimates in the best solution(s) + print(f'Provided number of sensors (or sensor values) was {n_sensors} and number of equations was {n_eq}.') + if n_sols == 1: + print('calc_lcss found a unique solution with chi2obs = %4.4f using %d of the provided %d sensor values.' + % (chi2obs, n_keep, n_sensors)) + print('\ty = %4.4f, u(y) = %4.4f' % (ybest, uybest)) + print('\tIndices and values of retained provided sensor values:', end=' ') + for ind in indkeep[:-1]: + indint = int(ind) + print('x[%d]= %2.2f' % (indint, x_arr[indint]), end=', ') + indint = int(indkeep[-1]) + print('x[%d]= %2.2f.\n' % (indint, x_arr[indint])) + else: + print( + 'calc_lcss found %d equally good solutions with chi2obs = %4.4f using %d of the provided %d sensor values.' + % (n_sols, chi2obs, n_keep, n_eq)) + for i_sol in range(n_sols): + print('\tSolution %d is:' % i_sol) + print('\ty = %4.4f, u(y) = %4.4f' % (ybest[i_sol], uybest[i_sol])) + print('\tIndices and values of retained provided sensor values:', end=' ') + for ind in indkeep[i_sol][:-1]: + indint = int(ind) + print('x[%d]= %2.2f' % (indint, x_arr[indint]), end=', ') + indint = int(indkeep[i_sol][-1]) + print('x[%d]= %2.2f.\n' % (indint, x_arr[indint])) + return diff --git a/agentMET4FOF_redundancy/__init__.py b/agentMET4FOF_redundancy/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/agentMET4FOF_redundancy/redundancy1.py b/agentMET4FOF_redundancy/redundancy1.py deleted file mode 100644 index 2503f478..00000000 --- a/agentMET4FOF_redundancy/redundancy1.py +++ /dev/null @@ -1,640 +0,0 @@ -# -*- coding: utf-8 -*- -""" -The module :mod:`redundancy1` implements methods for analysing redundant estimates provided by -redundant measurement data from a sensor network. - -The main functions included in the file *redundancy1.py* are: - -#. :func:`calc_consistent_estimates_no_corr`: Calculation of *n_rows* of best estimates for *n_rows* of sets of - independent estimates with associated standard uncertainty. -#. :func:`calc_best_estimate`: Calculation of the best estimate for a given set of estimates with associated uncertainty - matrix. -#. :func:`calc_lcs`: Calculation of the largest subset of consistent estimates of a measurand. -#. :func:`calc_lcss`: Calculation of the largest subset of sensor values that yield consistent estimates of a measurand - linked to the sensor values by a linear system of equations. - -The scientific publication giving more information on this topic is: - - G. Kok and P. Harris, "Uncertainty Evaluation for Metrologically Redundant Industrial Sensor Networks," - 2020 IEEE International Workshop on Metrology for Industry 4.0 & IoT, Roma, Italy, 2020, - pp. 84-88, doi: `10.1109/MetroInd4.0IoT48571.2020.9138297 - `_. -""" - -import itertools - -import numpy as np -from scipy.special import comb -from scipy.stats import chi2 - - -class SensorsNotLinearlyIndependentError(Exception): - """ - Custom exception to handle the case when sensor results are not linearly independent - """ - pass - - -class SystemMatrixNotReducibleError(Exception): - """ - Custom exception to handle the case when the system matrix *A* is not reducible - """ - pass - - -class ColumnNotZeroError(Exception): - """ - Custom exception to handle the case when a redundant column has not been reduced to zero - """ - pass - - -def calc_consistent_estimates_no_corr(y_arr2d, uy_arr2d, prob_lim): - """ - Calculation of consistent estimate for n_sets of estimates y_ij (contained in - y_arr2d) of a quantity Y, where each set contains n_estims estimates. - The uncertainties are assumed to be independent and given in uy_arr2d. - The consistency test is using limit probability limit prob_lim. - For each set of estimates, the best estimate, uncertainty, - observed chi-2 value and a flag if the - provided estimates were consistent given the model are given as output. - - Parameters - ---------- - y_arr2d: np.ndarray of size (n_rows, n_estimates) - each row contains m=n_estimates independent estimates of a measurand - uy_arr2d: np.ndarray of size (n_rows, n_estimates) - each row contains the standard uncertainty u(y_ij) of y_ij = y_arr2d[i,j] - prob_lim: limit probability used in consistency test. Typically 0.95. - - Returns - ------- - isconsist_arr: bool array of shape (n_rows) - indicates for each row if the n_estimates are consistent or not - ybest_arr: np.ndarray of shape (n_rows) - contains the best estimate for each row of individual estimates - uybest_arr: np.ndarray of shape (n_rows) - contains the uncertainty associated with each best estimate for each row of *y_arr2d* - chi2obs_arr: observed chi-squared value for each row - - """ - - if len(y_arr2d.shape) > 1: - n_sets = y_arr2d.shape[0] - else: - n_sets = 1 - - n_estims = y_arr2d.shape[-1] # last dimension is number of estimates - chi2_lim = chi2.ppf(1 - prob_lim, n_estims - 1) - uy2inv_arr2d = 1 / np.power(uy_arr2d, 2) - uy2best_arr = 1 / np.sum(uy2inv_arr2d, -1) - uybest_arr = np.sqrt(uy2best_arr) - ybest_arr = np.sum(y_arr2d * uy2inv_arr2d, -1) * uy2best_arr - - if n_sets > 1: - ybest_arr = ybest_arr.reshape(n_sets, 1) # make a column vector of ybest_arr - - chi2obs_arr = np.sum(np.power((y_arr2d - np.broadcast_to(ybest_arr, (n_sets, n_estims))) / uy_arr2d, 2), -1) - isconsist_arr = (chi2obs_arr <= chi2_lim) - - return isconsist_arr, ybest_arr, uybest_arr, chi2obs_arr - - -def print_output_single(isconsist, ybest, uybest, chi2obs): - """ - Function to print the output of a single row of the calculate_best_estimate function. - - Parameters - ---------- - isconsist: bool - Indicates if provided estimates were consistent - ybest: float - best estimate - uybest: float - uncertainty of best estimate - chi2obs: float - observed value of chi-squared - """ - print('\tThe observed chi-2 value is %3.3f.' % chi2obs) - - if isconsist: - print("\tThe provided estimates (input) were consistent.") - else: - print("\tThe provided estimates (input) were not consistent.") - - print(f"\tThe best estimate is {ybest:3.3f} with uncertainty {uybest:3.3f}.\n") - - -def print_output_cbe(isconsist_arr, ybest_arr, uybest_arr, chi2obs_arr): - """ - Function to print the full output of calc_best_estimate. - - Parameters - ---------- - isconsist_arr: bool array of shape (n_rows) - indicates for each row if the n_estimates are consistent or not - ybest_arr: np.ndarray of shape (n_rows) - contains the best estimate for each row of individual estimates - uybest_arr: np.ndarray of shape (n_rows) - contains the uncertainty associated with each best estimate for each row of *y_arr2d* - chi2obs_arr: observed chi-squared value for each row - - Returns - ------- - - """ - if len(ybest_arr.shape) == 0: - print_output_single(isconsist_arr, ybest_arr, uybest_arr, chi2obs_arr) - else: - n_sets = ybest_arr.shape[0] - print(f'There are {n_sets:.0f} sets with estimates of the measurand.') - for i_set in range(n_sets): - print(f'The result of set {i_set:.0f} is:') - print_output_single(isconsist_arr.item(i_set), ybest_arr.item(i_set), uybest_arr.item(i_set), - chi2obs_arr.item(i_set)) - - -def calc_best_estimate(y_arr, vy_arr2d, problim): - """Calculate the best estimate for a set of estimates with associated uncertainty matrix, - and determine if the set of estimates are consistent using a provided limit probability. - - Parameters - ---------- - y_arr: np.ndarray of shape (n) - vector of estimates of a measurand Y - vy_arr2d: np.ndarray of shape (n, n) - uncertainty matrix associated with y_arr - problim: float - probability limit used for assessing the consistency of the estimates. Typically, problim equals 0.95. - - Returns - ------- - isconsist: bool - indicator whether provided estimates are consistent in view of *problim* - ybest: float - best estimate of measurand - uybest: float - uncertainty associated with *ybest* - chi2obs: float - observed value of chi-squared, used for consistency evaluation - """ - - print(f'cbe y_arr = {y_arr}') - n_estims = len(y_arr) - - if n_estims == 1: - isconsist = True - ybest = y_arr[0] - uybest = np.sqrt(vy_arr2d[0, 0]) - chi2obs = 0.0 - else: - e_arr = np.ones(n_estims) - vyinve_arr = np.linalg.solve(vy_arr2d, e_arr) - uy2 = 1 / np.dot(e_arr, vyinve_arr) - uybest = np.sqrt(uy2) - ybest = np.dot(vyinve_arr, y_arr) * uy2 - yred_arr = y_arr - ybest - chi2obs = np.dot(yred_arr.transpose(), np.linalg.solve(vy_arr2d, yred_arr)) # check need for transpose - chi2lim = chi2.ppf(problim, n_estims - 1) - isconsist = (chi2obs <= chi2lim) - - return isconsist, ybest, uybest, chi2obs - - -# function that returns a list with the values corresponding to combination indcomb -def get_combination(val_arr, n_keep, indcomb): - subsets = itertools.combinations(val_arr, n_keep) - i_subset = -1 - - for subset in subsets: - i_subset += 1 - if i_subset == indcomb: - return np.array(list(subset)) - - return -1 # error, index indcomb is probably out of range or not an integer - - -def calc_lcs(y_arr, vy_arr2d, problim): - """ - Function to calculate the best estimate of a measurand based on individual estimates of the - measurand with associated uncertainty matrix. - - Parameters - ---------- - y_arr: np.ndarray of shape (n) - vector with estimates of the measurand - vy_arr2d: np.ndarray of shape (n, n) - uncertainty matrix of the vector y_arr - problim: float - limit probability used in the consistency evaluation. Typically 0.95. - """ - isconsist, ybest, uybest, chi2obs = calc_best_estimate(y_arr, vy_arr2d, problim) - n_estims = len(y_arr) - estim_arr = np.arange(n_estims) - n_remove = 0 - - if isconsist: # set the other return variables - n_sols = 1 - indkeep = estim_arr - - while not isconsist: - n_remove += 1 - subsets = itertools.combinations(estim_arr, n_estims - n_remove) - n_subsets = comb(n_estims, n_remove, exact=True) - isconsist_arr = np.full(n_subsets, np.nan) - ybest_arr = np.full(n_subsets, np.nan) - uybest_arr = np.full(n_subsets, np.nan) - chi2obs_arr = np.full(n_subsets, np.nan) - i_subset = -1 - - for subset in subsets: - i_subset += 1 - sublist = list(subset) - yred_arr = y_arr[sublist] - vyred_arr2d = vy_arr2d[np.ix_(sublist, sublist)] - isconsist_arr[i_subset], ybest_arr[i_subset], uybest_arr[i_subset], chi2obs_arr[i_subset] = \ - calc_best_estimate(yred_arr, vyred_arr2d, problim) - - # Find smallest chi2obs value amongst all subsets. If multiple possibilities exist, return them all - indmin = np.argmin(chi2obs_arr) - - if isconsist_arr[indmin]: - # consistent solution found (otherwise isconsist remains false and the while loop continues) - isconsist = True - chi2obs = chi2obs_arr[indmin] # minimum chi2obs value - indmin = np.where(chi2obs_arr == chi2obs)[0] # list with all indices with minimum chi2obs value - n_sols = len(indmin) - - if n_sols == 1: - ybest = ybest_arr[indmin[0]] - uybest = uybest_arr[indmin[0]] - indkeep = get_combination(estim_arr, n_estims - n_remove, indmin) # indices of kept estimates - else: # multiple solutions exist, the return types become arrays - ybest = np.full(n_sols, np.nan) - uybest = np.full(n_sols, np.nan) - indkeep = np.full((n_sols, n_estims - n_remove), np.nan) - - for i_sol in range(n_sols): - ybest[i_sol] = ybest_arr[indmin[i_sol]] - uybest[i_sol] = uybest_arr[indmin[i_sol]] - indkeep[i_sol] = get_combination(estim_arr, n_estims - n_remove, indmin[i_sol]) - - return n_sols, ybest, uybest, chi2obs, indkeep - - -def print_output_lcs(n_sols, ybest, uybest, chi2obs, indkeep, y_arr): - """ - Method to print the output of the method :func:`calc_lcs`. - - Parameters - ---------- - n_sols: int - number of best solutions - ybest: float or np.ndarray of shape (n_sols) - best estimate or vector of best estimates - uybest: float or np.ndarray of shape (n_sols) - standard uncertainty of best estimate or vector with standard uncertainty of best estimates - chi2obs: float - observed chi-squared value of all best solutions - indkeep: np.ndarary of shape (n) or (n_sols, n) - indices of retained estimates of y_arr for the calculation of the best estimate ybest - y_arr: np.ndarray of shape (n) - individual estimates of measurand - """ - n_estims = len(y_arr) - n_keep = indkeep.shape[-1] # number of retained estimates in the best solution(s) - - if n_sols == 1: - print( - f'calc_lcs found a unique solution with chi2obs = {chi2obs:4.4f} using {n_keep:.0f} of the provided {n_estims:.0f} estimates.') - print(f'\ty = {ybest:4.4f}, u(y) = {uybest:4.4f}') - print(f'\tIndices and values of retained provided estimates:', end=' ') - - for ind in indkeep[:-1]: - indint = int(ind) - print(f'y[{indint:.0f}]= {y_arr[indint]:2.2f}', end=', ') - - indint = int(indkeep[-1]) - print(f'y[{indint:.0f}]= {y_arr[indint]:2.2f}.\n') - else: - print( - f'calc_lcs found {n_sols:.0f} equally good solutions with chi2obs = {chi2obs:4.4f} using {n_keep:.0f} of the provided {n_estims:.0f} estimates.') - - for i_sol in range(n_sols): - print(f'\tSolution {i_sol:.0f} is:') - print(f'\ty = {ybest[i_sol]:4.4f}, u(y) = {uybest[i_sol]:4.4f}') - print('\tIndices and values of retained provided estimates:', end=' ') - - for ind in indkeep[i_sol][:-1]: - indint = int(ind) - print('y[%d]= %2.2f' % (indint, y_arr[indint]), end=', ') - - indint = int(indkeep[i_sol][-1]) - print('y[%d]= %2.2f.\n' % (indint, y_arr[indint])) - - return - - -# Function that returns the index of a row of A that can be written as a linear combination of the others. -# This row does not contribute any new information to the system. -def ind_reduce_a(a_arr2d, epszero): - if a_arr2d.shape[0] <= np.linalg.matrix_rank(a_arr2d): - raise SystemMatrixNotReducibleError('A cannot be reduced!') - # Remove one row from A that is a linear combination of the other rows. - # Find a solution of A' * b = 0. - u, s, vh = np.linalg.svd(np.transpose(a_arr2d)) - # singVals = diag(S)%; - b = vh[-1, :] - indrem = np.where(abs(b) > epszero)[0] # remove a row corresponding to a non-zero entry in b. - - if len(indrem) == 0: - raise ValueError('b is a zero vector!') - - indrem = indrem[-1] # return the last row that can be taken out - # print('ReduceA: Identified row %d to be removed from a and A.\n', indRem); - return indrem - - -# Reduced the system if matrix Vx is not of full rank. -# This might be ambiguous, as constant sensor values or offsets have to be estimated and are not known. -def reduce_vx(x_arr, vx_arr2d, a_arr, a_arr2d, epszero): - if vx_arr2d.shape[0] <= np.linalg.matrix_rank(vx_arr2d, epszero): - print('Vx cannot be reduced any further!') - return - # Remove one sensor from Vx, A and x that is a linear combination of the other sensors. - # Find a solution of Vx * b = 0. This - u, s, vh = np.linalg.svd(vx_arr2d) - b = vh[-1, :] # bottom row of vh is orthogonal to Vx - - if abs(np.dot(b, x_arr)) > epszero: - raise SensorsNotLinearlyIndependentError( - 'Sensors in x should be linearly independent with b^T * x = 0, but this is not the case!') - - indrem = np.where(abs(b) > epszero)[0] # remove a sensor corresponding to a non-zero entry in b. - if len(indrem) == 0: - raise ValueError('b is the zero vector!') - - indrem = indrem[-1] # take out the last sensor - indsenskeep = np.concatenate(np.arange(indrem), np.arange(indrem, vx_arr2d.shape[0])) - vxred_arr2d = vx_arr2d[indsenskeep, indsenskeep] - xred_arr = x_arr(indsenskeep) - # Update A by removing the sensor and updating the system of equations - - ared_arr2d = a_arr2d - a_arr2d[:, indrem] / b[indrem] * np.transpose(b) - if max(abs(ared_arr2d[:, indrem])) > epszero: - print(ared_arr2d) - raise ColumnNotZeroError(f'Column {indrem} should be zero by now!') - - ared_arr2d = a_arr2d[:, indsenskeep] # remove the zero column from A - ared_arr = a_arr + np.dot(a_arr2d - ared_arr2d, - x_arr) # adapt vector a_arr such that the vector of estimates y = a + A*x remains the same - - return xred_arr, vxred_arr2d, ared_arr, ared_arr2d - - -def calc_best_est_lin_sys(a_arr, a_arr2d, x_arr, vx_arr2d, problim): - """ - Function to calculate the best estimate of a linear system **y** = **a** + A * **x** - and determines if the inputs are consistent in view of *problim*. - - Parameters - ---------- - a_arr: np.ndarray of shape (n_estimates) - vector **a** of linear system **y** = **a** + A * **x** - a_arr2d: np.ndarray of shape (n_estimates, n_sensors) - matrix A of linear system **y** = **a** + A * **x** - x_arr: np.ndarray of shape (n_sensors) - vector with sensor values - vector **x** of linear system **y** = **a** + A * **x** - vx_arr2d: np.ndarray of shape (n_sensors, n_sensors) - uncertainty matrix associated with vector x_arr - problim: float - probability limit used for consistency evaluation. Typically 0.95. - - Returns - ------- - isconsist: bool - indicator whether provided estimates are consistent in view of *problim* - ybest: float - best estimate - uybest: float - standard uncertainty of best estimate - chi2obs: float - observed chi-squared value - """ - print('start calc_best_est_lin_sys') - epszero = 1e-10 # some small constant used for some checks - - # The main procedure only works when vy_arr2d has full rank. Therefore first a_arr, a_arr2d and vx_arr2d need to be - # reduced such that vy_arr2d will have full rank. - xred_arr = x_arr - vxred_arr2d = vx_arr2d - ared_arr = a_arr - ared_arr2d = a_arr2d - - # print('cbels: x_arr = ', x_arr) - # print('cbels: a_arr2d = ', a_arr2d) - - # Reduce the system if the covariance matrix vx_arr2d is rank deficient. - while np.linalg.matrix_rank(vxred_arr2d) < vxred_arr2d.shape[0]: - print('Reducing Vx. No of rows = ', vxred_arr2d.shape[0], ', rank = ', np.linalg.matrix_rank(vxred_arr2d)) - [xred_arr, vxred_arr2d, ared_arr, ared_arr2d] = reduce_vx(xred_arr, vxred_arr2d, ared_arr, ared_arr2d, epszero) - - # Reduce the system if a_arr2d has more rows than its rank. - while ared_arr2d.shape[0] > np.linalg.matrix_rank(ared_arr2d): - print('Reducing A. No of rows = ', ared_arr2d.shape[0], ', rank = ', np.linalg.matrix_rank(ared_arr2d)) - print(f'ared_arr2d: {ared_arr2d}') - ind_rem = ind_reduce_a(ared_arr2d, epszero) - n_rows = ared_arr2d.shape[0] - indrowskeep = np.concatenate((np.arange(0, ind_rem), np.arange(ind_rem + 1, n_rows))) - ared_arr = ared_arr[indrowskeep] - ared_arr2d = ared_arr2d[indrowskeep, ] - - # calculate y vector and Vy matrix - print('ared_arr2d: ', ared_arr2d) - print('ared_arr: ', ared_arr.shape) - print('ared_arr2d: ', ared_arr2d.shape) - print('xred_arr: ', xred_arr.shape) - y_arr = ared_arr + np.dot(ared_arr2d, xred_arr) - vy_arr2d = np.matmul(np.matmul(ared_arr2d, vxred_arr2d), np.transpose(ared_arr2d)) - - # try to calculate a consistent solution with these y and Vy. - isconsist, ybest, uybest, chi2obs = calc_best_estimate(y_arr, vy_arr2d, problim) - # print('y_arr = ', y_arr) - # print('chi2obs = ', chi2obs) - return isconsist, ybest, uybest, chi2obs - - -# function to calculate lcss -def calc_lcss(a_arr, a_arr2d, x_arr, vx_arr2d, problim): - """ - Calculation of the largest consistent subset of sensor values and the implied best estimate. - - Parameters - ---------- - x_arr - vx_arr2d - a_arr - a_arr2d - problim - a_arr: np.ndarray of shape (n_estimates) - vector **a** of linear system **y** = **a** + A * **x** - a_arr2d: np.ndarray of shape (n_estimates, n_sensors) - matrix A of linear system **y** = **a** + A * **x** - x_arr: np.ndarray of shape (n_sensors) - vector with sensor values - vector **x** of linear system **y** = **a** + A * **x** - vx_arr2d: np.ndarray of shape (n_sensors, n_sensors) - uncertainty matrix associated with vector x_arr - problim: float - probability limit used for consistency evaluation. Typically 0.95. - - Returns - ------- - isconsist: bool - indicator whether provided estimates are consistent in view of *problim* - ybest: float - best estimate - uybest: float - standard uncertainty of best estimate - chi2obs: float - observed chi-squared value - Returns - ------- - - """ - print('start calc_lcss') - epszero = 1e-7 # epsilon for rank check - eps_chi2 = 1e-7 # epsilon for chi2 equivalence check - - isconsist, ybest, uybest, chi2obs = calc_best_est_lin_sys(a_arr, a_arr2d, x_arr, vx_arr2d, problim) - n_sens = len(x_arr) - sens_arr = np.arange(n_sens) - n_remove = 0 - if isconsist: # set the other return variables - n_sols = 1 - indkeep = sens_arr - - # no consistent solution, remove sensors 1 by 1, 2 by 2, etc. - while not isconsist: - n_remove += 1 - subsets = itertools.combinations(sens_arr, n_sens - n_remove) - n_subsets = comb(n_sens, n_remove, exact=True) - isconsist_arr = np.full(n_subsets, np.nan) - ybest_arr = np.full(n_subsets, np.nan) - uybest_arr = np.full(n_subsets, np.nan) - chi2obs_arr = np.full(n_subsets, np.nan) - i_subset = -1 - for subset in subsets: - i_subset += 1 - sublistsenskeep = list(subset) - xred_arr = x_arr[sublistsenskeep] - vxred_arr2d = vx_arr2d[:, sublistsenskeep] - vxred_arr2d = vxred_arr2d[sublistsenskeep, :] - boolremove_arr = np.full(n_sens, True) - boolremove_arr[sublistsenskeep] = False - if np.linalg.matrix_rank( - np.concatenate((a_arr2d[:, boolremove_arr], np.ones((a_arr2d.shape[0], 1))), axis=1), epszero) == \ - np.linalg.matrix_rank(a_arr2d[:, boolremove_arr], epszero): - # there is no vector c such that c' * ones = 1 and c' * ai = 0 at the same time. - # Thus this combination of sensors cannot be removed as a group from the matrix A. - isconsist_arr[i_subset] = False - continue # continue with next subset - - ared_arr2d = np.concatenate( - (a_arr2d[:, boolremove_arr], a_arr2d[:, sublistsenskeep]), - axis=1) # move the columns corresponding to sensors to be taken out to the front - q, r = np.linalg.qr(ared_arr2d) - q1 = q[:, n_remove:] # these (n_sens-n_remove) columns are orthogonal to the first n_remove columns of ared_arr2d - s = np.sum(q1, axis=0) # column sums might be zero which is a problem for normalization to unit sum - indzero = np.where(np.abs(s) < epszero)[0] - if len(indzero) > 0: - indnonzero = np.full(n_sens - n_remove, True) # all True array - indnonzero[indzero] = False # positions that are zero are false - indnonzero = np.where(indnonzero) # conversion to indices instead of boolean array - if len(indnonzero) == 0: - print("ERROR: All columns have zero sum!") - b = q1[:, indnonzero[0]] # b is column vector with no zero column sum - for i_zero in range(len(indzero)): - q1[:, indzero[i_zero]] = q1[:, indzero[i_zero]] + b # add b to prevent zero column sum - q1 = q1 / np.sum(q1, axis=0) # unit column sums, in order not to introduce a bias in the estimate of the measurand - - ared_arr2d = np.matmul(np.transpose(q1), ared_arr2d) - ared_arr = np.matmul(np.transpose(q1), a_arr) - # The columns of matrix ared_arr2d are still in the wrong order compared to the order of the sensors - ared2_arr2d = np.full_like(ared_arr2d, np.nan) - ared2_arr2d[:, boolremove_arr] = ared_arr2d[:, :n_remove] # columns 0, 1, ..., (n_remove-1) - ared2_arr2d[:, np.invert(boolremove_arr)] = ared_arr2d[:, n_remove:] # columns n_remove, ..., (n_sens-1) - ared_arr2d = ared2_arr2d - - if np.linalg.norm(ared_arr2d[:, boolremove_arr]) > epszero: - raise ColumnNotZeroError(f'These columns of A should be zero by now!') - - ared_arr2d = ared_arr2d[:, sublistsenskeep] # np.invert(boolremove_arr)] # reduce the matrix A by removing the appropriate columns of A, which are zero anyway. - - isconsist_arr[i_subset], ybest_arr[i_subset], uybest_arr[i_subset], chi2obs_arr[i_subset] = \ - calc_best_est_lin_sys(ared_arr, ared_arr2d, xred_arr, vxred_arr2d, problim) - - # After analyzing all subset, find the smallest chi2obs value amongst all subsets. - # If multiple possibilities exist, return them all - indmin = np.argmin(chi2obs_arr) - if isconsist_arr[indmin]: - # consistent solution found (otherwise isconsist remains false and the while loop continues) - isconsist = True - chi2obs = chi2obs_arr[indmin] # minimum chi2obs value - indmin = np.where(np.abs(chi2obs_arr - chi2obs) < eps_chi2)[ - 0] # list with all indices with minimum chi2obs value - n_sols = len(indmin) - if n_sols == 1: - ybest = ybest_arr[indmin[0]] - uybest = uybest_arr[indmin[0]] - indkeep = get_combination(sens_arr, n_sens - n_remove, indmin) # indices of kept estimates - else: # multiple solutions exist, the return types become arrays - ybest = np.full(n_sols, np.nan) - uybest = np.full(n_sols, np.nan) - indkeep = np.full((n_sols, n_sens - n_remove), np.nan) - for i_sol in range(n_sols): - ybest[i_sol] = ybest_arr[indmin[i_sol]] - uybest[i_sol] = uybest_arr[indmin[i_sol]] - indkeep[i_sol] = get_combination(sens_arr, n_sens - n_remove, indmin[i_sol]) - return n_sols, ybest, uybest, chi2obs, indkeep - - -def print_input_lcss(x_arr, vx_arr2d, a_arr, a_arr2d, problim): - print(f"""INPUT of lcss function call: - Vector a of linear system: a_arr = {a_arr} - Matrix A of linear system: a_arr2d = {a_arr2d} - Vector x with sensor values: x_arr = {x_arr} - Covariance matrix Vx of sensor values: vx_arr2d = {vx_arr2d} - Limit probability for chi-squared test: p = {problim}""") - - -def print_output_lcss(n_sols, ybest, uybest, chi2obs, indkeep, x_arr, a_arr2d): - n_sensors = len(x_arr) - n_eq = a_arr2d.shape[0] - n_keep = indkeep.shape[-1] # number of retained estimates in the best solution(s) - print(f'Provided number of sensors (or sensor values) was {n_sensors} and number of equations was {n_eq}.') - if n_sols == 1: - print('calc_lcss found a unique solution with chi2obs = %4.4f using %d of the provided %d sensor values.' - % (chi2obs, n_keep, n_sensors)) - print('\ty = %4.4f, u(y) = %4.4f' % (ybest, uybest)) - print('\tIndices and values of retained provided sensor values:', end=' ') - for ind in indkeep[:-1]: - indint = int(ind) - print('x[%d]= %2.2f' % (indint, x_arr[indint]), end=', ') - indint = int(indkeep[-1]) - print('x[%d]= %2.2f.\n' % (indint, x_arr[indint])) - else: - print( - 'calc_lcss found %d equally good solutions with chi2obs = %4.4f using %d of the provided %d sensor values.' - % (n_sols, chi2obs, n_keep, n_eq)) - for i_sol in range(n_sols): - print('\tSolution %d is:' % i_sol) - print('\ty = %4.4f, u(y) = %4.4f' % (ybest[i_sol], uybest[i_sol])) - print('\tIndices and values of retained provided sensor values:', end=' ') - for ind in indkeep[i_sol][:-1]: - indint = int(ind) - print('x[%d]= %2.2f' % (indint, x_arr[indint]), end=', ') - indint = int(indkeep[i_sol][-1]) - print('x[%d]= %2.2f.\n' % (indint, x_arr[indint])) - return diff --git a/agentMET4FOF_redundancy/redundancyAgents1.py b/agentMET4FOF_redundancy/redundancyAgents1.py deleted file mode 100644 index 80861fe8..00000000 --- a/agentMET4FOF_redundancy/redundancyAgents1.py +++ /dev/null @@ -1,274 +0,0 @@ -""" -This module defines a Redundancy Agent that can be used in the agentMET4FOF framework. It has two main data processing -types: -- lcs: best estimate calculation using Largest Consistent Subset method -- lcss: best estimate calculation using Largest Consistent Subset of Sensor values method - -""" -from typing import Dict - -import numpy as np -from agentMET4FOF.metrological_agents import MetrologicalAgent -from time_series_metadata.scheme import MetaData - -from agentMET4FOF.metrological_streams import ( - MetrologicalDataStreamMET4FOF, - MetrologicalMultiWaveGenerator, -) - -from .redundancy1 import calc_lcs, calc_lcss - - -class MetrologicalMultiWaveGeneratorAgent(MetrologicalAgent): - """An agent streaming a signal composed of various sine and cosine components. - Takes samples from the :py:mod:`MultiWaveGenerator` and pushes them sample by sample (or in batches) - to connected agents via its output channel. - """ - - # The datatype of the stream will be MultiWaveGenerator. - _data_stream: MetrologicalDataStreamMET4FOF - batch_size = 100 - - def init_parameters( - self, - signal: MetrologicalDataStreamMET4FOF = MetrologicalMultiWaveGenerator(), - **kwargs - ): - """ - Initialize the input data stream as an instance of the :py:mod:`MultiWaveGenerator` class - - Parameters - ---------- - signal : MetrologicalDataStreamMET4FOF - the underlying signal for the generator - """ - self._data_stream = signal - super().init_parameters() - self.set_output_data(channel="default", metadata=self._data_stream.metadata) - - def agent_loop(self): - """Model the agent's behaviour - On state *Running* the agent will extract sample by sample the input data - streams content and push it via invoking :py:func:`AgentMET4FOF.send_output`. - """ - if self.current_state == "Running": - self.set_output_data(channel="default", data=self._data_stream.next_sample(batch_size=10)) - super().agent_loop() - - @property - def metadata(self) -> Dict: - return self._data_stream.metadata.metadata - - -class RedundancyAgent(MetrologicalAgent): - """ - This is the main Redundancy Agent class. Main calculation types are :py:func:`lcs` and :py:func:`lcss`, as defined - in the module :mod:`redundancy1`. - """ - metadata: MetaData - n_pr: int - problim: float - calc_type: str - sensor_key_list: list - a_arr: np.ndarray - a_arr2d: np.ndarray - - def init_parameters( - self, - input_data_maxlen: int = 25, - output_data_maxlen: int = 25, - sensor_key_list: list = None, - n_pr: int = 1, - problim: float = .9, - calc_type: str = 'lcs' - ): - """ - Initialize the redundancy agent as an instance of the :py:mod:`MetrologicalAgent` class. - - Parent class parameters - ---------- - input_data_maxlen: int - - output_data_maxlen: int - - - Parameters used for both methods :func:`lcs` and :func:`lcss`. - ---------- - calc_type: str - calculation type: 'lcs' or 'lcss' - sensor_key_list: list of strings - list containing the names of the sensors that should feed data to the Redundancy Agent - n_pr: integer - size of the batch of data that is handled at once by the Redundancy Agent - problim: float - limit probability used for consistency evaluation - """ - - if sensor_key_list is None: - sensor_key_list = [] - super().init_parameters(input_data_maxlen=25, output_data_maxlen=25) - self.metadata = MetaData( - device_id="RedAgent01", - time_name="time", - time_unit="s", - quantity_names="m", - quantity_units="kg", - misc="nothing") - - self.calc_type = calc_type - self.sensor_key_list = sensor_key_list - self.n_pr = n_pr - self.problim = problim - - self.set_output_data(channel="default", metadata=self.metadata) - - def init_parameters2(self, fsam, f1, f2, ampl_ratio, phi1, phi2): - """ - Additional parameters used for this particular example in combination with the :py:func:`lcss` method. - It provides the prior knowledge needed to make the information contained in the data redundant. - This method sets up the vector **a** and matrix *A* for the system **y** = **a** + *A* * **x**. - - Parameters - ---------- - fsam: float - sampling frequency - f1: float - first frequency of interest in signal - f2 : float - second frequency of interest in signal - ampl_ratio: float - ratio of the amplitudes of the two frequency components - phi1: float - initial phase of first frequency component - phi2: float - initial phase of second frequency component - """ - # set-up vector a_arr and matrix a_arr2d for redundancy method - id_mat = np.identity(self.n_pr) - id_fft = np.fft.fft(id_mat) - - n_pr2 = int(self.n_pr / 2) - bfft = id_fft[:n_pr2, :].real / n_pr2 - cfft = -id_fft[:n_pr2, :].imag / n_pr2 - - c1 = 1 / np.cos(phi1) - c2 = 1 / np.sin(-phi1) - c3 = ampl_ratio / np.cos(phi2) - c4 = ampl_ratio / np.sin(-phi2) - - df = fsam / self.n_pr - - ind_freq1 = int(f1/df) - ind_freq2 = int(f2/df) - - a_row1 = c1 * bfft[ind_freq1, :] - a_row2 = c2 * cfft[ind_freq1, :] - a_row3 = c3 * bfft[ind_freq2, :] - a_row4 = c4 * cfft[ind_freq2, :] - - a_row1 = a_row1.reshape((1, len(a_row1))) - a_row2 = a_row2.reshape((1, len(a_row2))) - a_row3 = a_row3.reshape((1, len(a_row3))) - a_row4 = a_row4.reshape((1, len(a_row4))) - - self.a_arr2d = np.concatenate((a_row1, a_row2, a_row3, a_row4), axis=0) - self.a_arr = np.zeros(shape=(4, 1)) - - def agent_loop(self): - """ - Model the agent's behaviour - On state *Running* the agent will extract sample by sample the input data - streams content and push it via invoking :py:func:`AgentMET4FOF.send_output`. - """ - if self.current_state == "Running": - # sometimes the buffer does not contain values for all sensors - # sensor_key_list = ["Sensor1", "Sensor2"] - key_list = [key for key in self.sensor_key_list if key in self.buffer.keys()] - n_sensors = len(key_list) - if n_sensors != len(self.sensor_key_list): # expected number of sensors - print('Not all sensors were present in the buffer. Not evaluating the data.') - return 0 - - for key in key_list: - if self.buffer[key].shape[0] < self.n_pr: - print('Buffer size is ', self.buffer[key].shape[0], ', which is less than ', self.n_pr, '.') - print('Not enough data for redundancy agent evaluation.') - return 0 - - buff = self.buffer.popleft(self.n_pr) # take n_pr entries out from the buffer - - t_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) - ut_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) - x_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) - ux_data_arr2d = np.full(shape=(self.n_pr, n_sensors), fill_value=np.nan) - i_sensor = 0 - # for key in buff.keys(): # arbitrary order - - for key in key_list: - data_arr = buff[key] - t_data_arr2d[:, i_sensor] = data_arr[:, 0] - ut_data_arr2d[:, i_sensor] = data_arr[:, 1] - x_data_arr2d[:, i_sensor] = data_arr[:, 2] - ux_data_arr2d[:, i_sensor] = data_arr[:, 3] - i_sensor = i_sensor + 1 - - if self.calc_type == "lcs": - data = np.full(shape=(self.n_pr, 4), fill_value=np.nan) - for i_pnt in range(self.n_pr): - y_arr = np.array(x_data_arr2d[i_pnt, :]) - y_arr = y_arr.reshape((n_sensors, 1)) - vy_arr2d = np.zeros(shape=(n_sensors, n_sensors)) - for i_sensor in range(n_sensors): - vy_arr2d[i_sensor, i_sensor] = np.square(ux_data_arr2d[i_pnt, i_sensor]) - - n_sols, ybest, uybest, chi2obs, indkeep = calc_lcs(y_arr, vy_arr2d, self.problim) - if n_sols == 1: # time stamp is value of first sensor - if isinstance(ybest, np.ndarray): - ybest = ybest[0] - data[i_pnt, :] = np.array([t_data_arr2d[i_pnt, 0], ut_data_arr2d[i_pnt, 0], ybest, uybest]) - else: # only return the first solution - data[i_pnt, :] = np.array([t_data_arr2d[i_pnt, 0], ut_data_arr2d[i_pnt, 0], ybest[0], uybest[0]]) - - elif self.calc_type == "lcss": - # lcss applied to one data vector (required input) - # Sum the signals to get one signal - x_data_arr = np.sum(x_data_arr2d, axis=1) - x_data_arr = x_data_arr.reshape((len(x_data_arr), 1)) - ux2_data_arr = np.sum(np.square(ux_data_arr2d), axis=1) - vx_arr2d = np.zeros((self.n_pr, self.n_pr)) - for i_val in range(self.n_pr): - vx_arr2d[i_val, i_val] = ux2_data_arr[i_val] - - n_sols, ybest, uybest, chi2obs, indkeep = \ - calc_lcss(self.a_arr, self.a_arr2d, x_data_arr, vx_arr2d, self.problim) - print('calc lcss finished') - print('n_sols: ', n_sols) - print('ybest: ', ybest) - print('uybest: ', uybest) - if n_sols == 1: # time stamp is latest value - if isinstance(ybest, np.ndarray): - ybest = ybest[0] - data = np.array([t_data_arr2d[-1, 0], ut_data_arr2d[-1, 0], ybest, uybest]) - else: # only return the first solution - data = np.array([t_data_arr2d[-1, 0], ut_data_arr2d[-1, 0], ybest[0], uybest[0]]) - - # Send the data - if len(data.shape) == 1: - data = data.reshape((1, len(data))) - - self.set_output_data(channel="default", data=data) - super().agent_loop() - - def on_received_message(self, message): - """ - Handles incoming data from 'default' channels. - Stores 'default' data into an internal buffer - - Parameters - ---------- - message : dict - Only acceptable channel value is 'default'. - """ - if message['channel'] == 'default': - self.buffer_store(agent_from=message["from"], data=message["data"]) - return 0 diff --git a/agentMET4FOF_tutorials/tutorial_7_redundancyAgents.py b/agentMET4FOF_tutorials/tutorial_7_redundancyAgents.py deleted file mode 100644 index 138623dc..00000000 --- a/agentMET4FOF_tutorials/tutorial_7_redundancyAgents.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Example 1 of using a Redundancy Agent. -Four signals are generated and data is supplied to the Redundancy Agent. -The Redundancy Agent calculates the best consistent estimate taking into account the supplied uncertainties. -""" - -import numpy as np -from agentMET4FOF.agents import AgentNetwork -from agentMET4FOF.metrological_agents import MetrologicalMonitorAgent - -from agentMET4FOF.metrological_streams import MetrologicalMultiWaveGenerator -from agentMET4FOF_redundancy.redundancyAgents1 import MetrologicalMultiWaveGeneratorAgent, RedundancyAgent - - -def demonstrate_redundancy_agent_four_signals(): - """ - At the start of the main module all important parameters are defined. Then the agents are defined and the network - is started. The network and the calculated results can be monitored in a browser at the address http://127.0.0.1:8050/. - """ - # parameters - batch_size = 10 - n_pr = batch_size - fsam = 100 - intercept = 10 - freqs = [6, 10, 8, 12] - phases = [1, 2, 3, 4] - ampls = [0.3, 0.2, 0.5, 0.4] - exp_unc_abs = 0.2 - problim = 0.95 - - # start agent network server - agent_network: AgentNetwork = AgentNetwork(dashboard_modules=True, backend='mesa') - - # Initialize signal generating class outside of agent framework. - signal_arr = [MetrologicalMultiWaveGenerator(sfreq=fsam, freq_arr=np.array([freq]), intercept=intercept, - ampl_arr=np.array([ampl]), phase_ini_arr=np.array([phi]), - value_unc=exp_unc_abs) for freq, phi, ampl in - zip(freqs, phases, ampls)] - - # Data source agents. - source_agents = [] - sensor_key_list = [] - for count, signal in enumerate(signal_arr): - sensor_key_list += ["Sensor" + str(count + 1)] - source_agents += [agent_network.add_agent(name=sensor_key_list[-1], agentType=MetrologicalMultiWaveGeneratorAgent)] - source_agents[-1].init_parameters(signal=signal, batch_size=batch_size) - - # Redundant data processing agent - redundancy_name1 = "RedundancyAgent1" - redundancy_agent1 = agent_network.add_agent(name=redundancy_name1, agentType=RedundancyAgent) - redundancy_agent1.init_parameters(sensor_key_list=sensor_key_list, n_pr=n_pr, problim=problim, calc_type="lcs") - - # Initialize metrologically enabled plotting agent. - monitor_agent1 = agent_network.add_agent(name="MonitorAgent_SensorValues", - agentType=MetrologicalMonitorAgent) - monitor_agent2 = agent_network.add_agent(name="MonitorAgent_RedundantEstimate", agentType=MetrologicalMonitorAgent) - - # Bind agents. - for source_agent in source_agents: - source_agent.bind_output(monitor_agent1) - source_agent.bind_output(redundancy_agent1) - - redundancy_agent1.bind_output(monitor_agent2) - - # Set all agents states to "Running". - agent_network.set_running_state() - - # Allow for shutting down the network after execution. - return agent_network - - -if __name__ == "__main__": - demonstrate_redundancy_agent_four_signals() diff --git a/agentMET4FOF_tutorials/tutorial_8_redundancyAgents.py b/agentMET4FOF_tutorials/tutorial_8_redundancyAgents.py deleted file mode 100644 index 27865498..00000000 --- a/agentMET4FOF_tutorials/tutorial_8_redundancyAgents.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -Example 2 of using a Redundancy Agent. -A single signal is generated and supplied to the Redundancy Agent. -The Redundancy Agent uses the redundancy in the data vector together with some prior knowledge -in order to calculate the best consistent estimate taking into account the supplied uncertainties. -""" - -import numpy as np -from agentMET4FOF.agents import AgentNetwork -from agentMET4FOF.metrological_agents import MetrologicalMonitorAgent - -from agentMET4FOF.metrological_streams import MetrologicalMultiWaveGenerator -from agentMET4FOF_redundancy.redundancyAgents1 import MetrologicalMultiWaveGeneratorAgent, RedundancyAgent - - -def demonstrate_redundancy_agent_onesignal(): - """ - At the start of the main module all important parameters are defined. Then the agents are defined and the network - is started. The network and the calculated results can be monitored in a browser at the address http://127.0.0.1:8050/. - """ - # parameters - batch_size = 20 - n_pr = 20 - fsam = 40 - f1 = 6 - f2 = 10 - phi1 = 1 - phi2 = 2 - ampl1 = 230 - ampl2 = 20 - exp_unc_abs = 0.2 # absolute expanded uncertainty - problim = 0.95 - - # start agent network server - agent_network = AgentNetwork(dashboard_modules=True, backend='mesa') - - # Initialize signal generating class outside of agent framework. - signal1 = MetrologicalMultiWaveGenerator(sfreq=fsam, freq_arr=np.array([f1, f2]), ampl_arr=np.array([ampl1, ampl2]), - phase_ini_arr=np.array([phi1, phi2]), value_unc=exp_unc_abs) - - # Data source agents. - source_name1 = "Sensor1" # signal1.metadata.metadata["device_id"] - source_agent1 = agent_network.add_agent(name=source_name1, agentType=MetrologicalMultiWaveGeneratorAgent) - source_agent1.init_parameters(signal=signal1, batch_size=batch_size) - - # Redundant data processing agent - sensor_key_list = [source_name1] - redundancy_name1 = "RedundancyAgent1" - redundancy_agent1 = agent_network.add_agent(name=redundancy_name1, agentType=RedundancyAgent) - redundancy_agent1.init_parameters(sensor_key_list=sensor_key_list, calc_type="lcss", n_pr=n_pr, problim=problim) - - # prior knowledge needed for redundant evaluation of the data - redundancy_agent1.init_parameters2(fsam=fsam, f1=f1, f2=f2, ampl_ratio=ampl1/ampl2, phi1=phi1, phi2=phi2) - - # Initialize metrologically enabled plotting agent. Agent name cannot contain spaces!! - monitor_agent1 = agent_network.add_agent(name="MonitorAgent_SensorValues", agentType=MetrologicalMonitorAgent) - monitor_agent2 = agent_network.add_agent(name="MonitorAgent_RedundantEstimate", agentType=MetrologicalMonitorAgent) - - # Bind agents. - source_agent1.bind_output(monitor_agent1) - source_agent1.bind_output(redundancy_agent1) - redundancy_agent1.bind_output(monitor_agent2) - - # Set all agents states to "Running". - agent_network.set_running_state() - - # Allow for shutting down the network after execution. - return agent_network - - -if __name__ == "__main__": - demonstrate_redundancy_agent_onesignal()