diff --git a/CobraBay/__init__.py b/CobraBay/__init__.py new file mode 100644 index 0000000..b22b3bf --- /dev/null +++ b/CobraBay/__init__.py @@ -0,0 +1,32 @@ +#### +# Cobra Bay init +#### + +from .bay import CBBay +from .display import CBDisplay +from .core import CBCore +from .config import CBConfig +from .network import CBNetwork +from .systemhw import CBPiStatus +from .version import __version__ + +import CobraBay.detectors +import CobraBay.sensors +import CobraBay.triggers + +# def read_version(): +# print(__file__) +# """Read a text file and return the content as a string.""" +# with io.open("/CobraBay/CobraBay/version.py") as f: +# return f.read() + +__repo__ = "https://github.com/chrisgilldc/cobrabay.git" +all = [ + 'CBBay', + 'CBDisplay', + 'CBCore', + 'CBConfig', + 'CBNetwork', + 'CBPiStatus', + '__version__' +] diff --git a/lib/bay.py b/CobraBay/bay.py similarity index 89% rename from lib/bay.py rename to CobraBay/bay.py index b7fe304..eab521f 100644 --- a/lib/bay.py +++ b/CobraBay/bay.py @@ -4,11 +4,12 @@ import time from pint import UnitRegistry, Quantity from time import monotonic -from .detector import CB_VL53L1X +from .detectors import CB_VL53L1X import logging from pprint import pformat from functools import wraps - +import sys +from .exceptions import SensorValueException # Scan the detectors if we're asked for a property that needs a fresh can and we haven't scanned recently enough. def scan_if_stale(func): @@ -33,18 +34,53 @@ def wrapper(self, *args, **kwargs): return wrapper -class Bay: - def __init__(self, bay_id, config, detectors): - # Get our settings. - self._settings = config.bay(bay_id) +class CBBay: + def __init__(self, bay_id, + bay_name, + bay_depth, + stop_point, + park_time, + output_unit, + detectors, + detector_settings, + log_level="DEBUG"): + """ + :param bay_id: ID for the bay. Cannot have spaces. + :type bay_id: str + :param bay_name: Long "friendly" name for the Bay, used in MQTT messages + :type bay_name: str + :param bay_depth: Absolute distance of the bay, from the range sensor to the end. Must be a linear Quantity. + :type bay_depth: Quantity + :param stop_point: Distance from the sensor where the vehicle should stop + :type stop_point: Quantity + :param park_time: int + :param output_unit: Unit to output measurements in. Should be a distance unit understood by Pint (ie: 'in', 'cm', etc) + :type output_unit: str + :param detectors: Dictionary of detectors to use. + :type detectors: dict + :param detector_settings: Dictionary of detector configuration settings. + :type detector_settings: dict + :param log_level: Log level for the bay, must be a Logging level. + :type log_level: str + """ + # Save parameters + self._bay_id = bay_id + self._bay_name = bay_name + self._bay_depth = bay_depth + self._stop_point = stop_point + self._park_time = park_time + self._output_unit = output_unit + self._detectors = detectors + self._detector_settings = detector_settings # Create a logger. self._logger = logging.getLogger("CobraBay").getChild(self.bay_id) - self._logger.setLevel(config.get_loglevel(bay_id)) + self._logger.setLevel(log_level) + self._logger.info("Initializing bay: {}".format(bay_id)) + self._logger.debug("Bay received detectors: {}".format(detectors)) # Initialize variables. self._position = {} self._quality = {} - self._detectors = {} self._trigger_registry = {} self._previous_scan_ts = 0 self._state = None @@ -53,7 +89,6 @@ def __init__(self, bay_id, config, detectors): # Log our initialization. self._logger.info("Bay '{}' initializing...".format(self.bay_id)) self._logger.info("Bay has settings:") - self._logger.info(pformat(self._settings)) # Create a unit registry. self._ureg = UnitRegistry @@ -170,7 +205,12 @@ def _scan_detectors(self, filter_lateral=True): quality = {} # Check all the detectors. for detector_name in self._detectors: - position[detector_name] = self._detectors[detector_name].value + try: + position[detector_name] = self._detectors[detector_name].value + except SensorValueException: + # For now, pass. Need to add logic here to actually set the overall bay status. + pass + quality[detector_name] = self._detectors[detector_name].quality self._logger.debug("Read of detector {} returned value '{}' and quality '{}'". format(self._detectors[detector_name].name, position[detector_name], @@ -459,28 +499,28 @@ def _detector_state(self, mode): @property def bay_id(self): - return self._settings['bay_id'] + return self._bay_id @bay_id.setter def bay_id(self, input): - self._settings['bay_id'] = input + self._bay_id = input @property def bay_name(self): - return self._settings['bay_name'] + return self._bay_name @bay_name.setter def bay_name(self, input): - self._settings['bay_name'] = input + self._bay_name = input # Apply specific config options to the detectors. def _setup_detectors(self): # For each detector we use, apply its properties. self._logger.debug("Detectors: {}".format(self._detectors.keys())) - for dc in self._settings['detectors']['settings'].keys(): + for dc in self._detector_settings.keys(): self._logger.debug("Configuring detector {}".format(dc)) - self._logger.debug("Settings: {}".format(self._settings['detectors']['settings'][dc])) - for item in self._settings['detectors']['settings'][dc]: + self._logger.debug("Settings: {}".format(self._detector_settings[dc])) + for item in self._detector_settings[dc]: self._logger.debug( - "Setting property {} to {}".format(item, self._settings['detectors']['settings'][dc][item])) - setattr(self._detectors[dc], item, self._settings['detectors']['settings'][dc][item]) + "Setting property {} to {}".format(item, self._detector_settings[dc][item])) + setattr(self._detectors[dc], item, self._detector_settings[dc][item]) diff --git a/CobraBay/cli/__init__.py b/CobraBay/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/CobraBay/cli/main.py b/CobraBay/cli/main.py new file mode 100644 index 0000000..b6ef1ed --- /dev/null +++ b/CobraBay/cli/main.py @@ -0,0 +1,52 @@ +#### +# Cobra Bay - Main Executor +#### + +import argparse +import pathlib +import sys +import CobraBay +import logging +from logging.handlers import WatchedFileHandler + +from pid import PidFile + +def main(): + # Parse command line options. + parser = argparse.ArgumentParser( + description="CobraBay Parking System" + ) + parser.add_argument("-c", "--config", help="Config file location.") + parser.add_argument("-r", "--run-dir", help="Run directory, for the PID file.") + args = parser.parse_args() + + try: + arg_config = pathlib.Path(args.config) + except TypeError: + arg_config = None + + # Create the Master logger. + master_logger = logging.getLogger("CobraBay") + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) + master_logger.addHandler(console_handler) + + # Create a CobraBay config object. + try: + cbconfig = CobraBay.CBConfig(config_file=arg_config, reset_sensors=True) + except ValueError as e: + print(e) + sys.exit(1) + + # Initialize the object. + cb = CobraBay.CBCore(config_obj=cbconfig) + + # Start the main operating loop. + with PidFile('CobraBay', piddir='/tmp') as p: + print("Pid file name: {}".format(p.pidname)) + print("Pid directory: {}".format(p.piddir)) + cb.run() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/CobraBay/cli/sensor_test.py b/CobraBay/cli/sensor_test.py new file mode 100644 index 0000000..b82279b --- /dev/null +++ b/CobraBay/cli/sensor_test.py @@ -0,0 +1,100 @@ +#### +# Cobra Bay - Sensor Testing Tool +#### + +import argparse +import pathlib +import sys +import CobraBay +import logging + +from pid import PidFile, PidFileAlreadyRunningError, PidFileAlreadyLockedError + +def main(): + # Parse command line options. + parser = argparse.ArgumentParser( + description="CobraBay Sensor Tester" + ) + parser.add_argument("-c", "--config", help="Config file location.") + parser.add_argument("-r", "--run-dir", help="Run directory, for the PID file.") + parser.add_argument("-n", default=1, help="Number of test readings to take.") + parser.add_argument("-s", default="all", help="Sensors to test.") + args = parser.parse_args() + + # Check for an active CobraBay instance. + if args.run_dir: + pid_lock = PidFile('CobraBay', piddir=args.run_dir) + else: + # Default the pid file to /tmp + pid_lock = PidFile('CobraBay', piddir="/tmp") + + try: + pid_lock.check() + except PidFileAlreadyRunningError: + print("Cannot run sensor test while CobraBay is active. CobraBay running as PID {}".format(pid_lock.pid)) + sys.exit(1) + except PidFileAlreadyLockedError: + print("CobraBay lock exists but appears not to be running. May be stale. " + "Check directory '{}', clear and retry.".format(pid_lock.piddir)) + + try: + arg_config = pathlib.Path(args.config) + except TypeError: + arg_config = None + + # Create the Master logger. + master_logger = logging.getLogger("CobraBay") + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) + master_logger.addHandler(console_handler) + + # Create a CobraBay config object. + try: + cbconfig = CobraBay.CBConfig(config_file=arg_config, reset_sensors=True) + except ValueError as e: + print(e) + sys.exit(1) + + # Start the main operating loop. + with PidFile('CobraBay', piddir='/tmp') as p: + # Load the various sensors. + sensors = load_sensors(cbconfig) + print("Loaded sensors: {}".format(list(sensors.keys()))) + if args.s == 'all': + test_sensors = sensors.keys() + else: + if args.s not in sensors.keys(): + print("Requested test sensors '{}' not in configured sensors.") + sys.exit(1) + else: + test_sensors = [ args.s ] + + for sensor in sensors: + sensors[sensor].start_ranging() + + i = 1 + while i <= int(args.n): + print("Test cycle: {}".format(i)) + for sensor in test_sensors: + print("{} - {}".format(sensor, sensors[sensor].range)) + i += 1 + + +# Method to load just the sensors out of a given CobraBay Config. +def load_sensors(cbconfig): + sensors = {} + for detector_id in cbconfig.detector_list: + detector_config = cbconfig.detector(detector_id) + if detector_config['sensor']['type'] == 'VL53L1X': + # Create the sensor object using provided settings. + sensor_obj = CobraBay.sensors.CB_VL53L1X(detector_config['sensor']) + elif detector_config['sensor']['type'] == 'TFMini': + sensor_obj = CobraBay.sensors.TFMini(detector_config['sensor']) + else: + continue + sensors[detector_id] = sensor_obj + return sensors + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/lib/config.py b/CobraBay/config.py similarity index 71% rename from lib/config.py rename to CobraBay/config.py index fb71d2c..368e6f0 100644 --- a/lib/config.py +++ b/CobraBay/config.py @@ -2,18 +2,19 @@ # Cobra Bay - Config Loader #### import logging +import os.path import sys import yaml from pathlib import Path from pint import Quantity from pprint import pformat +import importlib class CBConfig: def __init__(self, config_file=None, reset_sensors=False): self._config = None self._logger = logging.getLogger("CobraBay").getChild("Config") - self._logger.info("Processing config...") # Initialize the internal config file variable self._config_file = None @@ -22,7 +23,7 @@ def __init__(self, config_file=None, reset_sensors=False): Path('/etc/cobrabay/config.yaml'), Path.cwd().joinpath('config.yaml') ] - if config_file is not None: + if isinstance(config_file,Path): search_paths.insert(0, Path(config_file)) for path in search_paths: @@ -31,7 +32,7 @@ def __init__(self, config_file=None, reset_sensors=False): except: pass if self._config_file is None: - raise ValueError("Cannot find valid config file! Attempted: {}".format(search_paths)) + raise ValueError("Cannot find valid config file! Attempted: {}".format([str(i) for i in search_paths])) # Load the config file. This does validation as well! self.load_config() @@ -46,6 +47,8 @@ def load_config(self, reset_sensors=False): else: self._logger.info("Config file validated. Loading.") # We're good, so assign the staging to the real config. + print("Loaded config...") + print(pformat(validated_yaml)) self._logger.debug(pformat(validated_yaml)) self._config = validated_yaml @@ -99,7 +102,7 @@ def _validator(self, staging_yaml): # Validate the system section. def _validate_system(self, system_config): - valid_keys = ('units', 'system_name', 'network', 'mqtt_commands', 'interface', 'homeassistant', 'logging') + valid_keys = ('units', 'system_name', 'mqtt', 'mqtt_commands', 'interface', 'homeassistant', 'logging') required_keys = ('system_name', 'interface') # Remove any key values that aren't valid. for actual_key in system_config: @@ -225,7 +228,15 @@ def display(self): config_dict['gpio_slowdown'] = self._config['display']['matrix']['gpio_slowdown'] config_dict['mqtt_image'] = self._config['display']['mqtt_image'] config_dict['mqtt_update_interval'] = Quantity(self._config['display']['mqtt_update_interval']) - config_dict['core_font'] = 'fonts/OpenSans-Light.ttf' + # Default font is the packaged-in OpenSans Light. + with importlib.resources.path("CobraBay.data", 'OpenSans-Light.ttf') as p: + core_font_path = p + self._logger.debug("Using default font at path: {}".format(core_font_path)) + config_dict['core_font'] = str(core_font_path) + # If the font is defined and exists, use that. + if 'font' in self._config['display']: + if os.path.isfile(self._config['display']['font']): + config_dict['core_font'] = self._config['display']['font'] return config_dict # Return a settings dict to be used for the Network module. @@ -238,6 +249,18 @@ def network(self): except KeyError: config_dict['homeassistant'] = False config_dict['interface'] = self._config['system']['interface'] + # Check for MQTT definition + if 'mqtt' not in self._config['system'].keys(): + raise ValueError("MQTT must be defined in system block.") + elif not isinstance(self._config['system']['mqtt'],dict): + raise TypeError("MQTT definition must be a dictionary of values.") + else: + required_keys = ['broker','port','username','password'] + for rk in required_keys: + if rk not in self._config['system']['mqtt']: + raise ValueError("Required key '{}' not in system MQTT definition".format(rk)) + # Should be good, pull it over. + config_dict['mqtt'] = self._config['system']['mqtt'] return config_dict def bay(self, bay_id): @@ -247,13 +270,11 @@ def bay(self, bay_id): # Initialize the config dict. Include the bay ID, and default to metric. config_dict = { 'bay_id': bay_id, - 'unit_system': 'metric', 'output_unit': 'm' } - # If Imperial is defined, set that. + # If Imperial is defined, bay should output in inches. try: if self._config['system']['units'].lower() == 'imperial': - config_dict['unit_system'] = 'imperial' config_dict['output_unit'] = 'in' except KeyError: pass @@ -267,30 +288,12 @@ def bay(self, bay_id): # How long there should be no motion until we consider the bay to be parked. Convert to seconds and take out # magnitude. config_dict['park_time'] = Quantity(self._config['bays'][bay_id]['park_time']).to('second').magnitude - # Actual bay depth. + # Actual bay depth, from the range sensor to the garage door. config_dict['bay_depth'] = Quantity(self._config['bays'][bay_id]['bay_depth']).to('cm') - # Adjust the bay depth by the offset. - config_dict['adjusted_depth'] = Quantity(config_dict['bay_depth']) - Quantity( - self._config['bays'][bay_id]['longitudinal']['defaults']['offset']).to('cm') - - # Get the defined defaults for detectors - defaults = {'lateral': {}, 'longitudinal': {}} - detector_options = { - 'longitudinal': ('offset', 'bay_depth', 'spread_park'), - 'lateral': ('offset', 'spread_ok', 'spread_warn', 'side', 'intercept') - } - for direction in ('lateral', 'longitudinal'): - for item in detector_options[direction]: - try: - defaults[direction][item] = self._config['bays'][bay_id][direction]['defaults'][item] - except KeyError: - pass - - self._logger.debug("Assembled Longitudinal defaults: {}".format(defaults['longitudinal'])) - self._logger.debug("Assembled Lateral defaults: {}".format(defaults['lateral'])) + # Stop point, the distance from the sensor to the point where the vehicle should stop. + config_dict['stop_point'] = Quantity(self._config['bays'][bay_id]['stop_point']).to('cm') - config_dict['detectors'] = {} - config_dict['detectors'] = { + config_dict['detector_settings'] = { 'selected_range': None, 'settings': {}, 'intercepts': {}, @@ -298,43 +301,42 @@ def bay(self, bay_id): 'lateral': [] } - # Create the actual detector configurations. + # Create the detector configuration for the bay. + # Each bay applies bay-specific options to a detector when it initializes. for direction in ('longitudinal', 'lateral'): + # Pull the defaults as a base. Otherwise, it's an empty dict. + try: + direction_defaults = self._config['bays'][bay_id][direction]['defaults'] + except KeyError: + direction_defaults = {} for detector in self._config['bays'][bay_id][direction]['detectors']: - # Build the detector config for this detector. Check the config file for overrides first, then use - # defaults. If we can't do either, then raise an error. - detector_config = {} - for config_item in detector_options[direction]: - try: - detector_config[config_item] = detector[config_item] - except KeyError: - # No detector_specific option. Try to use the default. - try: - detector_config[config_item] = defaults[direction][config_item] - except KeyError: - # Couldn't find this either. That's a problem! - raise + self._logger.debug("Processing bay-specific config for detector {}".format(detector)) + # Merge the dicts. The union will combine keys, and use common keys from detector specific config. + self._logger.debug("Using defaults for detector: {} ({})".format(direction_defaults, type(direction_defaults))) + self._logger.debug( + "Using detector-specific config for detector: {} ({})".format(detector, type(detector))) + detector_config = dict( direction_defaults.items() | detector.items() ) self._logger.debug("Assembled detector config: {}".format(detector_config)) # Store the settings. - config_dict['detectors']['settings'][detector['detector']] = detector_config + config_dict['detector_settings']['settings'][detector['detector']] = detector_config # Save the name in the right place. if direction == 'longitudinal': - config_dict['detectors']['longitudinal'].append(detector['detector']) + config_dict['detector_settings']['longitudinal'].append(detector['detector']) # Lateral detectors have an intercept distance. if direction == 'lateral': - config_dict['detectors']['lateral'].append(detector['detector']) + config_dict['detector_settings']['lateral'].append(detector['detector']) try: - config_dict['detectors']['intercepts'][detector['detector']] = Quantity(detector['intercept']) + config_dict['detector_settings']['intercepts'][detector['detector']] = Quantity(detector['intercept']) except KeyError as ke: raise Exception('Lateral detector {} does not have intercept distance defined!' .format(detector['detector'])) from ke # Pick a range sensor to use as 'primary'. - if config_dict['detectors']['selected_range'] is None: + if config_dict['detector_settings']['selected_range'] is None: # If there's only one longitudinal detector, that's the one to use for range. - if len(config_dict['detectors']['longitudinal']) == 1: - config_dict['detectors']['selected_range'] = config_dict['detectors']['longitudinal'][0] + if len(config_dict['detector_settings']['longitudinal']) == 1: + config_dict['detector_settings']['selected_range'] = config_dict['detector_settings']['longitudinal'][0] return config_dict # Config dict for a detector. @@ -344,28 +346,62 @@ def detector(self, detector_id): raise KeyError("No configuration defined for detector '{}'".format(detector_id)) # Assemble the config dict config_dict = { - 'id': detector_id, + 'detector_id': detector_id, 'name': self._config['detectors'][detector_id]['name'], - 'sensor': self._config['detectors'][detector_id]['sensor'] + 'type': self._config['detectors'][detector_id]['type'].lower(), + 'sensor_type': self._config['detectors'][detector_id]['sensor']['type'], + 'sensor_settings': self._config['detectors'][detector_id]['sensor'] } - # Initialize required setting values so they exist. This is required so readiness can be checked. - # If they're defined in the config, great, use those values, otherwise initialize as None. - if self._config['detectors'][detector_id]['type'].lower() == 'range': - required_settings = ['offset', 'bay_depth', 'spread_park', 'pct_warn', 'pct_crit', 'error_margin'] - elif self._config['detectors'][detector_id]['type'].lower() == 'lateral': - required_settings = ['offset', 'spread_ok', 'spread_warn', 'side'] - else: - raise ValueError("Detector {} has unknown type.".format(detector_id)) - - # The required settings list is stored in settings itself, so it can be used by the check_ready decorator. - config_dict['required'] = required_settings + # Add the logger to the sensor settings, so the sensor can log directly. + config_dict['sensor_settings']['logger'] = 'CobraBay.sensors' + del config_dict['sensor_settings']['type'] - for required_setting in required_settings: + # Optional parameters if included. + if config_dict['type'] == 'range': try: - config_dict[required_setting] = self._config['detectors'][detector_id][required_setting] + config_dict['error_margin'] = self._config['detectors'][detector_id]['error_margin'] except KeyError: - config_dict[required_setting] = None + config_dict['error_margin'] = Quantity("0 cm") + elif config_dict['type'] == 'lateral': + pass + # # Initialize required setting values so they exist. This is required so readiness can be checked. + # # If they're defined in the config, great, use those values, otherwise initialize as None. + # if self._config['detectors'][detector_id]['type'].lower() == 'range': + # required_settings = ['offset', 'spread_park', 'pct_warn', 'pct_crit'] + # fallback_defaults = { + # 'offset': Quantity("0 cm"), + # 'spread_park': Quantity("2 in"), + # 'pct_warn': 90, + # 'pct_crit': 95 + # } + # elif self._config['detectors'][detector_id]['type'].lower() == 'lateral': + # required_settings = ['offset', 'spread_ok', 'spread_warn', 'side'] + # fallback_defaults = { + # 'offset': Quantity("0 cm"), + # 'spread_ok': Quantity("1 in"), + # 'spread_warn': Quantity("3 in"), + # 'side': None + # } + # else: + # raise ValueError("Detector {} has unknown type.".format(detector_id)) + # + # for required_setting in required_settings: + # try: + # config_dict['sensor_settings'][required_setting] = self._config['detectors'][detector_id][required_setting] + # except KeyError: + # try: + # config_dict[required_setting] = self._config['detectors']['defaults'][required_setting] + # self._logger.warning("For {} assigned setting '{}' default value '{}'". + # format(detector_id, required_setting, + # self._config['detectors']['defaults'][required_setting])) + # except KeyError: + # if fallback_defaults[required_setting] is None: + # self._logger.critical("Setting '{}' not configured for detector '{}', and has no default. " + # "Must be set! Cannot continue.".format(required_setting, detector_id)) + # else: + # config_dict['sensor_settings'][required_setting] = fallback_defaults[required_setting] + # self._logger.debug("Returning config: {}".format(config_dict)) return config_dict diff --git a/lib/cobrabay.py b/CobraBay/core.py similarity index 84% rename from lib/cobrabay.py rename to CobraBay/core.py index 17a56e3..e296e9f 100644 --- a/lib/cobrabay.py +++ b/CobraBay/core.py @@ -11,39 +11,49 @@ # import os # import sys # import psutil +from pprint import pformat + +import CobraBay # Import the other CobraBay classes -from .bay import Bay -from .config import CBConfig -from .display import Display -from .detector import Lateral, Range -from .network import Network -from .systemhw import PiStatus -from . import triggers -from .version import __version__ - - -class CobraBay: - def __init__(self, cmd_opts=None): +#from CobraBay.bay import Bay +#from CobraBay.config import CBConfig +#from CobraBay.display import Display +# from CobraBay.detectors import Lateral, Range +# #from CobraBay.network import CBNetwork +# from CobraBay.sensors import CB_VL53L1X +# # from CobraBay.systemhw import CBPiStatus +# from . import triggers +# from CobraBay.version import __version__ +import sys + + +class CBCore: + def __init__(self, config_obj): # Register the exit handler. atexit.register(self.system_exit) - # Create the master logger. All modules will hang off this. + # Get the master handler. This may have already been started by the command line invoker. self._master_logger = logging.getLogger("CobraBay") # Set the master logger to Debug, so all other messages will pass up through it. self._master_logger.setLevel(logging.DEBUG) - # By default, set up console logging. This will be disabled if config file tells us to. - console_handler = logging.StreamHandler() - console_handler.setLevel(logging.DEBUG) - self._master_logger.addHandler(console_handler) + # If console handler isn't already on the master logger, add it by default. Will be removed later if the + # config tells us to. + if not len(self._master_logger.handlers): + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) + self._master_logger.addHandler(console_handler) # Create a "core" logger, for just this module. self._logger = logging.getLogger("CobraBay").getChild("Core") # Initial startup message. - self._logger.info("CobraBay {} initializing...".format(__version__)) + self._logger.info("CobraBay {} initializing...".format(CobraBay.__version__)) - # Create a config object. - self._cbconfig = CBConfig(reset_sensors=True) + if not isinstance(config_obj,CobraBay.CBConfig): + raise TypeError("CobraBay core must be passed a CobraBay Config object (CBConfig).") + else: + # Save the passed CBConfig object. + self._cbconfig = config_obj # Update the logging handlers. self._setup_logging_handlers(self._cbconfig.log_handlers()) @@ -56,12 +66,12 @@ def __init__(self, cmd_opts=None): # Create the object for checking hardware status. self._logger.debug("Creating Pi hardware monitor...") - self._pistatus = PiStatus() + self._pistatus = CobraBay.CBPiStatus() # Create the network object. self._logger.debug("Creating network object...") # Create Network object. - self._network = Network(config=self._cbconfig) + self._network = CobraBay.CBNetwork(config=self._cbconfig) self._logger.info('Connecting to network...') # Connect to the network. self._network.connect() @@ -82,10 +92,13 @@ def __init__(self, cmd_opts=None): self._logger.debug("Creating bays...") for bay_id in self._cbconfig.bay_list: self._logger.info("Bay ID: {}".format(bay_id)) - self._bays[bay_id] = Bay(bay_id, self._cbconfig, self._detectors) + bay_config = self._cbconfig.bay(bay_id) + self._logger.debug("Bay config:") + self._logger.debug(pformat(bay_config)) + self._bays[bay_id] = CobraBay.CBBay(**bay_config, detectors=self._detectors) self._logger.info('CobraBay: Creating display...') - self._display = Display(self._cbconfig) + self._display = CobraBay.CBDisplay(self._cbconfig) # Register the bay with the network and display. for bay_id in self._bays: @@ -134,7 +147,7 @@ def __init__(self, cmd_opts=None): # Common network handler, pushes data to the network and makes sure the MQTT client can poll. def _network_handler(self): # Add hardware status messages. - self._mqtt_hw() + self._outbound_messages.extend(self._mqtt_hw()) # Send the outbound message queue to the network module to handle. After, we empty the message queue. network_data = self._network.poll(self._outbound_messages) # We've pushed the message out, so reset our current outbound message queue. @@ -240,18 +253,20 @@ def _motion(self, bay_id, cmd): # Utility method to put the hardware status on the outbound message queue. This needs to be used from a few places. def _mqtt_hw(self): - self._outbound_messages.append( + hw_messages = [] + hw_messages.append( {'topic_type': 'system', 'topic': 'cpu_pct', 'message': self._pistatus.status('cpu_pct'), 'repeat': False}) - self._outbound_messages.append( + hw_messages.append( {'topic_type': 'system', 'topic': 'cpu_temp', 'message': self._pistatus.status('cpu_temp'), 'repeat': False}) - self._outbound_messages.append( + hw_messages.append( {'topic_type': 'system', 'topic': 'mem_info', 'message': self._pistatus.status('mem_info'), 'repeat': False}) - self._outbound_messages.append( + hw_messages.append( {'topic_type': 'system', 'topic': 'undervoltage', 'message': self._pistatus.status('undervoltage'), 'repeat': False} ) + return hw_messages def undock(self): self._logger.info('CobraBay: Undock not yet implemented.') @@ -302,10 +317,14 @@ def _setup_detectors(self): return_dict = {} for detector_id in self.config['detectors']: self._logger.info("Creating detector: {}".format(detector_id)) - if self.config['detectors'][detector_id]['type'] == 'Range': - return_dict[detector_id] = Range(self._cbconfig, detector_id) - if self.config['detectors'][detector_id]['type'] == 'Lateral': - return_dict[detector_id] = Lateral(self._cbconfig, detector_id) + detector_config = self._cbconfig.detector(detector_id) + if detector_config['type'] == 'range': + self._logger.debug("Setting up Range detector with settings: {}".format(detector_config)) + return_dict[detector_id] = CobraBay.detectors.Range(**detector_config) + if detector_config['type'] == 'lateral': + self._logger.debug("Setting up Lateral detector with settings: {}".format(detector_config)) + return_dict[detector_id] = CobraBay.detectors.Lateral(**detector_config) + self._logger.debug("VL53LX instances: {}".format(len(CobraBay.sensors.CB_VL53L1X.instances))) return return_dict def _setup_triggers(self): @@ -319,17 +338,17 @@ def _setup_triggers(self): # Create trigger object based on type. # All triggers except the system command handler will need a reference to the bay object. if trigger_config['type'] == "syscommand": - return_dict[trigger_id] = triggers.SysCommand(trigger_config) + return_dict[trigger_id] = CobraBay.triggers.SysCommand(trigger_config) else: bay_obj = self._bays[trigger_config['bay_id']] if trigger_config['type'] == 'mqtt_sensor': - return_dict[trigger_id] = triggers.MQTTSensor(trigger_config, bay_obj) + return_dict[trigger_id] = CobraBay.triggers.MQTTSensor(trigger_config, bay_obj) elif trigger_config['type'] == 'baycommand': # Get the bay object reference. - return_dict[trigger_id] = triggers.BayCommand(trigger_config, bay_obj) + return_dict[trigger_id] = CobraBay.triggers.BayCommand(trigger_config, bay_obj) elif trigger_config['type'] == 'range': # Range triggers also need the detector object. - return_dict[trigger_id] = triggers.Range(trigger_config, bay_obj, + return_dict[trigger_id] = CobraBay.triggers.Range(trigger_config, bay_obj, self._detectors[trigger_config['detector']]) else: # This case should be trapped by the config processor, but just in case, if trigger type diff --git a/fonts/OpenSans-Light.ttf b/CobraBay/data/OpenSans-Light.ttf similarity index 100% rename from fonts/OpenSans-Light.ttf rename to CobraBay/data/OpenSans-Light.ttf diff --git a/CobraBay/data/__init__.py b/CobraBay/data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/CobraBay/detectors.py b/CobraBay/detectors.py new file mode 100644 index 0000000..ce42f4a --- /dev/null +++ b/CobraBay/detectors.py @@ -0,0 +1,556 @@ +#### +# CobraBay Detector +#### +import pint.errors + +from .sensors import CB_VL53L1X, TFMini, I2CSensor, SerialSensor +# Import all the CobraBay Exceptions. +from .exceptions import CobraBayException, SensorValueException +from pint import UnitRegistry, Quantity, DimensionalityError +from time import monotonic_ns +from functools import wraps +import logging +from collections import namedtuple + + +# Decorator method to check if a method is ready. +def check_ready(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + print("Checking for readiness via function: {}".format(func)) + # Call the function. + func(self, *args, **kwargs) + # Now check for readiness and set ready if we are. + for setting in self._settings['required']: + if self._settings[setting] is None: + self._ready = False + return + self._ready = True + self._when_ready() + + return wrapper + + +# Use this decorator on any method that shouldn't be usable if the detector isn't marked ready. IE: don't let +# a value be read if the sensor isn't completely set up. +def only_if_ready(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if not self.ready: + raise RuntimeError("Detector is not fully configured, cannot read value yet. " + "Current settings:\n{}".format(self._settings)) + else: + return func(self, *args, **kwargs) + + return wrapper + + +# Decorate these methods to have methods check the value history before doing another hit on the sensor. +def use_value_cache(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if len(self._history) > 0: + time_delta = Quantity(monotonic_ns() - self._history[0][1], 'nanoseconds') + # Get the timing of the sensor. + sensor_timing = Quantity(self.measurement_time + " milliseconds") + if time_delta < sensor_timing: # If not enough time has passed, send back the most recent reading. + value = self._history[0][0] + else: + value = self._sensor_obj.range + # Add value to the history and truncate history to ten records. + self._history.insert(0, value) + self._history = self._history[:10] + else: + value = self._sensor_obj.range + # Send whichever value it is into the function. + return func(self, value) + + return wrapper + + +# This decorator is used for methods that rely on sensor data. If the sensor data is determined to be stale, it will +# trigger a new sensor read prior to executing the method. +def read_if_stale(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + read_sensor = False + # Assume we shouldn't read the sensor. + if len(self._history) > 0: + # Time difference between now and the most recent read. + time_delta = monotonic_ns() - self._history[0][1] + if time_delta > 1000000000: # 1s is too long, read the sensor. + read_sensor = True + # If we have no history, ie: at startup, go ahead and read. + else: + read_sensor = True ## If there's no history, read the sensor, we must be on startup. + # If flag is set, read the sensor and put its value into the history. + if read_sensor: + try: + value = self._sensor_obj.range + except BaseException as e: + # Save an exception into the history, we'll process this later. + value = e + self._logger.debug("Triggered sensor reading. Got: {}".format(value)) + # Add value to the history and truncate history to ten records. + self._history.insert(0, (value, monotonic_ns())) + self._history = self._history[:10] + # Call the wrapped function. + return func(self) + + return wrapper + + +class Detector: + def __init__(self, detector_id, name, offset="0 cm", log_level="WARNING", **kwargs): + # Save parameters. + self._detector_id = detector_id + self._name = name + # Create a logger. + self._logger = logging.getLogger("CobraBay").getChild("Detector").getChild(self._name) + self._logger.setLevel(log_level) + # A unit registry + self._ureg = UnitRegistry() + # Is the detector ready for use? + self._ready = False + # Measurement offset. We start this at zero, even though that's probably ridiculous! + self._offset = Quantity(offset) + # List to keep the history of sensor readings. This is used for some methods. + self._history = [] + + # Value will return the adjusted reading from the sensor. + @property + def value(self): + raise NotImplementedError + + # Assess the quality of the sensor reading. + @property + @read_if_stale + def quality(self): + raise NotImplementedError + + # These properties and methods are common to all detectors. + @property + def ready(self): + return self._ready + + # Measurement offset. All detectors will have this, even if it's 0. + @property + def offset(self): + return self._offset + + @offset.setter + @check_ready + def offset(self, input): + self._offset = self._convert_value(input) + + @property + def id(self): + return self._settings['id'] + + @property + def name(self): + return self._settings['name'] + + # Convenience property to let upstream modules check for the detector type. This disconnects + # from the class name, because in the future we may have multiple kinds of 'range' or 'lateral' detectors. + @property + def detector_type(self): + return type(self).__name__.lower() + + # Utility method to convert into quantities. + @staticmethod + def _convert_value(input_value): + # If it's already a quantity, return it right away. + if isinstance(input_value, Quantity): + return input_value + elif isinstance(input_value, str): + return Quantity(input_value) + else: + raise ValueError("Not a parseable value!") + + # This method will get called by the readiness checker once the detector is ready. + # If the detector has specific additional work to do on becoming ready, override this method and put it here. + def _when_ready(self): + pass + + +# Single Detectors add wrappers around a single sensor. Sensor arrays are not currently supported, but may be in the +# future. +class SingleDetector(Detector): + def __init__(self, sensor_type, sensor_settings, **kwargs): + print("Single detector has kwargs: {}".format(kwargs)) + super().__init__(**kwargs) + self._logger.debug("Creating sensor object using options: {}".format(sensor_settings)) + if sensor_type == 'VL53L1X': + # Create the sensor object using provided settings. + self._sensor_obj = CB_VL53L1X(**sensor_settings) + elif sensor_type == 'TFMini': + print("Setting up TFMini with sensor settings: {}".format(sensor_settings)) + self._sensor_obj = TFMini(**sensor_settings) + else: + raise ValueError("Detector {} trying to use unknown sensor type {}".format( + self._name, sensor_settings)) + + # Allow adjustment of timing. + def timing(self, timing_input): + # Make sure the timing input is a Quantity. + timing_input = self._convert_value(timing_input) + # Sensor takes measurement time in microseconds. + mt = timing_input.to('microseconds').magnitude + self._sensor_obj.measurement_time = mt + + # Called when the system needs to turn this sensor on. + def activate(self): + self._sensor_obj.start_ranging() + + # Called when the system needs to go into idle mode and turn the sensor off. + def deactivate(self): + print("Calling sensor object's stop_ranging method.") + self._sensor_obj.stop_ranging() + + # Complete turn the sensor on or off using its enable pin. + def enable(self): + self._sensor_obj.enable() + + def disable(self): + self._logger.warning("Setting sensor enable pin to false.") + self._sensor_obj.disable() + + # Called *only* when the system is exiting. This is currently an alias for deactivating, but other types + # of hardware may need other behavior, ie: freeing devices. + def shutdown(self): + self.deactivate() + self.disable() + + # Debugging methods to let the main system know a few things about the attached sensor. + @property + def sensor_type(self): + return type(self._sensor_obj) + + @property + def status(self): + return self._sensor_obj.status + + @property + def sensor_interface(self): + iface_info = namedtuple("iface_info", ['type', 'addr']) + if isinstance(self._sensor_obj, SerialSensor): + iface = iface_info("serial", self._sensor_obj.serial_port) + return iface + elif isinstance(self._sensor_obj, I2CSensor): + iface = iface_info("i2c", self._sensor_obj.i2c_address) + return iface + return None + + +# Detector that measures range progress. +class Range(SingleDetector): + def __init__(self, error_margin, **kwargs): + print("Range init.\n\tHave kwargs: {}".format(kwargs)) + super().__init__(**kwargs) + self._error_margin = error_margin + + + # Return the adjusted reading of the sensor. + @property + @read_if_stale + def value(self): + self._logger.debug("Creating adjusted value from latest value: {}".format(self._history[0][0])) + if isinstance(self._history[0][0], Quantity): + return self._history[0][0] - self.offset + elif self._history[0][0] is None: + return None + elif isinstance(self._history[0][0], str): + if self._history[0][0] == 'No reading': + return "No reading" + else: + return "Error" + + # Method to get the raw sensor reading. This is used to report upward for HA extended attributes. + @property + @read_if_stale + def value_raw(self): + self._logger.debug("Most recent reading is: {}".format(self._history[0][0])) + if isinstance(self._history[0][0], Quantity): + return self._history[0][0] + elif self._history[0][0] is None: + return "Unknown" + elif isinstance(self._history[0][0], str): + if self._history[0][0] == 'No reading': + return "No reading" + else: + return "Error" + + # Assess the quality of the sensor + @property + @read_if_stale + def quality(self): + self._logger.debug("Creating quality from latest value: {}".format(self._history[0][0])) + self._logger.debug("90% of bay depth is: {}".format(self._settings['bay_depth'] * .9)) + # Is there one of our own exceptions? These we can *probably* handle and get some useful information from. + if isinstance(self._history[0][0], SensorValueException): + # A weak reading from the sensor almost certainly means the door is open and nothing is blocking. + if self._history[0][0].status == "Weak": + return "Door open" + elif self._history[0][0].status in ("Saturation", "Flood"): + # When saturated or flooded, just pass on those statuses. + return self._history[0][0] + else: + return "Unknown" + # All other exceptions. + elif isinstance(self._history[0][0], BaseException): + return "Unknown" + else: + # You're about to hit the wall! + if self._history[0][0] < Quantity("2 in"): + return 'Emergency!' + elif (self._settings['bay_depth'] * 0.90) <= self._history[0][0]: + self._logger.debug( + "Reading is more than 90% of bay depth ({})".format(self._settings['bay_depth'] * .9)) + return 'No object' + # Now consider the adjusted values. + elif self.value < 0 and abs(self.value) > self.spread_park: + return 'Back up' + elif abs(self.value) < self.spread_park: + return 'Park' + elif self.value <= self._settings['dist_crit']: + return 'Final' + elif self.value <= self._settings['dist_warn']: + return 'Base' + else: + return 'OK' + + # Determine the rate of motion being measured by the detector. + @property + @read_if_stale + def _movement(self): + # Filter out non-Quantity sensor responses. We may want to keep them for other reasons, but can't average them. + history = [] + for entry in self._history: + if isinstance(entry[0], Quantity): + history.append(entry) + # If we don't have at least two data points, can't measure, return none. + if len(history) < 2: + return None + elif history[0][0] == 'Weak': + return None + # If the sensor is reading beyond range, speed and direction can't be known, so return immediately. + last_element = len(history) - 1 + self._logger.debug("First history: {}".format(history[0])) + self._logger.debug("Last history: {}".format(history[last_element])) + try: + net_dist = self._history[0][0] - history[last_element][0] + net_time = Quantity(history[0][1] - history[last_element][1], 'nanoseconds').to('seconds') + except pint.errors.DimensionalityError: + # If we're trying to subtract a non-Quantity value, then return unknown for these. + return None + self._logger.debug("Moved {} in {}s".format(net_dist, net_time)) + speed = (net_dist / net_time).to('kph') + # Since we've processed all this already, return all three values. + return {'speed': speed, 'net_dist': net_dist, 'net_time': net_time} + + # Based on readings, is the vehicle in motion? + @property + @read_if_stale + def motion(self): + # Grab the movement + movement = self._movement + if movement is None: + return "Unknown" + elif abs(self._movement['net_dist']) > Quantity(self._error_margin): + return True + else: + return False + + @property + @read_if_stale + def vector(self): + # Grab the movement value. + movement = self._movement + # Determine a direction. + if movement is None: + # If movement couldn't be determined, we also can't determine vector, so this is unknown. + return {"speed": "Unknown", "direction": "Unknown"} + # Okay, not none, so has value! + if movement['net_dist'] > Quantity(self._error_margin): + return {'speed': abs(movement['speed']), 'direction': 'forward'} + elif movement['net_dist'] < (Quantity(self._error_margin) * -1): + return {'speed': abs(movement['speed']), 'direction': 'reverse'} + else: + return {'speed': Quantity("0 kph"), 'direction': 'still'} + + # Gets called when the rangefinder has all settings and is being made ready for use. + def _when_ready(self): + # Calculate specific distances to use based on the percentages. + self._derived_distances() + + # Allow dynamic distance mode changes to come from the bay. This is largely used for debugging. + def distance_mode(self, target_mode): + try: + self._sensor_obj.distance_mode = target_mode + except ValueError: + print("Could not change distance mode to {}".format(target_mode)) + + @property + def bay_depth(self): + return self._settings['bay_depth'] + + @bay_depth.setter + @check_ready + def bay_depth(self, depth): + self._settings['bay_depth'] = self._convert_value(depth) + self._derived_distances() + + @property + def spread_park(self): + return self._settings['spread_park'] + + @spread_park.setter + @check_ready + def spread_park(self, input): + self._settings['spread_park'] = self._convert_value(input) + + # Properties for warning and critical percentages. We take these are "normal" percentages (ie: 15.10) and convert + # to decimal so it can be readily used for multiplication. + @property + def pct_warn(self): + return self._settings['pct_warn'] * 100 + + @pct_warn.setter + @check_ready + def pct_warn(self, input): + self._settings['pct_warn'] = input / 100 + + @property + def pct_crit(self): + return self._settings['pct_crit'] * 100 + + @pct_crit.setter + @check_ready + def pct_crit(self, input): + self._settings['pct_crit'] = input / 100 + + # Pre-bake distances for warn and critical to make evaluations a little easier. + def _derived_distances(self): + self._logger.debug("Calculating derived distances.") + adjusted_distance = self._settings['bay_depth'] - self._offset + self._settings['dist_warn'] = adjusted_distance.magnitude * self.pct_warn * adjusted_distance.units + self._settings['dist_crit'] = adjusted_distance.magnitude * self.pct_crit * adjusted_distance.units + + # Reference some properties upward to the parent class. This is necessary because properties aren't directly + # inherented. + + @property + def i2c_address(self): + return super().i2c_address + + @property + def offset(self): + return super().offset + + @offset.setter + def offset(self, input): + super(Range, self.__class__).offset.fset(self, input) + + +# Detector for lateral position +class Lateral(SingleDetector): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @property + @read_if_stale + def value(self): + self._logger.debug("Most recent reading is: {}".format(self._history[0][0])) + if isinstance(self._history[0][0], Quantity): + return self._history[0][0] - self.offset + else: + return None + + # Method to get the raw sensor reading. This is used to report upward for HA extended attributes. + @property + @read_if_stale + def value_raw(self): + self._logger.debug("Most recent reading is: {}".format(self._history[0][0])) + if isinstance(self._history[0][0], Quantity): + return self._history[0][0] + else: + return None + + @property + @read_if_stale + def quality(self): + self._logger.debug("Assessing quality for value: {}".format(self.value)) + # Process quality if we get a quantity from the Detector. + if isinstance(self.value, Quantity): + self._logger.debug("Comparing to: \n\t{}\n\t{}".format( + self.spread_ok, self.spread_warn)) + if self.value > Quantity('96 in'): + # A standard vehicle width (in the US, at least) is 96 inches, so if we're reading something further + # than that, it's not the vehicle in question (ie: a far wall, another vehicle, etc). + qv = "No object" + elif abs(self.value) <= self.spread_ok: + qv = "OK" + elif abs(self.value) <= self.spread_warn: + qv = "Warning" + elif abs(self.value) > self.spread_warn: + qv = "Critical" + else: + # Total failure to return a value means the light didn't reflect off anything. That *probably* means + # nothing is there, but it could be failing for other reasons. + qv = "Unknown" + else: + qv = "Unknown" + self._logger.debug("Quality returning {}".format(qv)) + return qv + + @property + def ready(self): + return self._ready + + @property + def spread_ok(self): + return self._settings['spread_ok'] + + @spread_ok.setter + @check_ready + def spread_ok(self, m_input): + self._settings['spread_ok'] = self._convert_value(m_input).to('cm') + # Check to see if the detector is now ready. + + @property + def spread_warn(self): + return self._settings['spread_warn'] + + @spread_warn.setter + @check_ready + def spread_warn(self, m_input): + self._settings['spread_warn'] = self._convert_value(m_input).to('cm') + + @property + def side(self): + return self._settings['side'] + + @side.setter + @check_ready + def side(self, m_input): + if m_input.upper() not in ('R', 'L'): + raise ValueError("Lateral side must be 'R' or 'L'") + else: + self._settings['side'] = m_input.upper() + + # Reference some properties upward to the parent class. This is necessary because properties aren't directly + # inherented. + + @property + def i2c_address(self): + return super().i2c_address + + @property + def offset(self): + return super().offset + + @offset.setter + @check_ready + def offset(self, m_input): + super(Lateral, self.__class__).offset.fset(self, m_input) diff --git a/lib/display.py b/CobraBay/display.py similarity index 99% rename from lib/display.py rename to CobraBay/display.py index 21c09f6..c7bf149 100644 --- a/lib/display.py +++ b/CobraBay/display.py @@ -17,7 +17,7 @@ ureg = UnitRegistry() -class Display: +class CBDisplay: def __init__(self, config): # Get a logger! self._logger = logging.getLogger("CobraBay").getChild("Display") @@ -109,7 +109,7 @@ def register_bay(self, display_reg_info): # Put this in the right place in the lookup. self._layers[bay_id][lateral][side][status[0]] = img # Write for debugging - # img.save("/tmp/cobrabay-{}-{}-{}.png".format(lateral,side,status[0]), format='PNG') + # img.save("/tmp/CobraBay-{}-{}-{}.png".format(lateral,side,status[0]), format='PNG') del(draw) del(img) # Now add the height of this bar to the accumulated height, to get the correct start for the next time. @@ -385,7 +385,7 @@ def _output_image(self,image): self.current = b64encode(image_buffer.getvalue()) # For debugging, write to a file in tmp. - # image.save("/tmp/cobrabay-display.png", format='PNG') + # image.save("/tmp/CobraBay-display.png", format='PNG') @property def current(self): diff --git a/CobraBay/exceptions.py b/CobraBay/exceptions.py new file mode 100644 index 0000000..717f0b9 --- /dev/null +++ b/CobraBay/exceptions.py @@ -0,0 +1,14 @@ +#### +# Cobra Bay - Exceptions +# +# Custom Exceptions for CobraBay to raise +#### + +class CobraBayException(Exception): + """CobraBay Exceptions""" + +class SensorValueException(CobraBayException): + """Raised when a sensor reads a value that indicates a non-range state.""" + def __init__(self, *args, **kwargs): + super().__init__(*args) + self.status = kwargs.get('status') \ No newline at end of file diff --git a/lib/nan.py b/CobraBay/nan.py similarity index 100% rename from lib/nan.py rename to CobraBay/nan.py diff --git a/lib/network.py b/CobraBay/network.py similarity index 94% rename from lib/network.py rename to CobraBay/network.py index e2a6514..0c23536 100644 --- a/lib/network.py +++ b/CobraBay/network.py @@ -17,7 +17,7 @@ from .version import __version__ -class Network: +class CBNetwork: def __init__(self, config): # Get our settings. self._reconnect_timestamp = None @@ -34,13 +34,6 @@ def __init__(self, config): if self._logger_mqtt.level != self._logger.level: self._logger.info("MQTT Logging level set to {}".format(self._logger_mqtt.level)) - try: - from secrets import secrets - self.secrets = secrets - except ImportError: - self._logger.error('No secrets file, cannot get connection details.') - raise - # Create a convertomatic instance. self._cv = Convertomatic(self._settings['units']) @@ -71,21 +64,21 @@ def __init__(self, config): client_id="" ) self._mqtt_client.username_pw_set( - username=self.secrets['username'], - password=self.secrets['password'] + username=self._settings['mqtt']['username'], + password=self._settings['mqtt']['password'] ) # Send MQTT logging to the network logger. # self._mqtt_client.enable_logger(self._logger) # MQTT host to connect to. - self._mqtt_host = self.secrets['broker'] + self._mqtt_host = self._settings['mqtt']['broker'] # If port is set, us that. try: - self._mqtt_port = self.secrets['port'] + self._mqtt_port = self._settings['mqtt']['port'] except: self._mqtt_port = 1883 - # Set TLS options. - if 'tls' in self.secrets: + # Set TLS options. Not currently supported, this is a stud. + if 'tls' in self._settings['mqtt']: pass # Connect callback. @@ -97,7 +90,7 @@ def __init__(self, config): self._topics = { 'system': { 'device_connectivity': { - 'topic': 'cobrabay/' + self._client_id + '/connectivity', + 'topic': 'CobraBay/' + self._client_id + '/connectivity', 'previous_state': {}, 'enabled': True, 'ha_discovery': { @@ -110,7 +103,7 @@ def __init__(self, config): } }, 'cpu_pct': { - 'topic': 'cobrabay/' + self._client_id + '/cpu_pct', + 'topic': 'CobraBay/' + self._client_id + '/cpu_pct', 'previous_state': {}, 'enabled': True, 'ha_discovery': { @@ -122,7 +115,7 @@ def __init__(self, config): } }, 'cpu_temp': { - 'topic': 'cobrabay/' + self._client_id + '/cpu_temp', + 'topic': 'CobraBay/' + self._client_id + '/cpu_temp', 'previous_state': {}, 'enabled': True, 'ha_discovery': { @@ -134,7 +127,7 @@ def __init__(self, config): } }, 'mem_info': { - 'topic': 'cobrabay/' + self._client_id + '/mem_info', + 'topic': 'CobraBay/' + self._client_id + '/mem_info', 'previous_state': {}, 'enabled': True, 'ha_discovery': { @@ -144,11 +137,11 @@ def __init__(self, config): 'value_template': "{{{{ value_json.mem_pct }}}}", 'unit_of_measurement': '%', 'icon': 'mdi:memory', - 'json_attributes_topic': 'cobrabay/' + self._client_id + '/mem_info' + 'json_attributes_topic': 'CobraBay/' + self._client_id + '/mem_info' } }, 'undervoltage': { - 'topic': 'cobrabay/' + self._client_id + '/undervoltage', + 'topic': 'CobraBay/' + self._client_id + '/undervoltage', 'previous_state': {}, 'ha_discovery': { 'name': '{} Undervoltage'.format(self._settings['system_name']), @@ -160,13 +153,13 @@ def __init__(self, config): } }, # 'device_command': { - # 'topic': 'cobrabay/' + self._client_id + '/cmd', + # 'topic': 'CobraBay/' + self._client_id + '/cmd', # 'enabled': False, # 'callback': self._cb_device_command # # May eventually do discovery here to create selectors, but not yet. # }, 'display': { - 'topic': 'cobrabay/' + self._client_id + '/display', + 'topic': 'CobraBay/' + self._client_id + '/display', 'previous_state': {}, 'ha_discovery': { 'name': '{} Display'.format(self._settings['system_name']), @@ -179,7 +172,7 @@ def __init__(self, config): }, 'bay': { 'bay_occupied': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/occupancy', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/occupancy', 'previous_state': 'Unknown', 'ha_discovery': { 'name': '{0[bay_name]} Occupied', @@ -192,7 +185,7 @@ def __init__(self, config): } }, 'bay_state': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/state', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/state', 'previous_state': None, 'ha_discovery': { 'name': '{0[bay_name]} State', @@ -201,7 +194,7 @@ def __init__(self, config): } }, 'bay_laterals': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/{0[lateral]/display', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/{0[lateral]/display', 'previous_state': {}, 'ha_discovery': { 'name': '{0[bay_name]} {0[lateral]} Display', @@ -212,7 +205,7 @@ def __init__(self, config): }, # Adjusted readings from the sensors. # 'bay_position': { - # 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/position', + # 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/position', # 'previous_state': None, # 'ha_discovery': { # 'name': '{0[bay_name]} Detector Position: {0[detector_name]}', @@ -224,7 +217,7 @@ def __init__(self, config): # } # }, 'bay_position': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/{0[detector_id]}', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/{0[detector_id]}', 'previous_state': None, 'ha_discovery': { 'name': '{0[bay_name]} Detector Position: {0[detector_name]}', @@ -233,12 +226,12 @@ def __init__(self, config): 'value_template': '{{{{ value_json.adjusted_reading }}}}', 'unit_of_measurement': self._uom('length'), 'icon': 'mdi:ruler', - 'json_attributes_topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/{0[detector_id]}' + 'json_attributes_topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/{0[detector_id]}' } }, # How good the parking job is. 'bay_quality': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/quality', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/quality', 'previous_state': None, 'enabled': True, 'ha_discovery': { @@ -250,7 +243,7 @@ def __init__(self, config): } }, 'bay_speed': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/vector', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/vector', 'previous_state': None, 'ha_discovery': { 'name': '{0[bay_name]} Speed', @@ -263,7 +256,7 @@ def __init__(self, config): }, # Motion binary sensor. Keys off the 'direction' value in the Vector topic. 'bay_motion': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/vector', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/vector', 'previous_state': 'Unknown', 'enabled': True, 'ha_discovery': { @@ -275,7 +268,7 @@ def __init__(self, config): } }, 'bay_dock_time': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/dock_time', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/dock_time', 'previous_state': None, 'ha_discovery': { 'name': '{0[bay_name]} Time Until Docked', @@ -287,7 +280,7 @@ def __init__(self, config): }, # Set up the command selector 'bay_command': { - 'topic': 'cobrabay/' + self._client_id + '/{0[bay_id]}/cmd', + 'topic': 'CobraBay/' + self._client_id + '/{0[bay_id]}/cmd', 'ha_discovery': { 'name': '{0[bay_name]} Command', 'type': 'select', @@ -323,7 +316,7 @@ def register_trigger(self, trigger_obj): self._trigger_registry[trigger_obj.id] = trigger_obj self._logger.debug("Stored trigger object {}".format(trigger_obj.id)) # Add the MQTT Prefix to use to the object. Triggers set to override this will just ignore it. - trigger_obj.topic_prefix = "cobrabay/" + self._client_id + trigger_obj.topic_prefix = "CobraBay/" + self._client_id # Since it's possible we're already connected to MQTT, we call subscribe here separately. self._trigger_subscribe(trigger_obj.id) @@ -684,7 +677,7 @@ def _ha_create(self, topic_type=None, topic_name=None, fields=None): config_dict['payload_not_available'] = self._topics['system']['device_connectivity']['ha_discovery']['payload_off'] # Configuration topic to which we'll send this configuration. This is based on the entity type and entity ID. - config_topic = "homeassistant/{}/cobrabay-{}/{}/config".\ + config_topic = "homeassistant/{}/CobraBay-{}/{}/config".\ format(config_dict['type'], self._client_id, config_dict['object_id']) diff --git a/lib/sensors.py b/CobraBay/sensors.py similarity index 85% rename from lib/sensors.py rename to CobraBay/sensors.py index 90c4ec4..206ba1e 100644 --- a/lib/sensors.py +++ b/CobraBay/sensors.py @@ -15,14 +15,11 @@ from pathlib import Path from pprint import pformat import sys +import CobraBay.exceptions class BaseSensor: - def __init__(self, sensor_options, required): - self._settings = {} - for item in required: - if item not in sensor_options: - raise ValueError("Required sensor_option '{}' missing.".format(item)) + def __init__(self, logger, log_level='WARNING'): # Create a unit registry for the object. self._ureg = UnitRegistry() @@ -31,33 +28,34 @@ def __init__(self, sensor_options, required): self._previous_timestamp = monotonic() self._previous_reading = None - # Sensor should call this init and then extend with its own options. - # super().__init__(board_options) - @property def range(self): raise NotImplementedError("Range should be overridden by specific sensor class.") + # Base methods, should be overridden. + def start_ranging(self): + return + + def stop_ranging(self): + return class I2CSensor(BaseSensor): - def __init__(self, sensor_options): - required = ('i2c_bus', 'i2c_address') + def __init__(self, i2c_bus, i2c_address, logger, log_level='WARNING'): try: - super().__init__(sensor_options, required) + super().__init__(logger, log_level) except ValueError: raise # Check for the Base I2C Sensors # Create a logger - self._name = "{}-{}-{}".format(type(self).__name__, sensor_options['i2c_bus'], - hex(sensor_options['i2c_address'])) + self._name = "{}-{}-{}".format(type(self).__name__, i2c_bus, hex(i2c_address)) self._logger = logging.getLogger("CobraBay").getChild("Sensors").getChild(self._name) self._logger.setLevel("DEBUG") self._logger.info("Initializing sensor...") # Set the I2C bus and I2C Address self._logger.debug("Setting I2C Properties...") - self.i2c_bus = sensor_options['i2c_bus'] - self.i2c_address = sensor_options['i2c_address'] + self.i2c_bus = i2c_bus + self.i2c_address = i2c_address self._logger.debug("Now have I2C Bus {} and Address {}".format(self.i2c_bus, hex(self.i2c_address))) # How many times, in the lifetime of the sensor, have we hit a fault. @@ -65,7 +63,13 @@ def __init__(self, sensor_options): # Set if the sensor hit a fault, recovered, but hasn't yet succeeded in re-reading. If it faults *again*, bomb. self._last_chance = False - # Global properties. Since all supported sensors are I2C at the moment, these can be global. + # Global properties. + + @property + def name(self): + """ Sensor Name, derived from type, bus, address. """ + return self._name + @property def i2c_bus(self): return self._i2c_bus @@ -82,7 +86,7 @@ def i2c_address(self): return self._i2c_address @i2c_address.setter - # Stores the address of the board. Does *not* necesarially apply it to the board to update it. + # Stores the address of the board. Does *not* necessarily apply it to the board to update it. def i2c_address(self, i2c_address): # If it's in "0xYY" format, convert it to a base 16 int. if isinstance(i2c_address, str): @@ -92,23 +96,28 @@ def i2c_address(self, i2c_address): class SerialSensor(BaseSensor): - def __init__(self, sensor_options): - required = ('port', 'baud') + def __init__(self, port, baud, logger, log_level='WARNING'): + """ + :type baud: int + :type logger: str + """ try: - super().__init__(sensor_options, required) + super().__init__(logger, log_level) except ValueError: raise # Create a logger - self._name = "{}-{}".format(type(self).__name__, sensor_options['port']) + self._name = "{}-{}".format(type(self).__name__, port) self._logger = logging.getLogger("CobraBay").getChild("Sensors").getChild(self._name) self._logger.info("Initializing sensor...") self._logger.setLevel("WARNING") - self.serial_port = sensor_options['port'] - self.baud_rate = sensor_options['baud'] + self._serial_port = None + self._baud_rate = None + self.serial_port = port + self.baud_rate = baud @property def serial_port(self): - return self._settings['serial_port'] + return self._serial_port @serial_port.setter def serial_port(self, target_port): @@ -118,18 +127,22 @@ def serial_port(self, target_port): port_path = Path("/dev/" + target_port) # Make sure the path is a device we can access. if port_path.is_char_device(): - self._settings['serial_port'] = str(port_path) + self._serial_port = str(port_path) else: raise ValueError("{} is not an accessible character device.".format(str(port_path))) @property def baud_rate(self): - return self._settings['baud_rate'] + return self._baud_rate @baud_rate.setter def baud_rate(self, target_rate): - self._settings['baud_rate'] = target_rate + self._baud_rate = target_rate + @property + def name(self): + """ Sensor name, type-port """ + return self._name class CB_VL53L1X(I2CSensor): _i2c_address: int @@ -137,16 +150,22 @@ class CB_VL53L1X(I2CSensor): instances = weakref.WeakSet() - def __init__(self, sensor_options): - # Call super. - super().__init__(sensor_options) - # Check for additional required options. + def __init__(self, i2c_bus, i2c_address, enable_board, enable_pin, timing, logger, distance_mode ="long", log_level="WARNING"): + """ + :type i2c_bus: int + :type i2c_address: hex + :type enable_board: str + :type enable_pin: str + :type logger: str + :type log_level: str + """ + + try: + super().__init__(i2c_bus=i2c_bus, i2c_address=i2c_address, logger=logger, log_level=log_level) + except ValueError: + raise + self._sensor_obj = None - required = ['i2c_bus', 'i2c_address', 'enable_board', 'enable_pin'] - # Store the options. - for item in required: - if item not in sensor_options: - raise ValueError("Required board_option '{}' missing".format(item)) # The library doesn't store its ranging state itself, so we have to do this ourselves. self._ranging = False @@ -175,17 +194,17 @@ def __init__(self, sensor_options): raise # Set the properties - self.enable_board = sensor_options['enable_board'] - self.enable_pin = sensor_options['enable_pin'] - self.i2c_bus = sensor_options['i2c_bus'] - self.i2c_address = sensor_options['i2c_address'] + self.enable_board = enable_board + self.enable_pin = enable_pin + # self.i2c_bus = i2c_bus + # self.i2c_address = i2c_address # Enable self. self.enable() # Start ranging. self._sensor_obj.start_ranging() # Set the timing. - self.measurement_time = Quantity(sensor_options['timing']).to('microseconds').magnitude + self.measurement_time = Quantity(timing).to('microseconds').magnitude self.distance_mode = 'long' self._previous_reading = self._sensor_obj.distance self._logger.debug("Test reading: {}".format(self._previous_reading)) @@ -412,9 +431,14 @@ def status(self): else: return 'fault' + class TFMini(SerialSensor): - def __init__(self, sensor_options): - super().__init__(sensor_options) + def __init__(self, port, baud, logger, log_level="WARNING"): + try: + super().__init__(port=port, baud=baud, logger=logger, log_level=log_level) + except ValueError: + raise + self._performance = { 'max_range': Quantity('12m'), 'min_range': Quantity('0.3m') @@ -423,7 +447,10 @@ def __init__(self, sensor_options): # Create the sensor object. self._logger.debug("Creating TFMini object on serial port {}".format(self.serial_port)) self._sensor_obj = TFMP(self.serial_port, self.baud_rate) - self._logger.debug("Test reading: {}".format(self.range)) + try: + self._logger.debug("Test reading: {}".format(self.range)) + except CobraBay.exceptions.SensorValueException as e: + self._logger.warning("During sensor setup, received abnormal reading '{}'.".format(e.status)) # This sensor doesn't need an enable, do nothing. @staticmethod @@ -446,7 +473,7 @@ def range(self): self._previous_timestamp = monotonic() return self._previous_reading else: - return reading.status + raise CobraBay.exceptions.SensorValueException(status=reading.status) # When this was tested in I2C mode, the TFMini could return unstable answers, even at rest. Unsure if # this is still true in serial mode, keeping this clustering method for the moment. diff --git a/lib/synthsensor.py b/CobraBay/synthsensor.py similarity index 100% rename from lib/synthsensor.py rename to CobraBay/synthsensor.py diff --git a/lib/systemhw.py b/CobraBay/systemhw.py similarity index 98% rename from lib/systemhw.py rename to CobraBay/systemhw.py index 21ec3b9..f758de0 100644 --- a/lib/systemhw.py +++ b/CobraBay/systemhw.py @@ -11,7 +11,7 @@ from pint import UnitRegistry from rpi_bad_power import new_under_voltage -class PiStatus: +class CBPiStatus: def __init__(self): self._ureg = UnitRegistry() self._ureg.define('percent = 1 / 100 = %') diff --git a/lib/triggers.py b/CobraBay/triggers.py similarity index 100% rename from lib/triggers.py rename to CobraBay/triggers.py diff --git a/lib/util.py b/CobraBay/util.py similarity index 99% rename from lib/util.py rename to CobraBay/util.py index 0873888..17a5b09 100644 --- a/lib/util.py +++ b/CobraBay/util.py @@ -6,6 +6,7 @@ import board import busio from time import sleep +import CobraBay # General purpose converter. @@ -103,6 +104,7 @@ def mqtt_message_search(input, element, value, extract=None): return_values.append(matched_message[extract]) return return_values + def scan_i2c(): i2c = busio.I2C(board.SCL, board.SDA) while not i2c.try_lock(): diff --git a/lib/version.py b/CobraBay/version.py similarity index 100% rename from lib/version.py rename to CobraBay/version.py diff --git a/cobrabay.py b/cobrabay.py deleted file mode 100644 index 7f1a4f4..0000000 --- a/cobrabay.py +++ /dev/null @@ -1,15 +0,0 @@ -#### -# CobraBay - Command line invoker -#### - - -import lib.cobrabay as cobrabay -from pid import PidFile - - -# Initialize the object. -cb = cobrabay.CobraBay() - -# Start the main operating loop. -#with PidFile('CobraBay'): -cb.run() \ No newline at end of file diff --git a/hardware/hardware.md b/docs/hardware.md similarity index 100% rename from hardware/hardware.md rename to docs/hardware.md diff --git a/hardware/printf.h b/docs/printf.h similarity index 95% rename from hardware/printf.h rename to docs/printf.h index e8bde5c..212ec3d 100644 --- a/hardware/printf.h +++ b/docs/printf.h @@ -1,58 +1,58 @@ -/* - Copyright (C) 2011 J. Coliz - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - version 2 as published by the Free Software Foundation. - */ - /* Galileo support from spaniakos */ - -/** - * @file printf.h - * - * Setup necessary to direct stdout to the Arduino Serial library, which - * enables 'printf' - */ - -#ifndef __PRINTF_H__ -#define __PRINTF_H__ - -#if defined (ARDUINO) && !defined (__arm__) && !defined(__ARDUINO_X86__) - -int serial_putc( char c, FILE * ) -{ - Serial.write( c ); - - return c; -} - -void printf_begin(void) -{ - fdevopen( &serial_putc, 0 ); -} - -#elif defined (__arm__) - -void printf_begin(void){} - -#elif defined(__ARDUINO_X86__) -int serial_putc( char c, FILE * ) -{ - Serial.write( c ); - - return c; -} - -void printf_begin(void) -{ - //For reddirect stdout to /dev/ttyGS0 (Serial Monitor port) - stdout = freopen("/dev/ttyGS0","w",stdout); - delay(500); - printf("redirecting to Serial..."); - - // ----------------------------------------------------------- -} -#else -#error This example is only for use on Arduino. -#endif // ARDUINO - -#endif // __PRINTF_H__ +/* + Copyright (C) 2011 J. Coliz + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + version 2 as published by the Free Software Foundation. + */ + /* Galileo support from spaniakos */ + +/** + * @file printf.h + * + * Setup necessary to direct stdout to the Arduino Serial library, which + * enables 'printf' + */ + +#ifndef __PRINTF_H__ +#define __PRINTF_H__ + +#if defined (ARDUINO) && !defined (__arm__) && !defined(__ARDUINO_X86__) + +int serial_putc( char c, FILE * ) +{ + Serial.write( c ); + + return c; +} + +void printf_begin(void) +{ + fdevopen( &serial_putc, 0 ); +} + +#elif defined (__arm__) + +void printf_begin(void){} + +#elif defined(__ARDUINO_X86__) +int serial_putc( char c, FILE * ) +{ + Serial.write( c ); + + return c; +} + +void printf_begin(void) +{ + //For reddirect stdout to /dev/ttyGS0 (Serial Monitor port) + stdout = freopen("/dev/ttyGS0","w",stdout); + delay(500); + printf("redirecting to Serial..."); + + // ----------------------------------------------------------- +} +#else +#error This example is only for use on Arduino. +#endif // ARDUINO + +#endif // __PRINTF_H__ diff --git a/hardware/reset_to_serial.ino b/docs/reset_to_serial.ino similarity index 96% rename from hardware/reset_to_serial.ino rename to docs/reset_to_serial.ino index 24202f8..45b0801 100644 --- a/hardware/reset_to_serial.ino +++ b/docs/reset_to_serial.ino @@ -1,170 +1,170 @@ -/* File Name: reset_tfmini_to_uart - * Developer: Christopher Gill - Bud R - * Inception: 05 Nov 2022 - * Description: Runs an I2C search to find a connected TFMini-S. Resets it to Serial mode. - * Credits: Bud Ryerson, who wrote the original TFMini Arudino library, and whose I2C - * address changing example script is the backbone of thise tool. - */ - -#include // Arduino standard I2C/Two-Wire Library -#include "printf.h" // Modified to support Intel based Arduino - // devices such as the Galileo. Download from: - // https://github.com/spaniakos/AES/blob/master/printf.h - -#include // Include TFMini Plus Library -TFMPlus tfmP; // Create a TFMini-Plus Serial object -#include // Include the TFMini Plus I2C Library -TFMPI2C tfmPI2C; - -#include -SoftwareSerial mySerial(10,11); // Create a Serial interface. - -// Declare variables -int I2C_total, I2C_error; -uint8_t oldAddr, newAddr; -bool serial_started = false; - -bool scanAddr() -{ - Serial.println(); - Serial.println( "Show all I2C addresses in Decimal and Hex."); - Serial.println( "Scanning..."); - I2C_total = 0; - I2C_error = 0; - oldAddr = 0x10; // default address - for( uint8_t x = 1; x < 127; x++ ) - { - Wire.beginTransmission( x); - // Use return value of Write.endTransmisstion() to - // see if a device did acknowledge the I2C address. - I2C_error = Wire.endTransmission(); - - if( I2C_error == 0) - { - Serial.print( "I2C device found at address "); - printAddress( x); - ++I2C_total; // Increment for each address returned. - if( I2C_total == 1) oldAddr = x; - } - else if( I2C_error == 4) - { - Serial.print( "Unknown I2C error at address "); - Serial.println( x); - } - } - // Display results and return boolean value. - if( I2C_total == 0) - { - Serial.println( "No I2C devices found."); - return false; - } - else return true; -} - -// Print address in decimal and HEX -void printAddress( uint8_t adr) -{ - Serial.print( adr); - Serial.print( " (0x"); - Serial.print( adr < 16 ? "0" : ""); - Serial.print( adr, HEX); - Serial.println( " Hex)"); -} - -void setup() -{ - Wire.begin(); // Initialize two-wire interface - Serial.begin(115200); // Initialize terminal serial port - printf_begin(); // Initialize printf library. - delay(20); - serial_started = false; - - Serial.flush(); // Flush serial write buffer - while( Serial.available())Serial.read(); // flush serial read buffer -} - -void tfserial_setup() -{ - mySerial.begin(115200); - delay(20); - tfmP.begin(&mySerial); - printf( "Firmware version: "); - if( tfmP.sendCommand( GET_FIRMWARE_VERSION, 0)) - { - printf( "%1u.", tfmP.version[ 0]); // print three single numbers - printf( "%1u.", tfmP.version[ 1]); // each separated by a dot - printf( "%1u\r\n", tfmP.version[ 2]); - } - else tfmP.printReply(); - // - - Set the data frame-rate to 20Hz - - - - - - - - - printf( "Data-Frame rate: "); - if( tfmP.sendCommand( SET_FRAME_RATE, FRAME_20)) - { - printf( "%2uHz.\r\n", FRAME_20); - } - else tfmP.printReply(); -} - -// = = = = = = = = = = MAIN LOOP = = = = = = = = = = -void loop() -{ - // Scan for I2C addresses, first one found must be the sensor. - if( scanAddr() ) - { - Serial.println(); - Serial.print( "I2C address found: "); - printAddress( oldAddr); - Serial.println( "Setting sensor to serial mode."); - if ( tfmPI2C.sendCommand(SET_SERIAL_MODE, 0, oldAddr)) - { - Serial.println("Mode change successful"); - if ( tfmPI2C.sendCommand(SAVE_SETTINGS, 0, oldAddr)) - { - Serial.println("Saved settings."); - } - else - { - Serial.println("Could not save settings."); - } - } - else - { - Serial.println("Mode reset failed."); - } - } - else - { - Serial.println("No sensor found on I2C. Trying serial."); - if ( serial_started ) - { - int16_t tfDist = 0; // Distance to object in centimeters - int16_t tfFlux = 0; // Strength or quality of return signal - int16_t tfTemp = 0; // Internal temperature of Lidar sensor chip - - if( tfmP.getData( tfDist, tfFlux, tfTemp)) // Get data from the device. - { - printf( "Dist:%04icm ", tfDist); // display distance, - printf( "Flux:%05i ", tfFlux); // display signal strength/quality, - printf( "Temp:%2i%s", tfTemp, "C"); // display temperature, - printf( "\r\n"); // end-of-line. - } - else // If the command fails... - { - tfmP.printFrame(); // display the error and HEX dataa - } - } - else - { - Serial.println("Doing TF sensor serial setup..."); - tfserial_setup(); - serial_started = true; - } - } - - Serial.println(); - Serial.println( "Program will restart in 30 seconds."); - Serial.println( "*****************************"); - delay( 30000); // And wait for 30 seconds -} -// = = = = = = = = = End of Main Loop = = = = = = = = = +/* File Name: reset_tfmini_to_uart + * Developer: Christopher Gill + Bud R + * Inception: 05 Nov 2022 + * Description: Runs an I2C search to find a connected TFMini-S. Resets it to Serial mode. + * Credits: Bud Ryerson, who wrote the original TFMini Arudino library, and whose I2C + * address changing example script is the backbone of thise tool. + */ + +#include // Arduino standard I2C/Two-Wire Library +#include "printf.h" // Modified to support Intel based Arduino + // devices such as the Galileo. Download from: + // https://github.com/spaniakos/AES/blob/master/printf.h + +#include // Include TFMini Plus Library +TFMPlus tfmP; // Create a TFMini-Plus Serial object +#include // Include the TFMini Plus I2C Library +TFMPI2C tfmPI2C; + +#include +SoftwareSerial mySerial(10,11); // Create a Serial interface. + +// Declare variables +int I2C_total, I2C_error; +uint8_t oldAddr, newAddr; +bool serial_started = false; + +bool scanAddr() +{ + Serial.println(); + Serial.println( "Show all I2C addresses in Decimal and Hex."); + Serial.println( "Scanning..."); + I2C_total = 0; + I2C_error = 0; + oldAddr = 0x10; // default address + for( uint8_t x = 1; x < 127; x++ ) + { + Wire.beginTransmission( x); + // Use return value of Write.endTransmisstion() to + // see if a device did acknowledge the I2C address. + I2C_error = Wire.endTransmission(); + + if( I2C_error == 0) + { + Serial.print( "I2C device found at address "); + printAddress( x); + ++I2C_total; // Increment for each address returned. + if( I2C_total == 1) oldAddr = x; + } + else if( I2C_error == 4) + { + Serial.print( "Unknown I2C error at address "); + Serial.println( x); + } + } + // Display results and return boolean value. + if( I2C_total == 0) + { + Serial.println( "No I2C devices found."); + return false; + } + else return true; +} + +// Print address in decimal and HEX +void printAddress( uint8_t adr) +{ + Serial.print( adr); + Serial.print( " (0x"); + Serial.print( adr < 16 ? "0" : ""); + Serial.print( adr, HEX); + Serial.println( " Hex)"); +} + +void setup() +{ + Wire.begin(); // Initialize two-wire interface + Serial.begin(115200); // Initialize terminal serial port + printf_begin(); // Initialize printf library. + delay(20); + serial_started = false; + + Serial.flush(); // Flush serial write buffer + while( Serial.available())Serial.read(); // flush serial read buffer +} + +void tfserial_setup() +{ + mySerial.begin(115200); + delay(20); + tfmP.begin(&mySerial); + printf( "Firmware version: "); + if( tfmP.sendCommand( GET_FIRMWARE_VERSION, 0)) + { + printf( "%1u.", tfmP.version[ 0]); // print three single numbers + printf( "%1u.", tfmP.version[ 1]); // each separated by a dot + printf( "%1u\r\n", tfmP.version[ 2]); + } + else tfmP.printReply(); + // - - Set the data frame-rate to 20Hz - - - - - - - - + printf( "Data-Frame rate: "); + if( tfmP.sendCommand( SET_FRAME_RATE, FRAME_20)) + { + printf( "%2uHz.\r\n", FRAME_20); + } + else tfmP.printReply(); +} + +// = = = = = = = = = = MAIN LOOP = = = = = = = = = = +void loop() +{ + // Scan for I2C addresses, first one found must be the sensor. + if( scanAddr() ) + { + Serial.println(); + Serial.print( "I2C address found: "); + printAddress( oldAddr); + Serial.println( "Setting sensor to serial mode."); + if ( tfmPI2C.sendCommand(SET_SERIAL_MODE, 0, oldAddr)) + { + Serial.println("Mode change successful"); + if ( tfmPI2C.sendCommand(SAVE_SETTINGS, 0, oldAddr)) + { + Serial.println("Saved settings."); + } + else + { + Serial.println("Could not save settings."); + } + } + else + { + Serial.println("Mode reset failed."); + } + } + else + { + Serial.println("No sensor found on I2C. Trying serial."); + if ( serial_started ) + { + int16_t tfDist = 0; // Distance to object in centimeters + int16_t tfFlux = 0; // Strength or quality of return signal + int16_t tfTemp = 0; // Internal temperature of Lidar sensor chip + + if( tfmP.getData( tfDist, tfFlux, tfTemp)) // Get data from the device. + { + printf( "Dist:%04icm ", tfDist); // display distance, + printf( "Flux:%05i ", tfFlux); // display signal strength/quality, + printf( "Temp:%2i%s", tfTemp, "C"); // display temperature, + printf( "\r\n"); // end-of-line. + } + else // If the command fails... + { + tfmP.printFrame(); // display the error and HEX dataa + } + } + else + { + Serial.println("Doing TF sensor serial setup..."); + tfserial_setup(); + serial_started = true; + } + } + + Serial.println(); + Serial.println( "Program will restart in 30 seconds."); + Serial.println( "*****************************"); + delay( 30000); // And wait for 30 seconds +} +// = = = = = = = = = End of Main Loop = = = = = = = = = diff --git a/lib/__init__.py b/lib/__init__.py deleted file mode 100644 index c79a929..0000000 --- a/lib/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -#### -# Cobra Bay init -#### - -from .cobrabay import CobraBay -from .version import __version__ - -# def read_version(): -# print(__file__) -# """Read a text file and return the content as a string.""" -# with io.open("/lib/cobrabay/version.py") as f: -# return f.read() - -__repo__ = "https://github.com/chrisgilldc/cobraba.git" - -__all__ = [ - "CobraBay", - "Unit"] diff --git a/lib/detector.py b/lib/detector.py index 9374238..dfbc5b3 100644 --- a/lib/detector.py +++ b/lib/detector.py @@ -112,7 +112,7 @@ def __init__(self, config_obj, detector_id): # Is the detector ready for use? self._ready = False # Measurement offset. We start this at zero, even though that's probably ridiculous! - self._settings['offset'] = Quantity("0 cm") + self._offset = Quantity("0 cm") # List to keep the history of sensor readings. This is used for some methods. self._history = [] @@ -135,12 +135,12 @@ def ready(self): # Measurement offset. All detectors will have this, even if it's 0. @property def offset(self): - return self._settings['offset'] + return self._offset @offset.setter @check_ready def offset(self, input): - self._settings['offset'] = self._convert_value(input) + self._offset = self._convert_value(input) @property def id(self): @@ -419,7 +419,7 @@ def pct_crit(self, input): # Pre-bake distances for warn and critical to make evaluations a little easier. def _derived_distances(self): - adjusted_distance = self._settings['bay_depth'] - self._settings['offset'] + adjusted_distance = self._settings['bay_depth'] - self._offset self._settings['dist_warn'] = adjusted_distance.magnitude * self.pct_warn * adjusted_distance.units self._settings['dist_crit'] = adjusted_distance.magnitude * self.pct_crit * adjusted_distance.units diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..93ed7f5 --- /dev/null +++ b/setup.py @@ -0,0 +1,17 @@ +from setuptools import setup, find_packages + +setup( + name="CobraBay", + version="0.1.0", + packages=find_packages(), + package_data={ + "": ["*.ttf"] + }, + include_package_data=True, + entry_points={ + "console_scripts": [ + "CobraBay = CobraBay.cli.main:main", + "cbsensortest = CobraBay.cli.sensor_test:main" + ] + } +) \ No newline at end of file diff --git a/tools/sensor_test.py b/tools/sensor_test.py index 7a65340..171ec81 100644 --- a/tools/sensor_test.py +++ b/tools/sensor_test.py @@ -1,40 +1,21 @@ -import board -import busio -from adafruit_vl53l1x import VL53L1X -import time -from pint import Quantity -from statistics import mean - -test_time = "5 minutes" -sensor_address = 0x30 -timing_budget = 200 - -#### -# Nothing to change down here. -#### - -# Set up I2C Access -i2c = busio.I2C(board.SCL, board.SDA) - -# Create the sensor. -sensor = VL53L1X(i2c, sensor_address) -sensor.timing_budget = timing_budget -sensor.start_ranging() - -# Convert test time into seconds -test_time = Quantity(test_time).to('seconds').magnitude -test_start = time.monotonic() - -readings = [] - -while time.monotonic() - test_start < test_time: - reading = sensor.distance - print("Read: {}".format(reading)) - readings.append(reading) - time.sleep(timing_budget/1000) - -print("All readings: {}".format(readings)) -reading_avg = mean(readings) -reading_min = min(readings) -reading_max = max(readings) -print("Reading stats\n\tMin: {}\tMax: {}\tMean: {}".format(reading_min, reading_max, reading_avg)) +#!/usr/bin/python3 + +import argparse +import pathlib +import sys + +parser = argparse.ArgumentParser( + prog='sensor_test', + description='CobraBay Sensor Tester' +) + +parser.add_argument('-c', '--config', default='config.yaml', help='Location of the CobraBay config file.') +parser.add_argument('-l', '--lib', default='/home/pi/CobraBay/', help='Path to the CobraBay library directory.') +args = parser.parse_args() +print(args) +# Add the library to the python path. We're assuming this isn't installed at the system level. +sys.path.append(args.lib) +# Import and create a CobraBay config object. +from CobraBay.config import CBConfig + +config = CBConfig(args.config)