Skip to content

Commit

Permalink
Merge pull request #648 from lindsay-stevens/pyxform-646
Browse files Browse the repository at this point in the history
646: Detect instance() expressions in notes and make them into outputs
  • Loading branch information
lognaturel authored Aug 29, 2023
2 parents 5a52700 + ca010a1 commit 80ebeb1
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 29 deletions.
Empty file added pyxform/parsing/__init__.py
Empty file.
127 changes: 127 additions & 0 deletions pyxform/parsing/instance_expression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import re
from typing import TYPE_CHECKING, List, Tuple

from pyxform.utils import BRACKETED_TAG_REGEX, EXPRESSION_LEXER, ExpLexerToken

if TYPE_CHECKING:
from pyxform.survey import Survey
from pyxform.survey_element import SurveyElement


def instance_func_start(token: ExpLexerToken) -> bool:
    """
    Check whether a lexer token opens an instance() function call.

    :param token: The token to examine. None is accepted and never matches.
    :return: True only for a FUNC_CALL token whose value is exactly "instance(".
    """
    return (
        token is not None
        and token.name == "FUNC_CALL"
        and token.value == "instance("
    )


def find_boundaries(xml_text: str) -> List[Tuple[int, int]]:
    """
    Find the string position boundaries of any instance() expressions.

    Presumed:
    - An instance expression is followed by an XML path expression.
    - Any token is allowed inside a predicate (e.g. nested paths/preds/funcs).
    - When not inside a predicate, whitespace terminates a XML path expression.
    - instance expressions are valid inside predicates of other instance expressions.

    :param xml_text: XML text that may contain an instance expression.
    :return: A (start, end) pair of string positions for each instance expression
      found, in the order that the expressions occur in the text.
    """
    instance_enter = False
    path_enter = False
    pred_enter = False
    last_token = None
    tokens, _ = EXPRESSION_LEXER.scan(xml_text)
    boundaries = []

    for t in tokens:
        emit = False
        # If an instance expression had started, note the string position boundary.
        if instance_func_start(token=t) and not instance_enter:
            instance_enter = True
            emit = True
            boundaries.append(t.start)
        # Tokens that are part of an instance expression. The branch order matters:
        # the instance call and path separator checks take priority over the
        # path/predicate state checks.
        elif instance_enter:
            # Tokens that are part of the instance call.
            if instance_func_start(token=last_token) and t.name == "SYSTEM_LITERAL":
                emit = True
            elif last_token.name == "SYSTEM_LITERAL" and t.name == "CLOSE_PAREN":
                emit = True
            # A XPath path starts after the instance call, and may continue after
            # a predicate.
            elif t.name == "PATH_SEP" and last_token.name in (
                "CLOSE_PAREN",
                "XPATH_PRED_END",
            ):
                emit = True
                path_enter = True
            # Tokens that are part of a XPath path.
            elif path_enter:
                if t.name == "WHITESPACE":
                    # Whitespace ends the path, and is not part of the expression.
                    path_enter = False
                else:
                    emit = True
                    if t.name == "XPATH_PRED_START":
                        path_enter = False
                        pred_enter = True
            # Tokens that are part of a XPath predicate.
            elif pred_enter:
                emit = True
                if t.name == "XPATH_PRED_END":
                    pred_enter = False

        # Track instance expression tokens, ignore others.
        if emit:
            last_token = t
        # If an instance expression had ended, note the string position boundary.
        elif instance_enter:
            instance_enter = False
            # Reset the path/predicate state as well. An expression can end while a
            # predicate is still open (whitespace after a path nested in a predicate),
            # and a stale flag would make the next instance expression in the same
            # text absorb tokens that are not part of it.
            path_enter = False
            pred_enter = False
            boundaries.append(last_token.end)

    # The text ended while still inside an instance expression, so close it off.
    if instance_enter:
        boundaries.append(last_token.end)

    # Pair up the boundaries [1, 2, 3, 4] -> [(1, 2), (3, 4)].
    bounds = iter(boundaries)
    return list(zip(bounds, bounds))


def replace_with_output(xml_text: str, context: "SurveyElement", survey: "Survey") -> str:
    """
    Wrap each instance expression found in the text in an <output/> element.

    :param xml_text: The text string to search/replace.
    :param context: The SurveyElement that this string belongs to.
    :param survey: The Survey that the context is in.
    :return: The text with each instance expression replaced, or the text as it
      was passed in when no instance expressions were found.
    """
    spans = find_boundaries(xml_text=xml_text)
    if not spans:
        return xml_text

    def resolve_reference(match):
        # Pass each pyxform reference through the survey's reference replacer.
        # noinspection PyProtectedMember
        return survey._var_repl_function(match, context)

    # Replace by position rather than by string value, so that an expression which
    # is a substring of another replacement can't be inserted in the wrong place.
    # The spans refer to positions in the input text, so "shift" tracks how far
    # the earlier replacements have moved the remaining text.
    result = xml_text
    shift = 0
    for begin, finish in spans:
        expression = xml_text[begin:finish]
        resolved = re.sub(BRACKETED_TAG_REGEX, resolve_reference, expression)
        element = f'<output value="{resolved}" />'
        result = result[: begin + shift] + element + result[finish + shift :]
        shift += len(element) - len(expression)
    return result
72 changes: 45 additions & 27 deletions pyxform/survey.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
from collections import defaultdict
from datetime import datetime
from functools import lru_cache
from typing import Iterator, List, Optional
from typing import Iterator, List, Optional, Tuple

from pyxform import aliases, constants
from pyxform.constants import EXTERNAL_INSTANCE_EXTENSIONS
from pyxform.errors import PyXFormError, ValidationError
from pyxform.external_instance import ExternalInstance
from pyxform.instance import SurveyInstance
from pyxform.instance_info import InstanceInfo
from pyxform.parsing import instance_expression
from pyxform.question import Option, Question
from pyxform.section import Section
from pyxform.survey_element import SurveyElement
Expand All @@ -34,7 +35,16 @@
)
from pyxform.validators import enketo_validate, odk_validate

RE_BRACKET = re.compile(r"\[([^]]+)\]")
RE_FUNCTION_ARGS = re.compile(r"\b[^()]+\((.*)\)$")
RE_INDEXED_REPEAT = re.compile(r"indexed-repeat\([^)]+\)")
RE_INSTANCE = re.compile(r"instance\([^)]+.+")
RE_INSTANCE_SECONDARY_REF = re.compile(
r"(instance\(.*\)\/root\/item\[.*?(\$\{.*\})\]\/.*?)\s"
)
RE_PULLDATA = re.compile(r"(pulldata\s*\(\s*)(.*?),")
RE_XML_OUTPUT = re.compile(r"\n.*(<output.*>)\n(\s\s)*")
RE_XML_TEXT = re.compile(r"(>)\n\s*(\s[^<>\s].*?)\n\s*(\s</)", re.DOTALL)


def register_nsmap():
Expand Down Expand Up @@ -870,10 +880,10 @@ def _to_pretty_xml(self):
# TODO: check out pyxml
# http://ronrothman.com/public/leftbraned/xml-dom-minidom-toprettyxml-and-silly-whitespace/
xml_with_linebreaks = self.xml().toprettyxml(indent=" ")
text_re = re.compile(r"(>)\n\s*(\s[^<>\s].*?)\n\s*(\s</)", re.DOTALL)
output_re = re.compile(r"\n.*(<output.*>)\n(\s\s)*")
pretty_xml = text_re.sub(lambda m: "".join(m.group(1, 2, 3)), xml_with_linebreaks)
inline_output = output_re.sub(r"\g<1>", pretty_xml)
pretty_xml = RE_XML_TEXT.sub(
lambda m: "".join(m.group(1, 2, 3)), xml_with_linebreaks
)
inline_output = RE_XML_OUTPUT.sub(r"\g<1>", pretty_xml)
return '<?xml version="1.0"?>\n' + inline_output

def __repr__(self):
Expand Down Expand Up @@ -902,19 +912,15 @@ def _var_repl_function(
name = matchobj.group(2)
last_saved = matchobj.group(1) is not None
is_indexed_repeat = matchobj.string.find("indexed-repeat(") > -1
indexed_repeat_regex = re.compile(r"indexed-repeat\([^)]+\)")
function_args_regex = re.compile(r"\b[^()]+\((.*)\)$")
instance_regex = re.compile(r"instance\([^)]+.+")
bracket_regex = re.compile(r"\[([^]]+)\]")

def _in_secondary_instance_predicate():
def _in_secondary_instance_predicate() -> bool:
"""
check if ${} expression represented by matchobj
is in a predicate for a path expression for a secondary instance
"""

if instance_regex.search(matchobj.string) is not None:
bracket_regex_match_iter = bracket_regex.finditer(matchobj.string)
if RE_INSTANCE.search(matchobj.string) is not None:
bracket_regex_match_iter = RE_BRACKET.finditer(matchobj.string)
# Check whether current ${varname} is in the correct bracket_regex_match
for bracket_regex_match in bracket_regex_match_iter:
if (
Expand All @@ -925,10 +931,11 @@ def _in_secondary_instance_predicate():
return False
return False

def _relative_path(name):
def _relative_path(ref_name: str, _use_current: bool) -> Optional[str]:
"""Given name in ${name}, return relative xpath to ${name}."""
return_path = None
xpath, context_xpath = self._xpath[name], context.get_xpath()
xpath = self._xpath[ref_name]
context_xpath = context.get_xpath()
# share same root i.e repeat_a from /data/repeat_a/...
if (
len(context_xpath.split("/")) > 2
Expand All @@ -940,14 +947,13 @@ def _relative_path(name):
self, xpath, context_xpath, reference_parent
)
if steps:
ref_path = ref_path if ref_path.endswith(name) else "/%s" % name
prefix = " current()/" if use_current else " "

ref_path = ref_path if ref_path.endswith(ref_name) else "/%s" % name
prefix = " current()/" if _use_current else " "
return_path = prefix + "/".join([".."] * steps) + ref_path + " "

return return_path

def _is_return_relative_path():
def _is_return_relative_path() -> bool:
"""Determine condition to return relative xpath of current ${name}."""
indexed_repeat_relative_path_args_index = [0, 1, 3, 5]
current_matchobj = matchobj
Expand All @@ -958,7 +964,7 @@ def _is_return_relative_path():
return True

# It is possible to have multiple indexed-repeat in an expression
indexed_repeats_iter = indexed_repeat_regex.finditer(matchobj.string)
indexed_repeats_iter = RE_INDEXED_REPEAT.finditer(matchobj.string)
for indexed_repeat in indexed_repeats_iter:

# Make sure current ${name} is in the correct indexed-repeat
Expand All @@ -978,7 +984,7 @@ def _is_return_relative_path():

indexed_repeat_name_index = None
indexed_repeat_args = (
function_args_regex.search(indexed_repeat.group())
RE_FUNCTION_ARGS.search(indexed_repeat.group())
.group(1)
.split(",")
)
Expand Down Expand Up @@ -1009,7 +1015,7 @@ def _is_return_relative_path():
if _is_return_relative_path():
if not use_current:
use_current = _in_secondary_instance_predicate()
relative_path = _relative_path(name)
relative_path = _relative_path(ref_name=name, _use_current=use_current)
if relative_path:
return relative_path

Expand Down Expand Up @@ -1037,11 +1043,19 @@ def _var_repl_output_function(self, matchobj, context):
"""
return '<output value="' + self._var_repl_function(matchobj, context) + '" />'

def insert_output_values(self, text, context=None):
def insert_output_values(
self,
text: str,
context: Optional[SurveyElement] = None,
) -> Tuple[str, bool]:
"""
Replace all the ${variables} in text with xpaths.
Returns that and a boolean indicating if there were any ${variables}
present.
:param text: Input text to process.
:param context: The document node that the text belongs to.
:return: The output text, and a flag indicating whether any changes were made.
"""

def _var_repl_output_function(matchobj):
Expand All @@ -1054,15 +1068,19 @@ def _var_repl_output_function(matchobj):
# variable replacement:
text_node = PatchedText()
text_node.data = text
xml_text = text_node.toxml()
original_xml = text_node.toxml()

# need to make sure we have reason to replace
# since at this point < is &lt,
# the net effect &lt gets translated again to &amp;lt;
if str(xml_text).find("{") != -1:
result = re.sub(BRACKETED_TAG_REGEX, _var_repl_output_function, str(xml_text))
return result, not result == xml_text
return text, False
xml_text = instance_expression.replace_with_output(original_xml, context, self)
if "{" in xml_text:
xml_text = re.sub(BRACKETED_TAG_REGEX, _var_repl_output_function, xml_text)
changed = xml_text != original_xml
if changed:
return xml_text, True
else:
return text, False

# pylint: disable=too-many-arguments
def print_xform_to_file(
Expand Down
2 changes: 1 addition & 1 deletion pyxform/survey_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def xml_label_and_hint(self) -> "List[DetachableElement]":
result.append(self.xml_label())
result.append(self.xml_hint())

msg = "The survey element named '%s' " "has no label or hint." % self.name
msg = "The survey element named '%s' has no label or hint." % self.name
if len(result) == 0:
raise PyXFormError(msg)

Expand Down
3 changes: 2 additions & 1 deletion pyxform/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ def get_expression_lexer() -> re.Scanner: # noqa
"WHITESPACE": r"\s+",
"PYXFORM_REF": r"\$\{" + ncname_regex + r"(#" + ncname_regex + r")?" + r"\}",
"FUNC_CALL": ncname_regex + r"\(",
"XPATH_PRED": ncname_regex + r"\[",
"XPATH_PRED_START": ncname_regex + r"\[",
"XPATH_PRED_END": r"\]",
"URI_SCHEME": ncname_regex + r"://",
"NAME": ncname_regex, # Must be after rules containing ncname_regex.
"OTHER": r".+?", # Catch any other character so that parsing doesn't stop.
Expand Down
Loading

0 comments on commit 80ebeb1

Please sign in to comment.