Skip to content

Commit

Permalink
feat(pm4py): return diagnostics dataframe option in pm4py.conformance methods
Browse files Browse the repository at this point in the history
  • Loading branch information
fit-alessandro-berti committed Apr 6, 2023
1 parent 0be79b3 commit 42b0d2a
Showing 1 changed file with 77 additions and 21 deletions.
98 changes: 77 additions & 21 deletions pm4py/conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from pm4py.objects.log.obj import EventLog, Trace, Event, EventStream
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.convert import convert_to_event_log
from collections import Counter
from pm4py.objects.process_tree.obj import ProcessTree
from pm4py.util import xes_constants, constants
Expand All @@ -17,7 +18,7 @@


def conformance_diagnostics_token_based_replay(log: Union[EventLog, pd.DataFrame], petri_net: PetriNet, initial_marking: Marking,
final_marking: Marking, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name") -> List[Dict[str, Any]]:
final_marking: Marking, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name", return_diagnostics_dataframe: bool = False) -> List[Dict[str, Any]]:
"""
Apply token-based replay for conformance checking analysis.
The methods return the full token-based-replay diagnostics.
Expand Down Expand Up @@ -45,6 +46,7 @@ def conformance_diagnostics_token_based_replay(log: Union[EventLog, pd.DataFrame
:param activity_key: attribute to be used for the activity
:param timestamp_key: attribute to be used for the timestamp
:param case_id_key: attribute to be used as case identifier
:param return_diagnostics_dataframe: if possible, returns a dataframe with the diagnostics (instead of the usual output)
:rtype: ``List[Dict[str, Any]]``
.. code-block:: python3
Expand All @@ -60,13 +62,22 @@ def conformance_diagnostics_token_based_replay(log: Union[EventLog, pd.DataFrame
if check_is_pandas_dataframe(log):
check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

if return_diagnostics_dataframe:
log = convert_to_event_log(log, case_id_key=case_id_key)
case_id_key = None

properties = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
return token_replay.apply(log, petri_net, initial_marking, final_marking, parameters=properties)
result = token_replay.apply(log, petri_net, initial_marking, final_marking, parameters=properties)

if return_diagnostics_dataframe:
return token_replay.get_diagnostics_dataframe(log, result, parameters=properties)

return result


def conformance_diagnostics_alignments(log: Union[EventLog, pd.DataFrame], *args, multi_processing: bool = constants.ENABLE_MULTIPROCESSING_DEFAULT, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name", variant_str : Optional[str] = None) -> List[Dict[str, Any]]:
def conformance_diagnostics_alignments(log: Union[EventLog, pd.DataFrame], *args, multi_processing: bool = constants.ENABLE_MULTIPROCESSING_DEFAULT, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name", variant_str : Optional[str] = None, return_diagnostics_dataframe: bool = False) -> List[Dict[str, Any]]:
"""
Apply the alignments algorithm between a log and a process model.
The methods return the full alignment diagnostics.
Expand All @@ -92,6 +103,7 @@ def conformance_diagnostics_alignments(log: Union[EventLog, pd.DataFrame], *args
:param timestamp_key: attribute to be used for the timestamp
:param case_id_key: attribute to be used as case identifier
:param variant_str: variant specification (for Petri net alignments)
:param return_diagnostics_dataframe: if possible, returns a dataframe with the diagnostics (instead of the usual output)
:rtype: ``List[Dict[str, Any]]``
.. code-block:: python3
Expand All @@ -107,6 +119,10 @@ def conformance_diagnostics_alignments(log: Union[EventLog, pd.DataFrame], *args
if check_is_pandas_dataframe(log):
check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

if return_diagnostics_dataframe:
log = convert_to_event_log(log, case_id_key=case_id_key)
case_id_key = None

properties = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

if len(args) == 3:
Expand All @@ -117,29 +133,43 @@ def conformance_diagnostics_alignments(log: Union[EventLog, pd.DataFrame], *args
if variant_str is not None:
variant = variant_str
if multi_processing:
return alignments.apply_multiprocessing(log, args[0], args[1], args[2], parameters=properties, variant=variant)
result = alignments.apply_multiprocessing(log, args[0], args[1], args[2], parameters=properties, variant=variant)
else:
return alignments.apply(log, args[0], args[1], args[2], parameters=properties, variant=variant)
result = alignments.apply(log, args[0], args[1], args[2], parameters=properties, variant=variant)

if return_diagnostics_dataframe:
return alignments.get_diagnostics_dataframe(log, result, parameters=properties)

return result
elif isinstance(args[0], dict):
# DFG alignments
from pm4py.algo.conformance.alignments.dfg import algorithm as dfg_alignment
return dfg_alignment.apply(log, args[0], args[1], args[2], parameters=properties)
result = dfg_alignment.apply(log, args[0], args[1], args[2], parameters=properties)

return result
elif len(args) == 1:
if type(args[0]) is ProcessTree:
# process tree alignments
from pm4py.algo.conformance.alignments.process_tree.variants import search_graph_pt
if multi_processing:
return search_graph_pt.apply_multiprocessing(log, args[0], parameters=properties)
result = search_graph_pt.apply_multiprocessing(log, args[0], parameters=properties)
else:
return search_graph_pt.apply(log, args[0], parameters=properties)
result = search_graph_pt.apply(log, args[0], parameters=properties)

return result
# try to convert to Petri net
import pm4py
from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments
net, im, fm = pm4py.convert_to_petri_net(*args)
if multi_processing:
return alignments.apply_multiprocessing(log, net, im, fm, parameters=properties)
result = alignments.apply_multiprocessing(log, net, im, fm, parameters=properties)
else:
return alignments.apply(log, net, im, fm, parameters=properties)
result = alignments.apply(log, net, im, fm, parameters=properties)

if return_diagnostics_dataframe:
return alignments.get_diagnostics_dataframe(log, result, parameters=properties)

return result


def fitness_token_based_replay(log: Union[EventLog, pd.DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name") -> \
Expand Down Expand Up @@ -184,9 +214,11 @@ def fitness_token_based_replay(log: Union[EventLog, pd.DataFrame], petri_net: Pe
properties = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness
return replay_fitness.apply(log, petri_net, initial_marking, final_marking,
result = replay_fitness.apply(log, petri_net, initial_marking, final_marking,
variant=replay_fitness.Variants.TOKEN_BASED, parameters=properties)

return result


def fitness_alignments(log: Union[EventLog, pd.DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = constants.ENABLE_MULTIPROCESSING_DEFAULT, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name") -> \
Dict[str, float]:
Expand Down Expand Up @@ -231,9 +263,11 @@ def fitness_alignments(log: Union[EventLog, pd.DataFrame], petri_net: PetriNet,
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness
parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
parameters["multiprocessing"] = multi_processing
return replay_fitness.apply(log, petri_net, initial_marking, final_marking,
result = replay_fitness.apply(log, petri_net, initial_marking, final_marking,
variant=replay_fitness.Variants.ALIGNMENT_BASED, parameters=parameters)

return result


def precision_token_based_replay(log: Union[EventLog, pd.DataFrame], petri_net: PetriNet, initial_marking: Marking,
final_marking: Marking, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name") -> float:
Expand Down Expand Up @@ -276,9 +310,11 @@ def precision_token_based_replay(log: Union[EventLog, pd.DataFrame], petri_net:
properties = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

from pm4py.algo.evaluation.precision import algorithm as precision_evaluator
return precision_evaluator.apply(log, petri_net, initial_marking, final_marking,
result = precision_evaluator.apply(log, petri_net, initial_marking, final_marking,
variant=precision_evaluator.Variants.ETCONFORMANCE_TOKEN, parameters=properties)

return result


def precision_alignments(log: Union[EventLog, pd.DataFrame], petri_net: PetriNet, initial_marking: Marking,
final_marking: Marking, multi_processing: bool = constants.ENABLE_MULTIPROCESSING_DEFAULT, activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name") -> float:
Expand Down Expand Up @@ -324,10 +360,12 @@ def precision_alignments(log: Union[EventLog, pd.DataFrame], petri_net: PetriNet
from pm4py.algo.evaluation.precision import algorithm as precision_evaluator
parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
parameters["multiprocessing"] = multi_processing
return precision_evaluator.apply(log, petri_net, initial_marking, final_marking,
result = precision_evaluator.apply(log, petri_net, initial_marking, final_marking,
variant=precision_evaluator.Variants.ALIGN_ETCONFORMANCE,
parameters=parameters)

return result


@deprecation.deprecated(deprecated_in="2.3.0", removed_in="3.0.0", details="conformance checking using footprints will not be exposed in a future release")
def __convert_to_fp(*args) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
Expand Down Expand Up @@ -370,9 +408,11 @@ def conformance_diagnostics_footprints(*args) -> Union[List[Dict[str, Any]], Dic
fp2 = __convert_to_fp(args[1:])
from pm4py.algo.conformance.footprints import algorithm as footprints_conformance
if isinstance(fp1, list):
return footprints_conformance.apply(fp1, fp2, variant=footprints_conformance.Variants.TRACE_EXTENSIVE)
result = footprints_conformance.apply(fp1, fp2, variant=footprints_conformance.Variants.TRACE_EXTENSIVE)
else:
return footprints_conformance.apply(fp1, fp2, variant=footprints_conformance.Variants.LOG_EXTENSIVE)
result = footprints_conformance.apply(fp1, fp2, variant=footprints_conformance.Variants.LOG_EXTENSIVE)

return result


@deprecation.deprecated(deprecated_in="2.3.0", removed_in="3.0.0", details="conformance checking using footprints will not be exposed in a future release")
Expand All @@ -396,7 +436,9 @@ def fitness_footprints(*args) -> Dict[str, float]:
fp1 = __convert_to_fp(args[0])
fp2 = __convert_to_fp(args[1:])
from pm4py.algo.conformance.footprints.util import evaluation
return evaluation.fp_fitness(fp1, fp2, fp_conf)
result = evaluation.fp_fitness(fp1, fp2, fp_conf)

return result


@deprecation.deprecated(deprecated_in="2.3.0", removed_in="3.0.0", details="conformance checking using footprints will not be exposed in a future release")
Expand All @@ -417,7 +459,9 @@ def precision_footprints(*args) -> float:
fp1 = __convert_to_fp(args[0])
fp2 = __convert_to_fp(args[1:])
from pm4py.algo.conformance.footprints.util import evaluation
return evaluation.fp_precision(fp1, fp2)
result = evaluation.fp_precision(fp1, fp2)

return result


@deprecation.deprecated(removed_in="2.3.0", deprecated_in="3.0.0", details="this method will be removed in a future release.")
Expand Down Expand Up @@ -558,10 +602,12 @@ def conformance_temporal_profile(log: Union[EventLog, pd.DataFrame], temporal_pr
properties["zeta"] = zeta

from pm4py.algo.conformance.temporal_profile import algorithm as temporal_profile_conformance
return temporal_profile_conformance.apply(log, temporal_profile, parameters=properties)
result = temporal_profile_conformance.apply(log, temporal_profile, parameters=properties)

return result


def conformance_log_skeleton(log: Union[EventLog, pd.DataFrame], log_skeleton: Dict[str, Any], activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name") -> List[Set[Any]]:
def conformance_log_skeleton(log: Union[EventLog, pd.DataFrame], log_skeleton: Dict[str, Any], activity_key: str = "concept:name", timestamp_key: str = "time:timestamp", case_id_key: str = "case:concept:name", return_diagnostics_dataframe: bool = False) -> List[Set[Any]]:
"""
Performs conformance checking using the log skeleton
Expand Down Expand Up @@ -590,6 +636,7 @@ def conformance_log_skeleton(log: Union[EventLog, pd.DataFrame], log_skeleton: D
:param activity_key: attribute to be used for the activity
:param timestamp_key: attribute to be used for the timestamp
:param case_id_key: attribute to be used as case identifier
:param return_diagnostics_dataframe: if possible, returns a dataframe with the diagnostics (instead of the usual output)
:rtype: ``List[Set[Any]]``
.. code-block:: python3
Expand All @@ -605,7 +652,16 @@ def conformance_log_skeleton(log: Union[EventLog, pd.DataFrame], log_skeleton: D
if check_is_pandas_dataframe(log):
check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

if return_diagnostics_dataframe:
log = convert_to_event_log(log, case_id_key=case_id_key)
case_id_key = None

properties = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

from pm4py.algo.conformance.log_skeleton import algorithm as log_skeleton_conformance
return log_skeleton_conformance.apply(log, log_skeleton, parameters=properties)
result = log_skeleton_conformance.apply(log, log_skeleton, parameters=properties)

if return_diagnostics_dataframe:
return log_skeleton_conformance.get_diagnostics_dataframe(log, result, parameters=properties)

return result

0 comments on commit 42b0d2a

Please sign in to comment.