diff --git a/nmostesting/GenericTest.py b/nmostesting/GenericTest.py index 951eb1fa..f971a5a6 100644 --- a/nmostesting/GenericTest.py +++ b/nmostesting/GenericTest.py @@ -16,6 +16,7 @@ from requests.compat import json import git import jsonschema +import re import traceback import inspect import uuid @@ -592,48 +593,46 @@ def do_test_no_matching_public_key_authorization(self, api_name): return test.DISABLED("This test is only performed when an API supports Authorization and 'ENABLE_AUTH' " "is True") + def generate_parameterized_endpoints(self, endpoint_path, param_names, param_values=None, param_index=0): + if param_values is None: + param_values = {} + if param_names is None or param_index >= len(param_names): + return [(endpoint_path, param_values)] + + root_path = endpoint_path.split("{")[0].rstrip("/") + parameter_path = re.findall("([^}]+}[^{]*)", endpoint_path)[0].rstrip("/") + post_param = endpoint_path.split(parameter_path)[1].rstrip("/") + + if root_path not in self.saved_entities: + return [] + + endpoints = [] + for entity in self.saved_entities[root_path]: + param_values[param_names[param_index].name] = entity + partial_endpoint = parameter_path.format(**{param_names[param_index].name: entity}) + endpoints += self.generate_parameterized_endpoints("{}{}".format(partial_endpoint, post_param), + param_names, param_values.copy(), param_index+1) + return endpoints + def do_test_api_resource(self, resource, response_code, api): test = Test("{} /x-nmos/{}/{}{}".format(resource[1]['method'].upper(), api, self.apis[api]["version"], resource[0].rstrip("/")), self.auto_test_name(api)) - # Test URLs which include a single {resourceId} or similar parameter - if resource[1]['params'] and len(resource[1]['params']) == 1: - path = resource[0].split("{")[0].rstrip("/") - if path in self.saved_entities: - entity_warn = "" - for entity in NMOSUtils.sampled_list(self.saved_entities[path]): - params = {resource[1]['params'][0].name: entity} - entity_path = resource[0].rstrip("/").format(**params) - entity_valid, entity_message = self.check_api_resource(test, resource, response_code, api, - entity_path) - if not entity_valid: - return test.FAIL("Error for {}: {}".format(params, entity_message)) - elif entity_message and not entity_warn: - entity_warn = entity_message - if entity_warn: - return test.WARNING("Warning for {}: {}".format(params, entity_warn)) - else: - return test.PASS() - else: - # There were no saved entities found, so we can't test this parameterised URL - return test.UNCLEAR("No resources found to perform this test") - - # Test general URLs with no parameters - elif not resource[1]['params']: - valid, message = self.check_api_resource(test, resource, response_code, api, resource[0].rstrip("/")) - if valid: - if message: - return test.WARNING(message) - else: - return test.PASS() - else: - return test.FAIL(message) - - # URLs with more than one parameter are currently not tested by the automatically defined tests - else: - return None + endpoints = NMOSUtils.sampled_list(self.generate_parameterized_endpoints(resource[0], resource[1]['params'])) + for endpoint, param_values in endpoints: + entity_valid, entity_message = self.check_api_resource(test, resource, response_code, api, endpoint) + if not entity_valid: + return test.FAIL("Error for {}: {}".format(param_values, entity_message) + if param_values else entity_message) + elif entity_message: + return test.WARNING("Warning for {}: {}".format(param_values, entity_message) + if param_values else entity_message) + if len(endpoints) == 0: + # There were no saved entities found, so we can't test this parameterised URL + return test.UNCLEAR("No resources found to perform this test") + return test.PASS() def check_api_resource(self, test, resource, response_code, api, path): url = "{}{}".format(self.apis[api]["url"].rstrip("/"), path) diff --git a/nmostesting/NMOSTesting.py b/nmostesting/NMOSTesting.py index 74983135..c5077ae0 100644 --- a/nmostesting/NMOSTesting.py +++ b/nmostesting/NMOSTesting.py @@ -990,6 +990,14 @@ def run_noninteractive_tests(args): return exit_code +def get_package_name(requirement): + pattern = r'^([^\s>=<]+)' + match = re.match(pattern, requirement) + if match: + return match.group(1) + return None + + def check_internal_requirements(): corrections = {"gitpython": "git", "pyopenssl": "OpenSSL", @@ -1000,7 +1008,7 @@ def check_internal_requirements(): installed_pkgs = [pkg[1] for pkg in pkgutil.iter_modules()] with open("requirements.txt") as requirements_file: for requirement in requirements_file.readlines(): - requirement_name = requirement.strip().split(">")[0] + requirement_name = get_package_name(requirement) if requirement_name in corrections: corrected_req = corrections[requirement_name] else: diff --git a/requirements.txt b/requirements.txt index 93bb689e..e3884310 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ zeroconf>=0.75.0 requests netifaces gitpython -ramlfications +ramlfications==0.1.9 websocket-client>=1.0.0,!=1.2.2 jsonref>=1.0.0 dnslib