diff --git a/src/formpack/constants.py b/src/formpack/constants.py index 3a038e9a..f992940f 100644 --- a/src/formpack/constants.py +++ b/src/formpack/constants.py @@ -52,6 +52,7 @@ } GEO_QUESTION_TYPES = ('geopoint', 'geotrace', 'geoshape') +MEDIA_TYPES = ('audio', 'image', 'video') # Export Settings EXPORT_SETTING_FIELDS = 'fields' diff --git a/src/formpack/utils/expand_content.py b/src/formpack/utils/expand_content.py index 5ca611b6..3f1c8c67 100644 --- a/src/formpack/utils/expand_content.py +++ b/src/formpack/utils/expand_content.py @@ -6,21 +6,38 @@ import re from collections import OrderedDict from copy import deepcopy +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Tuple, + Union, +) from .array_to_xpath import EXPANDABLE_FIELD_TYPES from .iterator import get_first_occurrence from .replace_aliases import META_TYPES, selects -from ..constants import (UNTRANSLATED, OR_OTHER_COLUMN, - TAG_COLUMNS_AND_SEPARATORS) +from ..constants import ( + MEDIA_TYPES, + OR_OTHER_COLUMN, + TAG_COLUMNS_AND_SEPARATORS, + UNTRANSLATED, +) REMOVE_EMPTY_STRINGS = True # this will be used to check which version of formpack was used to compile the # asset content -SCHEMA_VERSION = "1" +SCHEMA_VERSION = '1' -def _expand_translatable_content(content, row, col_shortname, - special_column_details): +def _expand_translatable_content( + content: Dict[str, List[Any]], + row: Dict[str, Union[str, List[Any]]], + col_shortname: str, + special_column_details: Dict[str, Optional[str]], +) -> None: _scd = special_column_details if 'translation' in _scd: translations = content['translations'] @@ -39,7 +56,10 @@ def _expand_translatable_content(content, row, col_shortname, del row[col_shortname] -def _expand_tags(row, tag_cols_and_seps=None): +def _expand_tags( + row: Dict[str, Union[str, List[Any]]], + tag_cols_and_seps: Optional[Dict[str, str]] = None, +) -> Dict[str, Union[str, List[Any]]]: if tag_cols_and_seps is None: tag_cols_and_seps = {} tags = [] @@ -55,13 +75,16 @@ def _expand_tags(row, tag_cols_and_seps=None): tags_str = row.pop(tag_col, None) if tags_str and isinstance(tags_str, str): for tag in re.findall(r'([\#\+][a-zA-Z][a-zA-Z0-9_]*)', tags_str): - tags.append('hxl:%s' % tag) - if len(tags) > 0: + tags.append(f'hxl:{tag}') + if tags: row['tags'] = tags return row -def _get_translations_from_special_cols(special_cols, translations): +def _get_translations_from_special_cols( + special_cols: OrderedDict, + translations: List[str], +) -> Tuple[List[str], Set[str]]: translated_cols = [] for colname, parsedvals in iter(special_cols.items()): if 'translation' in parsedvals: @@ -71,8 +94,8 @@ def _get_translations_from_special_cols(special_cols, translations): return translations, set(translated_cols) -def expand_content_in_place(content): - (specials, translations, transl_cols) = _get_special_survey_cols(content) +def expand_content_in_place(content: Dict[str, List[Any]]) -> None: + specials, translations, transl_cols = _get_special_survey_cols(content) if len(translations) > 0: content['translations'] = translations @@ -94,10 +117,15 @@ def expand_content_in_place(content): # legacy {'select_one': 'xyz'} format might # still be on kobo-prod _type_str = _expand_type_to_dict( - get_first_occurrence(_type.keys()))['type'] + get_first_occurrence(_type.keys()) + )['type'] _list_name = get_first_occurrence(_type.values()) - row.update({'type': _type_str, - 'select_from_list_name': _list_name}) + row.update( + { + 'type': _type_str, + 'select_from_list_name': _list_name, + } + ) _expand_tags(row, tag_cols_and_seps=TAG_COLUMNS_AND_SEPARATORS) @@ -111,7 +139,7 @@ def expand_content_in_place(content): if REMOVE_EMPTY_STRINGS: row_copy = dict(row) for key, val in row_copy.items(): - if val == "": + if val == '': del row[key] # for now, prepend meta questions to the beginning of the survey @@ -133,7 +161,10 @@ def expand_content_in_place(content): content['schema'] = SCHEMA_VERSION -def expand_content(content, in_place=False): +def expand_content( + content: Dict[str, List[Any]], + in_place: bool = False, +) -> Optional[Dict[str, List[Any]]]: if in_place: expand_content_in_place(content) return None @@ -143,7 +174,29 @@ def expand_content(content, in_place=False): return content_copy -def _get_special_survey_cols(content): +def _get_known_translated_cols(translated_cols: List[str]) -> List[str]: + """ + This is necessary to handle a legacy issue where media attributes such as + `image`, `audio` and `video` were transformed to `media::x`, but their + value in the `translated` list was still `x` therefore not being recognized + as a "known translated" column. This resulted in a mismatch in labels and + translations and broke the exports and autoreport. + """ + if not translated_cols: + return [] + + _translated_cols = [] + for col in translated_cols: + if col in MEDIA_TYPES: + col = f'media::{col}' + _translated_cols.append(col) + + return _translated_cols + + +def _get_special_survey_cols( + content: Dict[str, List[Any]], +) -> Tuple[OrderedDict, List[str], List[str]]: """ This will extract information about columns in an xlsform with ':'s @@ -155,12 +208,16 @@ def _get_special_survey_cols(content): 'hint::English', For more examples, see tests. """ + RE_MEDIA_TYPES = '|'.join(MEDIA_TYPES) + uniq_cols = OrderedDict() special = OrderedDict() - known_translated_cols = content.get('translated', []) + known_translated_cols = _get_known_translated_cols( + content.get('translated') + ) - def _pluck_uniq_cols(sheet_name): + def _pluck_uniq_cols(sheet_name: str) -> None: for row in content.get(sheet_name, []): # we don't want to expand columns which are already known # to be parsed and translated in a previous iteration @@ -168,7 +225,7 @@ def _pluck_uniq_cols(sheet_name): uniq_cols.update(OrderedDict.fromkeys(_cols)) - def _mark_special(**kwargs): + def _mark_special(**kwargs: str) -> None: column_name = kwargs.pop('column_name') special[column_name] = kwargs @@ -177,57 +234,73 @@ def _mark_special(**kwargs): for column_name in uniq_cols.keys(): if column_name in ['label', 'hint']: - _mark_special(column_name=column_name, - column=column_name, - translation=UNTRANSLATED) - if ':' not in column_name: + _mark_special( + column_name=column_name, + column=column_name, + translation=UNTRANSLATED, + ) + if ':' not in column_name and column_name not in MEDIA_TYPES: continue if column_name.startswith('bind:'): continue if column_name.startswith('body:'): continue - mtch = re.match(r'^media\s*::?\s*([^:]+)\s*::?\s*([^:]+)$', column_name) + mtch = re.match( + rf'^(media\s*::?\s*)?({RE_MEDIA_TYPES})\s*::?\s*([^:]+)$', + column_name, + ) if mtch: matched = mtch.groups() - media_type = matched[0] - _mark_special(column_name=column_name, - column='media::{}'.format(media_type), - coltype='media', - media=media_type, - translation=matched[1]) + media_type = matched[1] + translation = matched[2] + _mark_special( + column_name=column_name, + column='media::{}'.format(media_type), + coltype='media', + media=media_type, + translation=translation, + ) continue - mtch = re.match(r'^media\s*::?\s*([^:]+)$', column_name) + mtch = re.match(rf'^(media\s*::?\s*)?({RE_MEDIA_TYPES})$', column_name) if mtch: - media_type = mtch.groups()[0] - _mark_special(column_name=column_name, - column='media::{}'.format(media_type), - coltype='media', - media=media_type, - translation=UNTRANSLATED) + matched = mtch.groups() + media_type = matched[1] + _mark_special( + column_name=column_name, + column='media::{}'.format(media_type), + coltype='media', + media=media_type, + translation=UNTRANSLATED, + ) continue mtch = re.match(r'^([^:]+)\s*::?\s*([^:]+)$', column_name) if mtch: # example: label::x, constraint_message::x, hint::x matched = mtch.groups() column_shortname = matched[0] - _mark_special(column_name=column_name, - column=column_shortname, - translation=matched[1]) + _mark_special( + column_name=column_name, + column=column_shortname, + translation=matched[1], + ) # also add the empty column if it exists if column_shortname in uniq_cols: - _mark_special(column_name=column_shortname, - column=column_shortname, - translation=UNTRANSLATED) + _mark_special( + column_name=column_shortname, + column=column_shortname, + translation=UNTRANSLATED, + ) continue - (translations, - translated_cols) = _get_translations_from_special_cols(special, - content.get('translations', [])) + translations, translated_cols = _get_translations_from_special_cols( + special, + content.get('translations', []), + ) translated_cols.update(known_translated_cols) return special, translations, sorted(translated_cols) -def _expand_type_to_dict(type_str): +def _expand_type_to_dict(type_str: str) -> Dict[str, Union[str, bool]]: SELECT_PATTERN = r'^({select_type})\s+(\S+)$' out = {} match = re.search('( or.other)$', type_str) @@ -255,6 +328,6 @@ def _expand_type_to_dict(type_str): return {'type': type_str} -def _expand_xpath_to_list(xpath_string): +def _expand_xpath_to_list(xpath_string: str) -> str: # a placeholder for a future expansion return xpath_string diff --git a/tests/test_expand_content.py b/tests/test_expand_content.py index 44586ac1..ddac410b 100644 --- a/tests/test_expand_content.py +++ b/tests/test_expand_content.py @@ -186,6 +186,46 @@ def test_expand_translated_media(): } +def test_expand_translated_media_mangled_format(): + """ + An unfortunate bug seen in formpack#115 has resulted in needing to account + for this behaviour if surveys used image::lang rather than + media::image::lang + """ + s1 = { + 'survey': [ + { + 'type': 'note', + 'media::image': ['eng.jpg'], + }, + ], + 'translated': ['image'], # Bug 🐛: not coming through as media::image + 'schema': SCHEMA_VERSION, + 'translations': ['English (en)'] + } + expand_content(s1, in_place=True) + assert s1 == { + 'survey': [ + { + 'type': 'note', + 'media::image': ['eng.jpg'], + }, + ], + 'translated': ['media::image'], + 'schema': SCHEMA_VERSION, + 'translations': ['English (en)'], + } + flatten_content(s1, in_place=True) + assert s1 == { + 'survey': [ + { + 'type': 'note', + 'media::image::English (en)': 'eng.jpg', + }, + ], + } + + def test_expand_translated_media_with_no_translated(): s1 = {'survey': [{'type': 'note', 'media::image': 'nolang.jpg',