Skip to content

Commit

Permalink
flake8 formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-r-thorpe committed Apr 29, 2024
1 parent 0062040 commit 84959a6
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 340 deletions.
310 changes: 2 additions & 308 deletions nmostesting/IS12Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
from .MS05Utils import MS05Utils

import json
import os
import time

from copy import deepcopy
from enum import IntEnum, Enum
from enum import IntEnum
from itertools import takewhile, dropwhile
from jsonschema import FormatChecker, SchemaError, validate, ValidationError
from .Config import WS_MESSAGE_TIMEOUT
from .GenericTest import NMOSTestException
from .TestHelper import WebsocketWorker, load_resolved_schema
from .MS05Utils import NcMethodStatus, NcObjectMethods, NcBlockMethods, NcClassManagerMethods, NcClassManagerProperties, NcDatatypeType, StandardClassIds, NcObjectProperties, NcBlockProperties, NcObject, NcBlock, NcClassManager
from .MS05Utils import NcMethodStatus, NcObjectMethods, NcBlockMethods, NcClassManagerMethods, StandardClassIds

CONTROL_API_KEY = "ncp"
MS05_API_KEY = "controlframework"
Expand All @@ -48,13 +46,10 @@ def __init__(self, apis):
self.spec_path = self.apis[CONTROL_API_KEY]["spec_path"]
self.spec_branch = self.apis[CONTROL_API_KEY]["spec_branch"]
self._load_is12_schemas()
# self.ROOT_BLOCK_OID = 1
self.ncp_websocket = None
self.command_handle = 0
self.expect_notifications = False
self.notifications = []
# self.device_model = None
# self.class_manager = None

def _load_is12_schemas(self):
"""Load datatype and control class decriptors and create datatype JSON schemas"""
Expand Down Expand Up @@ -95,25 +90,6 @@ def close_ncp_websocket(self):
if self.ncp_websocket:
self.ncp_websocket.close()

# def validate_reference_datatype_schema(self, test, payload, datatype_name, context=""):
# """Validate payload against reference datatype schema"""
# self.validate_schema(test, payload, self.reference_datatype_schemas[datatype_name])

# def validate_schema(self, test, payload, schema, context=""):
# """Delegates to validate_schema. Raises NMOSTestExceptions on error"""
# if not schema:
# raise NMOSTestException(test.FAIL(context + "Missing schema. "))
# try:
# # Validate the JSON schema is correct
# checker = FormatChecker(["ipv4", "ipv6", "uri"])
# validate(payload, schema, format_checker=checker)
# except ValidationError as e:
# raise NMOSTestException(test.FAIL(context + "Schema validation error: " + e.message))
# except SchemaError as e:
# raise NMOSTestException(test.FAIL(context + "Schema error: " + e.message))

# return

def _validate_is12_schema(self, test, payload, schema_name, context=""):
"""Delegates to validate_schema. Raises NMOSTestExceptions on error"""
try:
Expand Down Expand Up @@ -350,24 +326,6 @@ def update_subscritions(self, test, subscriptions):
response = self.send_command(test, command_JSON)
return response

# def _primitive_to_JSON(self, type):
# """Convert MS-05 primitive type to corresponding JSON type"""

# types = {
# "NcBoolean": "boolean",
# "NcInt16": "number",
# "NcInt32": "number",
# "NcInt64": "number",
# "NcUint16": "number",
# "NcUint32": "number",
# "NcUint64": "number",
# "NcFloat32": "number",
# "NcFloat64": "number",
# "NcString": "string"
# }

# return types.get(type, False)

def primitive_to_python_type(self, type):
"""Convert MS-05 primitive type to corresponding Python type"""

Expand All @@ -386,72 +344,6 @@ def primitive_to_python_type(self, type):

return types.get(type, False)

# def descriptor_to_schema(self, descriptor):
# variant_type = ['number', 'string', 'boolean', 'object', 'array', 'null']

# json_schema = {}
# json_schema['$schema'] = 'http://json-schema.org/draft-07/schema#'

# json_schema['title'] = descriptor['name']
# json_schema['description'] = descriptor['description'] if descriptor['description'] else ""

# # Inheritance of datatype
# if descriptor.get('parentType'):
# if descriptor.get('isSequence'):
# json_schema['type'] = 'array'
# json_schema['items'] = {'$ref': descriptor['parentType'] + '.json'}
# else:
# json_schema['allOf'] = []
# json_schema['allOf'].append({'$ref': descriptor['parentType'] + '.json'})
# # Primitive datatype
# elif descriptor['type'] == NcDatatypeType.Primitive:
# json_schema['type'] = self._primitive_to_JSON(descriptor['name'])

# # Struct datatype
# if descriptor['type'] == NcDatatypeType.Struct and descriptor.get('fields'):
# json_schema['type'] = 'object'

# required = []
# properties = {}
# for field in descriptor['fields']:
# required.append(field['name'])

# property_type = {}
# if self._primitive_to_JSON(field['typeName']):
# if field['isNullable']:
# property_type = {'type': [self._primitive_to_JSON(field['typeName']), 'null']}
# else:
# property_type = {'type': self._primitive_to_JSON(field['typeName'])}
# else:
# if field.get('typeName'):
# if field['isNullable']:
# property_type['anyOf'] = []
# property_type['anyOf'].append({'$ref': field['typeName'] + '.json'})
# property_type['anyOf'].append({'type': 'null'})
# else:
# property_type = {'$ref': field['typeName'] + '.json'}
# else:
# # variant
# property_type = {'type': variant_type}

# if field.get('isSequence'):
# property_type = {'type': 'array', 'items': property_type}

# property_type['description'] = field['description'] if descriptor['description'] else ""
# properties[field['name']] = property_type

# json_schema['required'] = required
# json_schema['properties'] = properties

# # Enum datatype
# if descriptor['type'] == NcDatatypeType.Enum and descriptor.get('items'):
# json_schema['enum'] = []
# for item in descriptor['items']:
# json_schema['enum'].append(int(item['value']))
# json_schema['type'] = 'integer'

# return json_schema

def get_base_class_id(self, class_id):
""" Given a class_id returns the standard base class id as a string"""
return '.'.join([str(v) for v in takewhile(lambda x: x > 0, class_id)])
Expand All @@ -465,124 +357,6 @@ def is_manager(self, class_id):
""" Check class id to determine if this is a manager """
return len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 3

# def load_reference_resources(self):
# """Load datatype and control class decriptors and create datatype JSON schemas"""
# # Calculate paths to MS-05 descriptors
# # including Feature Sets specified as additional_paths in test definition
# spec_paths = [os.path.join(self.apis[FEATURE_SETS_KEY]["spec_path"], path)
# for path in self.apis[FEATURE_SETS_KEY]["repo_paths"]]
# spec_paths.append(self.apis[MS05_API_KEY]["spec_path"])
# # Root path for primitive datatypes
# spec_paths.append('test_data/IS1201')

# datatype_paths = []
# classes_paths = []
# for spec_path in spec_paths:
# datatype_path = os.path.abspath(os.path.join(spec_path, 'models/datatypes/'))
# if os.path.exists(datatype_path):
# datatype_paths.append(datatype_path)
# classes_path = os.path.abspath(os.path.join(spec_path, 'models/classes/'))
# if os.path.exists(classes_path):
# classes_paths.append(classes_path)

# # Load class and datatype descriptors
# self.reference_class_descriptors = self._load_model_descriptors(classes_paths)

# # Load MS-05 datatype descriptors
# self.reference_datatype_descriptors = self._load_model_descriptors(datatype_paths)

# # Generate MS-05 datatype schemas from MS-05 datatype descriptors
# self.reference_datatype_schemas = self.generate_json_schemas(
# datatype_descriptors=self.reference_datatype_descriptors,
# schema_path=os.path.join(self.apis[CONTROL_API_KEY]["spec_path"], 'APIs/schemas/'))

# def _load_model_descriptors(self, descriptor_paths):
# descriptors = {}
# for descriptor_path in descriptor_paths:
# for filename in os.listdir(descriptor_path):
# name, extension = os.path.splitext(filename)
# if extension == ".json":
# with open(os.path.join(descriptor_path, filename), 'r') as json_file:
# descriptors[name] = json.load(json_file)

# return descriptors

# def generate_json_schemas(self, datatype_descriptors, schema_path):
# """Generate datatype schemas from datatype descriptors"""
# datatype_schema_names = []
# base_schema_path = os.path.abspath(schema_path)
# if not os.path.exists(base_schema_path):
# os.makedirs(base_schema_path)

# for name, descriptor in datatype_descriptors.items():
# json_schema = self.descriptor_to_schema(descriptor)
# with open(os.path.join(base_schema_path, name + '.json'), 'w') as output_file:
# json.dump(json_schema, output_file, indent=4)
# datatype_schema_names.append(name)

# # Load resolved MS-05 datatype schemas
# datatype_schemas = {}
# for name in datatype_schema_names:
# datatype_schemas[name] = load_resolved_schema(schema_path, name + '.json', path_prefix=False)

# return datatype_schemas

# def validate_descriptor(self, test, reference, descriptor, context=""):
# """Validate descriptor against reference descriptor. Raises NMOSTestException on error"""
# non_normative_keys = ['description']

# if isinstance(reference, dict):
# reference_keys = set(reference.keys())
# descriptor_keys = set(descriptor.keys())

# # compare the keys to see if any extra/missing
# key_diff = (set(reference_keys) | set(descriptor_keys)) - (set(reference_keys) & set(descriptor_keys))
# if len(key_diff) > 0:
# error_description = "Missing keys " if set(key_diff) <= set(reference_keys) else "Additional keys "
# raise NMOSTestException(test.FAIL(context + error_description + str(key_diff)))
# for key in reference_keys:
# if key in non_normative_keys and not isinstance(reference[key], dict):
# continue
# # Check for class ID
# if key == 'classId' and isinstance(reference[key], list):
# if reference[key] != descriptor[key]:
# raise NMOSTestException(test.FAIL(context + "Unexpected ClassId. Expected: "
# + str(reference[key])
# + " actual: " + str(descriptor[key])))
# else:
# self.validate_descriptor(test, reference[key], descriptor[key], context=context + key + "->")
# elif isinstance(reference, list):
# if len(reference) > 0 and isinstance(reference[0], dict):
# # Convert to dict and validate
# references = {item['name']: item for item in reference}
# descriptors = {item['name']: item for item in descriptor}

# self.validate_descriptor(test, references, descriptors, context)
# elif reference != descriptor:
# raise NMOSTestException(test.FAIL(context + "Unexpected sequence. Expected: "
# + str(reference)
# + " actual: " + str(descriptor)))
# else:
# if reference != descriptor:
# raise NMOSTestException(test.FAIL(context + 'Expected value: '
# + str(reference)
# + ', actual value: '
# + str(descriptor)))
# return

# def _get_class_manager_descriptors(self, test, class_manager_oid, property_id):
# response = self.get_property_value(test, class_manager_oid, property_id)

# if not response:
# return None

# # Create descriptor dictionary from response array
# # Use classId as key if present, otherwise use name
# def key_lambda(classId, name): return ".".join(map(str, classId)) if classId else name
# descriptors = {key_lambda(r.get('classId'), r['name']): r for r in response}

# return descriptors

def query_device_model(self, test):
""" Query Device Model from the Node under test.
self.device_model_metadata set on Device Model validation error.
Expand All @@ -599,89 +373,9 @@ def query_device_model(self, test):
raise NMOSTestException(test.FAIL("Unable to query Device Model"))
return self.device_model

# def get_class_manager(self, test):
# """Get the Class Manager queried from the Node under test's Device Model"""
# if not self.class_manager:
# self.class_manager = self._get_manager(test, StandardClassIds.NCCLASSMANAGER.value)

# return self.class_manager

# def get_device_manager(self, test):
# """Get the Device Manager queried from the Node under test's Device Model"""
# return self._get_manager(test, StandardClassIds.NCDEVICEMANAGER.value)

# def _get_manager(self, test, class_id):
# self.open_ncp_websocket(test)
# device_model = self.query_device_model(test)
# members = device_model.find_members_by_class_id(class_id, include_derived=True)

# spec_link = "https://specs.amwa.tv/ms-05-02/branches/{}/docs/Managers.html".format(self.spec_branch)

# if len(members) == 0:
# raise NMOSTestException(test.FAIL("Manager not found in Root Block.", spec_link))

# if len(members) > 1:
# raise NMOSTestException(test.FAIL("Manager MUST be a singleton.", spec_link))

# return members[0]

# def _nc_object_factory(self, test, class_id, oid, role):
# """Create NcObject or NcBlock based on class_id"""
# # will set self.device_model_error to True if problems encountered
# try:
# runtime_constraints = self.get_property_value(
# test,
# oid,
# NcObjectProperties.RUNTIME_PROPERTY_CONSTRAINTS.value)

# # Check class id to determine if this is a block
# if len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 1:
# member_descriptors = self.get_property_value(
# test,
# oid,
# NcBlockProperties.MEMBERS.value)

# nc_block = NcBlock(class_id, oid, role, member_descriptors, runtime_constraints)

# for m in member_descriptors:
# child_object = self._nc_object_factory(test, m["classId"], m["oid"], m["role"])
# if child_object:
# nc_block.add_child_object(child_object)

# return nc_block
# else:
# # Check to determine if this is a Class Manager
# if len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2:
# class_descriptors = self._get_class_manager_descriptors(
# test,
# oid,
# NcClassManagerProperties.CONTROL_CLASSES.value)

# datatype_descriptors = self._get_class_manager_descriptors(
# test,
# oid,
# NcClassManagerProperties.DATATYPES.value)

# if not class_descriptors or not datatype_descriptors:
# # An error has likely occured
# return None

# return NcClassManager(class_id,
# oid,
# role,
# class_descriptors,
# datatype_descriptors,
# runtime_constraints)

# return NcObject(class_id, oid, role, runtime_constraints)

# except NMOSTestException as e:
# raise NMOSTestException(test.FAIL("Error in Device Model " + role + ": " + str(e.args[0].detail)))

def resolve_datatype(self, test, datatype):
"""Resolve datatype to its base type"""
class_manager = self.get_class_manager(test)
if class_manager.datatype_descriptors[datatype].get("parentType"):
return self.resolve_datatype(test, class_manager.datatype_descriptors[datatype].get("parentType"))
return datatype

Loading

0 comments on commit 84959a6

Please sign in to comment.