From 6c975da05cc12cd6ef23e77826243a9317ec1196 Mon Sep 17 00:00:00 2001 From: benedictus-yevu Date: Tue, 21 Nov 2023 02:19:30 +0000 Subject: [PATCH 1/2] Drop invocation reasons --- CHANGELOG.md | 3 + lib/lightning/accounts.ex | 10 +-- lib/lightning/attempts/attempt.ex | 7 +- lib/lightning/projects.ex | 31 --------- lib/lightning/setup_utils.ex | 1 - lib/lightning/workorders/workorder.ex | 10 ++- ...ove_invocation_reasons_and_constraints.exs | 28 ++++++++ test/lightning/accounts_test.exs | 47 ++++++++++--- test/lightning/attempt_service_test.exs | 15 ++-- test/lightning/invocation/attempt_test.exs | 6 -- test/lightning/invocation/workorder_test.exs | 6 -- test/lightning/reasons_test.exs | 69 ------------------- test/lightning_web/live/user_live_test.exs | 20 +++++- 13 files changed, 105 insertions(+), 148 deletions(-) create mode 100644 priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs delete mode 100644 test/lightning/reasons_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c77d6baba..7feb79acc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -170,6 +170,9 @@ bearing with us as we move towards our first stable Lightning release.) [#1348](https://github.com/OpenFn/Lightning/issues/1348) - Modify CompleteRun to return error changeset when run not found [#1393](https://github.com/OpenFn/Lightning/issues/1393) +- Drop invocation reasons from DB + [#1412](https://github.com/OpenFn/Lightning/issues/1412) + ## [v0.9.3] - 2023-09-27 diff --git a/lib/lightning/accounts.ex b/lib/lightning/accounts.ex index 0b0ace5102..16f3c5b97a 100644 --- a/lib/lightning/accounts.ex +++ b/lib/lightning/accounts.ex @@ -25,9 +25,9 @@ defmodule Lightning.Accounts do def has_activity_in_projects?(%User{id: id} = _user) do count = - from(invocation_reason in Lightning.InvocationReason, - where: invocation_reason.user_id == ^id, - select: count(invocation_reason.id) + from(attempt in Lightning.Attempt, + where: attempt.created_by_id == ^id, + select: count(attempt.id) ) |> Repo.one() @@ -67,9 +67,9 @@ defmodule Lightning.Accounts do @impl Oban.Worker def perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) do users_with_activities = - from(invocation_reason in Lightning.InvocationReason, + from(attempt in Lightning.Attempt, join: user in Lightning.Accounts.User, - on: invocation_reason.user_id == user.id, + on: attempt.created_by_id == user.id, select: user.id, distinct: true ) diff --git a/lib/lightning/attempts/attempt.ex b/lib/lightning/attempts/attempt.ex index 46d44a3794..9cb18cdc00 100644 --- a/lib/lightning/attempts/attempt.ex +++ b/lib/lightning/attempts/attempt.ex @@ -9,7 +9,6 @@ defmodule Lightning.Attempt do import Lightning.Validators alias Lightning.Accounts.User - alias Lightning.InvocationReason alias Lightning.WorkOrder alias Lightning.Invocation.Run alias Lightning.Invocation.LogLine @@ -40,7 +39,6 @@ defmodule Lightning.Attempt do @type t :: %__MODULE__{ __meta__: Ecto.Schema.Metadata.t(), id: Ecto.UUID.t() | nil, - reason: InvocationReason.t() | Ecto.Association.NotLoaded.t(), work_order: WorkOrder.t() | Ecto.Association.NotLoaded.t() } @@ -48,7 +46,6 @@ defmodule Lightning.Attempt do @foreign_key_type :binary_id schema "attempts" do belongs_to :work_order, WorkOrder - belongs_to :reason, InvocationReason belongs_to :starting_job, Job belongs_to :starting_trigger, Trigger @@ -117,9 +114,9 @@ defmodule Lightning.Attempt do @doc false def changeset(attempt, attrs) do attempt - |> cast(attrs, [:reason_id, :work_order_id, :priority]) + |> cast(attrs, 
[:work_order_id, :priority]) |> cast_assoc(:runs, required: false) - |> validate_required([:reason_id, :work_order_id]) + |> validate_required([:work_order_id]) |> assoc_constraint(:work_order) |> validate() end diff --git a/lib/lightning/projects.ex b/lib/lightning/projects.ex index 688321010f..90b90185ff 100644 --- a/lib/lightning/projects.ex +++ b/lib/lightning/projects.ex @@ -19,7 +19,6 @@ defmodule Lightning.Projects do alias Lightning.Accounts.User alias Lightning.ExportUtils alias Lightning.Workflows.Workflow - alias Lightning.InvocationReason alias Lightning.Invocation.{Run, Dataclip} alias Lightning.WorkOrder @@ -204,14 +203,10 @@ defmodule Lightning.Projects do project_workorders_query(project) |> Repo.delete_all() - project_run_invocation_reasons(project) |> Repo.delete_all() - project_runs_query(project) |> Repo.delete_all() project_jobs_query(project) |> Repo.delete_all() - project_trigger_invocation_reason(project) |> Repo.delete_all() - project_triggers_query(project) |> Repo.delete_all() project_workflows_query(project) |> Repo.delete_all() @@ -220,8 +215,6 @@ defmodule Lightning.Projects do project_credentials_query(project) |> Repo.delete_all() - project_dataclip_invocation_reason(project) |> Repo.delete_all() - project_dataclips_query(project) |> Repo.delete_all() {:ok, project} = Repo.delete(project) @@ -236,30 +229,6 @@ defmodule Lightning.Projects do end) end - def project_trigger_invocation_reason(project) do - from(ir in InvocationReason, - join: tr in assoc(ir, :trigger), - join: w in assoc(tr, :workflow), - where: w.project_id == ^project.id - ) - end - - def project_dataclip_invocation_reason(project) do - from(ir in InvocationReason, - join: d in assoc(ir, :dataclip), - where: d.project_id == ^project.id - ) - end - - def project_run_invocation_reasons(project) do - from(ir in InvocationReason, - join: r in assoc(ir, :run), - join: j in assoc(r, :job), - join: w in assoc(j, :workflow), - where: w.project_id == ^project.id - ) - end - def project_attempts_query(project) do from(att in Attempt, join: wo in assoc(att, :work_order), diff --git a/lib/lightning/setup_utils.ex b/lib/lightning/setup_utils.ex index ee8f603f6c..f85448ef6b 100644 --- a/lib/lightning/setup_utils.ex +++ b/lib/lightning/setup_utils.ex @@ -892,7 +892,6 @@ defmodule Lightning.SetupUtils do Lightning.Auditing.Model, Lightning.Projects.ProjectCredential, Lightning.WorkOrder, - Lightning.InvocationReason, Lightning.Invocation.Run, Lightning.Credentials.Credential, Lightning.Workflows.Job, diff --git a/lib/lightning/workorders/workorder.ex b/lib/lightning/workorders/workorder.ex index cae1e3daaa..079dfe3d5f 100644 --- a/lib/lightning/workorders/workorder.ex +++ b/lib/lightning/workorders/workorder.ex @@ -7,7 +7,7 @@ defmodule Lightning.WorkOrder do import Ecto.Changeset alias Lightning.Workflows.{Workflow, Trigger} alias Lightning.Invocation.Dataclip - alias Lightning.{InvocationReason, Attempt} + alias Lightning.{Attempt} require Attempt @@ -16,8 +16,7 @@ defmodule Lightning.WorkOrder do id: Ecto.UUID.t() | nil, trigger: Trigger.t() | Ecto.Association.NotLoaded.t(), dataclip: Dataclip.t() | Ecto.Association.NotLoaded.t(), - workflow: Workflow.t() | Ecto.Association.NotLoaded.t(), - reason: InvocationReason.t() | Ecto.Association.NotLoaded.t() + workflow: Workflow.t() | Ecto.Association.NotLoaded.t() } @primary_key {:id, :binary_id, autogenerate: true} @@ -42,7 +41,6 @@ defmodule Lightning.WorkOrder do belongs_to :trigger, Trigger belongs_to :dataclip, Dataclip - belongs_to :reason, 
InvocationReason has_many :attempts, Attempt, preload_order: [desc: :inserted_at] has_many :jobs, through: [:workflow, :jobs] @@ -57,8 +55,8 @@ defmodule Lightning.WorkOrder do @doc false def changeset(attempt, attrs) do attempt - |> cast(attrs, [:state, :last_activity, :reason_id, :workflow_id]) - |> validate_required([:state, :last_activity, :reason_id, :workflow_id]) + |> cast(attrs, [:state, :last_activity, :workflow_id]) + |> validate_required([:state, :last_activity, :workflow_id]) |> validate() end diff --git a/priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs b/priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs new file mode 100644 index 0000000000..b2dab2009a --- /dev/null +++ b/priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs @@ -0,0 +1,28 @@ +defmodule Lightning.Repo.Migrations.RemoveInvocationReasonsAndConstraints do + use Ecto.Migration + + def up do + alter table(:work_orders) do + remove :reason_id + end + + alter table(:attempts) do + remove :reason_id + end + + drop table(:invocation_reasons) + end + + def down do + create table(:invocation_reasons) do + end + + alter table(:work_orders) do + add :reason_id, references(:invocation_reasons, type: :binary_id), null: false + end + + alter table(:attempts) do + add :reason_id, references(:invocation_reasons, type: :binary_id), null: false + end + end +end diff --git a/test/lightning/accounts_test.exs b/test/lightning/accounts_test.exs index 01281d7a05..ac12f0c695 100644 --- a/test/lightning/accounts_test.exs +++ b/test/lightning/accounts_test.exs @@ -2,7 +2,6 @@ defmodule Lightning.AccountsTest do use Lightning.DataCase, async: true alias Lightning.AccountsFixtures - alias Lightning.InvocationFixtures alias Lightning.Credentials alias Lightning.Jobs alias Lightning.JobsFixtures @@ -16,11 +15,28 @@ defmodule Lightning.AccountsTest do import Lightning.Factories import Ecto.Query - test "has_activity_in_projects?/1 returns true if user is has activity in a project (is associated to invocation reasons) and false otherwise." do + test "has_activity_in_projects?/1 returns true if user has activity in a project (is associated with an attempt) and false otherwise." 
do user = AccountsFixtures.user_fixture() another_user = AccountsFixtures.user_fixture() - InvocationFixtures.reason_fixture(user_id: user.id) + workflow = insert(:workflow) + trigger = insert(:trigger, workflow: workflow) + dataclip = insert(:dataclip) + + work_order = + insert(:workorder, + workflow: workflow, + trigger: trigger, + dataclip: dataclip + ) + + _attempt = + insert(:attempt, + created_by: user, + work_order: work_order, + starting_trigger: trigger, + dataclip: dataclip + ) assert Accounts.has_activity_in_projects?(user) refute Accounts.has_activity_in_projects?(another_user) @@ -586,23 +602,38 @@ defmodule Lightning.AccountsTest do end describe "The default Oban function Accounts.perform/1" do - test "prevents users that are still linked to an invocation reason from being deleted" do + test "prevents users that are still linked to an attempt from being deleted" do user = user_fixture( scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: -10) ) - InvocationFixtures.reason_fixture(user_id: user.id) + workflow = insert(:workflow) + trigger = insert(:trigger, workflow: workflow) + dataclip = insert(:dataclip) - assert 1 = Repo.all(User) |> Enum.count() + work_order = + insert(:workorder, + workflow: workflow, + trigger: trigger, + dataclip: dataclip + ) + + _attempt = + insert(:attempt, + created_by: user, + work_order: work_order, + starting_trigger: trigger, + dataclip: dataclip + ) + + assert 1 == Repo.all(User) |> Enum.count() {:ok, %{users_deleted: users_deleted}} = Accounts.perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) - # We still have one user in database assert 1 == Repo.all(User) |> Enum.count() - # No user has been deleted assert 0 == users_deleted |> Enum.count() end diff --git a/test/lightning/attempt_service_test.exs b/test/lightning/attempt_service_test.exs index 221e5d03b6..9bcac9ede2 100644 --- a/test/lightning/attempt_service_test.exs +++ b/test/lightning/attempt_service_test.exs @@ -11,25 +11,23 @@ defmodule Lightning.AttemptServiceTest do describe "attempts" do @tag skip: "Replaced by Attempts.enqueue/1" test "create_attempt/3 returns a new Attempt, with a new Run" do - %{job: job, trigger: trigger} = workflow_job_fixture() + %{job: job, trigger: _trigger} = workflow_job_fixture() work_order = work_order_fixture(workflow_id: job.workflow_id) - reason = reason_fixture(trigger_id: trigger.id) + dataclip = dataclip_fixture() job_id = job.id work_order_id = work_order.id - reason_id = reason.id - data_clip_id = reason.dataclip_id + data_clip_id = dataclip.id assert {:ok, %Attempt{ work_order_id: ^work_order_id, - reason_id: ^reason_id, runs: [%Run{job_id: ^job_id, input_dataclip_id: ^data_clip_id}] }} = AttemptService.create_attempt( work_order, job, - reason + dataclip ) end end @@ -41,7 +39,7 @@ defmodule Lightning.AttemptServiceTest do work_order = work_order_fixture(workflow_id: job.workflow_id) dataclip = dataclip_fixture() - reason = + _reason = reason_fixture( trigger_id: trigger.id, dataclip_id: dataclip.id @@ -49,8 +47,7 @@ defmodule Lightning.AttemptServiceTest do attempt = %Attempt{ - work_order_id: work_order.id, - reason_id: reason.id + work_order_id: work_order.id } |> Repo.insert!() diff --git a/test/lightning/invocation/attempt_test.exs b/test/lightning/invocation/attempt_test.exs index 961241c326..245dd4cf7f 100644 --- a/test/lightning/invocation/attempt_test.exs +++ b/test/lightning/invocation/attempt_test.exs @@ -4,12 +4,6 @@ defmodule Lightning.Invocation.AttemptTest do alias Lightning.Attempt describe "changeset/2" do - 
test "must have a reason" do - errors = Attempt.changeset(%Attempt{}, %{}) |> errors_on() - - assert errors[:reason_id] == ["can't be blank"] - end - test "must have a work_order" do errors = Attempt.changeset(%Attempt{}, %{}) |> errors_on() diff --git a/test/lightning/invocation/workorder_test.exs b/test/lightning/invocation/workorder_test.exs index 20b50d6ffb..83009609af 100644 --- a/test/lightning/invocation/workorder_test.exs +++ b/test/lightning/invocation/workorder_test.exs @@ -9,11 +9,5 @@ defmodule Lightning.Invocation.WorkOrderTest do assert errors[:workflow_id] == ["can't be blank"] end - - test "must have a reason" do - errors = WorkOrder.changeset(%WorkOrder{}, %{}) |> errors_on() - - assert errors[:reason_id] == ["can't be blank"] - end end end diff --git a/test/lightning/reasons_test.exs b/test/lightning/reasons_test.exs deleted file mode 100644 index 4205d71d05..0000000000 --- a/test/lightning/reasons_test.exs +++ /dev/null @@ -1,69 +0,0 @@ -defmodule Lightning.InvocationReasonsTest do - use Lightning.DataCase, async: true - - import Lightning.InvocationFixtures - import Lightning.AccountsFixtures - import Lightning.Factories - - alias Lightning.InvocationReasons - alias Lightning.InvocationReason - - describe "create_reason/1" do - test "with invalid data returns error changeset" do - assert {:error, %Ecto.Changeset{}} = - InvocationReasons.create_reason(%{type: nil}) - end - - test "with valid data creates a reason" do - trigger = insert(:trigger, %{type: :webhook}) - - valid_attrs = %{ - type: :webhook, - user_id: user_fixture().id, - run_id: run_fixture().id, - dataclip_id: dataclip_fixture().id, - trigger_id: trigger.id - } - - assert {:ok, %InvocationReason{}} = - InvocationReasons.create_reason(valid_attrs) - end - end - - describe "build/2" do - test "with trigger of type :webhook or :cron returns a valid reason" do - trigger = insert(:trigger, %{type: :webhook}) - cron_trigger = insert(:trigger, %{type: :cron}) - dataclip = dataclip_fixture() - - assert %Ecto.Changeset{valid?: true} = - InvocationReasons.build( - trigger, - dataclip - ) - - assert %Ecto.Changeset{valid?: true} = - InvocationReasons.build( - cron_trigger, - dataclip - ) - end - - test "with :manual" do - dataclip = dataclip_fixture() - - assert %Ecto.Changeset{valid?: true} = - InvocationReasons.build(:manual, %{ - user: user_fixture(), - dataclip: dataclip - }) - end - - test "with :retry" do - run = run_fixture() - - assert %Ecto.Changeset{valid?: true} = - InvocationReasons.build(:retry, %{user: user_fixture(), run: run}) - end - end -end diff --git a/test/lightning_web/live/user_live_test.exs b/test/lightning_web/live/user_live_test.exs index aee91117c1..468518b096 100644 --- a/test/lightning_web/live/user_live_test.exs +++ b/test/lightning_web/live/user_live_test.exs @@ -1,11 +1,11 @@ defmodule LightningWeb.UserLiveTest do alias Lightning.AccountsFixtures - alias Lightning.InvocationFixtures use LightningWeb.ConnCase, async: true import Lightning.{AccountsFixtures} import Phoenix.LiveViewTest import Swoosh.TestAssertions + import Lightning.Factories @create_attrs %{ email: "test@example.com", @@ -240,7 +240,23 @@ defmodule LightningWeb.UserLiveTest do scheduled_deletion: Timex.now() |> Timex.shift(days: 7) ) - InvocationFixtures.reason_fixture(user_id: user.id) + workflow = insert(:workflow) + trigger = insert(:trigger, workflow: workflow) + dataclip = insert(:dataclip) + + work_order = + insert(:workorder, + workflow: workflow, + trigger: trigger, + dataclip: dataclip + ) + + insert(:attempt, + 
created_by: user, + work_order: work_order, + starting_trigger: trigger, + dataclip: dataclip + ) {:ok, index_live, _html} = live(conn, Routes.user_index_path(conn, :index)) From bf6b246e85546952e66364585b037299d1aa7ad0 Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Wed, 22 Nov 2023 15:44:01 +0200 Subject: [PATCH 2/2] Add original invocation_reasons DDL to migration Touch up tests and remove (with their tests): - Lightning.AttemptService - Lightning.Workflows.Graph (replaced by Lightning.Graph) Closes: #1412 --- lib/lightning/attempt_service.ex | 372 ---------------- lib/lightning/workflows/graph.ex | 87 ---- ...ove_invocation_reasons_and_constraints.exs | 59 ++- test/lightning/accounts_test.exs | 30 +- test/lightning/attempt_service_test.exs | 397 ------------------ test/lightning/workflows/graph_test.exs | 50 --- test/support/model_helpers.ex | 6 + 7 files changed, 74 insertions(+), 927 deletions(-) delete mode 100644 lib/lightning/attempt_service.ex delete mode 100644 lib/lightning/workflows/graph.ex delete mode 100644 test/lightning/attempt_service_test.exs delete mode 100644 test/lightning/workflows/graph_test.exs diff --git a/lib/lightning/attempt_service.ex b/lib/lightning/attempt_service.ex deleted file mode 100644 index f3919cb8f5..0000000000 --- a/lib/lightning/attempt_service.ex +++ /dev/null @@ -1,372 +0,0 @@ -defmodule Lightning.AttemptService do - @moduledoc """ - The Attempts context. - """ - - import Ecto.Query, warn: false - alias Ecto.Multi - alias Lightning.Repo - alias Lightning.{Attempt, AttemptRun} - alias Lightning.Invocation.{Run} - alias Lightning.InvocationReason - - @doc """ - Create an attempt - - ## Examples - - iex> create_attempt(%{field: value}) - {:ok, %Attempt{}} - - iex> create_attempt(%{field: bad_value}) - {:error, %Ecto.Changeset{}} - - """ - def create_attempt(work_order, job, reason) do - project_id = job.workflow.project_id - dataclip_id = reason.dataclip_id - - build_attempt(work_order, reason) - |> Ecto.Changeset.put_assoc(:runs, [ - Run.changeset(%Run{}, %{ - project_id: project_id, - job_id: job.id, - input_dataclip_id: dataclip_id - }) - ]) - |> Repo.insert() - end - - def build_attempt(work_order, reason) do - Ecto.build_assoc(work_order, :attempts) - |> Ecto.Changeset.change(%{reason: reason}) - end - - @doc """ - Adds an Attempt to an unsaved Run - - When given an Attempt, it simply adds the Run to a new AttemptRun. - However when given an AttemptRun, the Run (from the AttemptRun) is - set as the previous Run for the new unsaved Run. - """ - @spec append(Attempt.t() | AttemptRun.t(), Ecto.Changeset.t(Run.t())) :: - {:ok, AttemptRun.t()} | {:error, Ecto.Changeset.t(AttemptRun.t())} - def append(%Attempt{} = attempt, %Ecto.Changeset{} = run) do - AttemptRun.new() - |> Ecto.Changeset.put_assoc(:attempt, attempt) - |> Ecto.Changeset.put_assoc(:run, run) - |> Repo.insert() - end - - def append(%AttemptRun{} = attempt_run, %Ecto.Changeset{} = run) do - AttemptRun.new(%{attempt_id: attempt_run.attempt_id}) - |> Ecto.Changeset.put_assoc( - :run, - run |> Ecto.Changeset.put_change(:previous_id, attempt_run.run_id) - ) - |> Repo.insert() - end - - @doc """ - Creates a new Attempt starting from a given run. - - All upstream/prior Runs that were performed on that attempt are associated - with the new Attempt, where as the specified run is used to create a new one - and is added to the Attempt. - - Any runs downstream from the Run given are ignored. 
- """ - @spec retry( - Attempt.t(), - Run.t(), - Ecto.Changeset.t(Lightning.InvocationReason.t()) - | Lightning.InvocationReason.t() - ) :: Ecto.Multi.t() - def retry(%Attempt{} = attempt, %Run{} = run, reason) do - attempt = Repo.preload(attempt, :work_order) - - # no way we don't have a workflow , throw if we dont have one - attempt_workflow = get_workflow_for(attempt) |> Repo.one!() - - existing_runs = - from(r in Run, - join: a in assoc(r, :attempts), - where: a.id == ^attempt.id - ) - |> Repo.all() - - {skipped_runs, new_run} = - calculate_runs(attempt_workflow, existing_runs, run) - - Multi.new() - |> Multi.insert(:attempt, fn _ -> - build_attempt(attempt.work_order, reason) - |> Ecto.Changeset.put_assoc( - :runs, - skipped_runs - ) - end) - |> Multi.insert(:attempt_run, fn %{attempt: attempt} -> - AttemptRun.new() - |> Ecto.Changeset.put_assoc(:attempt, attempt) - |> Ecto.Changeset.put_assoc(:run, new_run) - end) - end - - @doc """ - Creates new Attempts for each pair of corresponding AttemptRun and - InvocationReason. - """ - @spec retry_many([AttemptRun.t()], [Lightning.InvocationReason.t()]) :: - Ecto.Multi.t() - def retry_many( - [%AttemptRun{} | _other_runs] = attempt_runs, - [%InvocationReason{} | _other_reasons] = reasons - ) do - attempt_runs = - attempt_runs - |> Repo.preload([ - :run, - attempt: [work_order: [workflow: [:jobs, :edges]], runs: []] - ]) - - Multi.new() - |> Multi.insert_all( - :attempts, - Attempt, - fn _ -> - reasons_map = Map.new(reasons, &{&1.run_id, &1.id}) - now = DateTime.utc_now() - - Enum.map(attempt_runs, fn %{attempt: %{work_order: work_order}, run: run} -> - %{ - work_order_id: work_order.id, - reason_id: Map.fetch!(reasons_map, run.id), - inserted_at: now - } - end) - end, - returning: true - ) - |> Multi.run(:attempt_runs_setup, fn _repo, - %{attempts: {_count, attempts}} -> - attempts_map = Map.new(attempts, &{&1.work_order_id, &1.id}) - now = DateTime.utc_now() - - setup = - attempt_runs - |> Enum.map(fn %{attempt: attempt, run: run} -> - {skipped_runs, new_run} = - calculate_runs(attempt.work_order.workflow, attempt.runs, run) - - attempt_id = attempts_map[attempt.work_order.id] - - skipped_attempt_runs = - attempt_runs_attrs(attempt_id, skipped_runs, now) - - {attempt_id, {skipped_attempt_runs, new_run.changes}} - end) - - {:ok, setup} - end) - |> Multi.insert_all( - :skipped_attempt_runs, - AttemptRun, - fn %{attempt_runs_setup: setup} -> - Enum.flat_map(setup, fn {_, {skipped_runs, _new_run}} -> - skipped_runs - end) - end - ) - |> Multi.insert_all( - :runs, - Run, - fn %{attempt_runs_setup: setup} -> - now = DateTime.utc_now() - - Enum.map(setup, fn {_, {_skipped_runs, new_run}} -> - Map.merge(new_run, %{inserted_at: now, updated_at: now}) - end) - end, - returning: true - ) - |> Multi.insert_all( - :attempt_runs, - AttemptRun, - fn %{attempt_runs_setup: setup} -> - now = DateTime.utc_now() - - Enum.flat_map(setup, fn {attempt_id, {_skipped_runs, new_run}} -> - attempt_runs_attrs(attempt_id, [new_run], now) - end) - end, - returning: true - ) - end - - def retry_many([], []) do - Multi.new() - end - - defp attempt_runs_attrs(attempt_id, runs, timestamp) do - Enum.map(runs, fn run -> - %{ - attempt_id: attempt_id, - run_id: run.id, - inserted_at: timestamp - } - end) - end - - def get_workflow_for(%Attempt{work_order: %{workflow_id: wid}}) do - from(w in Lightning.Workflows.Workflow, - where: w.id == ^wid, - preload: [:jobs, edges: [:target_job, :source_job]] - ) - end - - def calculate_runs(workflow, existing_runs, starting_run) do - # 
TODO sanity check that ALL existing runs have a place in the graph - - runs_by_job_id = - existing_runs - |> Enum.into(%{}, fn %Run{job_id: job_id} = run -> {job_id, run} end) - - graph = - Lightning.Workflows.Graph.new(workflow) - |> Lightning.Workflows.Graph.remove(starting_run.job_id) - - {graph.jobs - |> Enum.map(fn %{id: id} -> runs_by_job_id[id] end) - |> Enum.reject(&is_nil/1), Run.new_from(starting_run)} - end - - def get_for_rerun(attempt_id, run_id) do - from(ar in AttemptRun, - where: ar.attempt_id == ^attempt_id and ar.run_id == ^run_id, - preload: [ - :attempt, - run: - ^from(r in Run, - select: [ - :id, - :job_id, - :started_at, - :finished_at, - :input_dataclip_id, - :output_dataclip_id - ] - ) - ] - ) - |> Repo.one() - end - - @doc """ - Returns a list of AttemptRun structs that should be rerun for the given list - of work order ids. - """ - @spec list_for_rerun_from_start([Ecto.UUID.t()]) :: [AttemptRun.t()] - def list_for_rerun_from_start(order_ids) when is_list(order_ids) do - attempt_run_numbers_query = - from(ar in AttemptRun, - join: att in assoc(ar, :attempt), - join: r in assoc(ar, :run), - where: att.work_order_id in ^order_ids, - select: %{ - id: ar.id, - row_num: - row_number() - |> over( - partition_by: att.work_order_id, - order_by: coalesce(r.started_at, r.inserted_at) - ) - } - ) - - first_attempt_runs_query = - from(ar in AttemptRun, - join: arn in subquery(attempt_run_numbers_query), - on: ar.id == arn.id, - join: att in assoc(ar, :attempt), - join: wo in assoc(att, :work_order), - where: arn.row_num == 1, - order_by: [asc: wo.inserted_at], - preload: [ - :attempt, - run: - ^from(r in Run, - select: [ - :id, - :job_id, - :started_at, - :finished_at, - :input_dataclip_id, - :output_dataclip_id - ] - ) - ] - ) - - Repo.all(first_attempt_runs_query) - end - - @doc """ - Returns a list of AttemptRun structs that should be rerun for the given list - of workorder ids that are associated to the given Job - """ - @spec list_for_rerun_from_job( - workorders :: [Ecto.UUID.t(), ...], - job_id :: Ecto.UUID.t() - ) :: [ - AttemptRun.t(), - ... 
- ] - def list_for_rerun_from_job(order_ids, job_id) - when is_list(order_ids) do - last_attempts_query = - from(att in Lightning.Attempt, - join: r in assoc(att, :runs), - where: att.work_order_id in ^order_ids, - group_by: att.work_order_id, - select: %{ - work_order_id: att.work_order_id, - last_inserted_at: max(att.inserted_at) - } - ) - - attempt_runs_query = - from(ar in AttemptRun, - join: att in assoc(ar, :attempt), - join: wo in assoc(att, :work_order), - join: last in subquery(last_attempts_query), - on: - last.work_order_id == att.work_order_id and - att.inserted_at == last.last_inserted_at, - join: r in assoc(ar, :run), - on: r.job_id == ^job_id, - order_by: [asc: wo.inserted_at], - preload: [ - attempt: att, - run: r - ] - ) - - Repo.all(attempt_runs_query) - end - - @doc """ - Get the latest attempt associated to a given run - """ - - def get_last_attempt_for(%Run{id: id}) do - from(a in Attempt, - join: r in assoc(a, :runs), - where: r.id == ^id, - order_by: [desc: a.inserted_at], - limit: 1, - preload: [work_order: :workflow] - ) - |> Repo.one() - end -end diff --git a/lib/lightning/workflows/graph.ex b/lib/lightning/workflows/graph.ex deleted file mode 100644 index 9d89383090..0000000000 --- a/lib/lightning/workflows/graph.ex +++ /dev/null @@ -1,87 +0,0 @@ -defmodule Lightning.Workflows.Graph do - @moduledoc """ - Utility to construct and manipulate a graph/plan made out of Jobs - """ - alias Lightning.Workflows.Workflow - defstruct [:digraph, :root, :jobs] - - @type vertex :: {Ecto.UUID.t()} - - @type t :: %__MODULE__{ - digraph: :digraph.graph(), - root: vertex(), - jobs: [Lightning.Workflows.Job.t()] - } - - @spec new(workflow :: Workflow.t()) :: __MODULE__.t() - def new(%Workflow{} = workflow) do - g = :digraph.new() - - for j <- workflow.jobs do - :digraph.add_vertex(g, to_vertex(j)) - end - - for e <- workflow.edges do - if e.condition in [:on_job_failure, :on_job_success] do - :digraph.add_edge( - g, - to_vertex(%{id: e.source_job_id}), - to_vertex(%{id: e.target_job_id}) - ) - end - end - - root = - if workflow.edges == [] do - nil - else - get_root(g) - end - - %__MODULE__{digraph: g, root: root, jobs: workflow.jobs} - end - - @spec remove(__MODULE__.t(), Ecto.UUID.t()) :: __MODULE__.t() - def remove(%__MODULE__{digraph: g} = graph, job_id) do - vertex = - :digraph.vertices(g) - |> Enum.find(fn {id} -> id == job_id end) - - :digraph.del_vertex(g, vertex) - prune(graph) - - %{graph | jobs: vertices(graph)} - end - - @spec vertices(__MODULE__.t()) :: [Lightning.Workflows.Job.t()] - def vertices(%__MODULE__{digraph: g, jobs: jobs}) do - :digraph_utils.topsort(g) - |> Enum.map(fn {id} -> - Enum.find(jobs, &match?(%{id: ^id}, &1)) - end) - end - - defp get_root(g) do - {:yes, root} = :digraph_utils.arborescence_root(g) - root - end - - defp get_reachable(%__MODULE__{} = graph) do - [graph.root] ++ - :digraph_utils.reachable_neighbours([graph.root], graph.digraph) - end - - defp prune(%__MODULE__{} = graph) do - reachable = get_reachable(graph) - - unreachable = - :digraph.vertices(graph.digraph) - |> Enum.filter(fn v -> v not in reachable end) - - true = :digraph.del_vertices(graph.digraph, unreachable) - end - - defp to_vertex(%{id: id}) do - {id} - end -end diff --git a/priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs b/priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs index b2dab2009a..21d7fea287 100644 --- a/priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs +++ 
b/priv/repo/migrations/20231120112129_remove_invocation_reasons_and_constraints.exs @@ -14,8 +14,63 @@ defmodule Lightning.Repo.Migrations.RemoveInvocationReasonsAndConstraints do end def down do - create table(:invocation_reasons) do - end + execute(""" + CREATE TABLE invocation_reasons ( + id uuid NOT NULL, + type character varying(20) NOT NULL, + trigger_id uuid, + user_id uuid, + run_id uuid, + dataclip_id uuid, + inserted_at timestamp(0) without time zone NOT NULL, + updated_at timestamp(0) without time zone NOT NULL + ); + """) + + execute(""" + ALTER TABLE invocation_reasons OWNER TO postgres; + """) + + execute(""" + ALTER TABLE ONLY invocation_reasons + ADD CONSTRAINT invocation_reasons_pkey PRIMARY KEY (id); + """) + + execute(""" + CREATE INDEX invocation_reasons_dataclip_id_index ON invocation_reasons USING btree (dataclip_id); + """) + + execute(""" + CREATE INDEX invocation_reasons_run_id_index ON invocation_reasons USING btree (run_id); + """) + + execute(""" + CREATE INDEX invocation_reasons_trigger_id_index ON invocation_reasons USING btree (trigger_id); + """) + + execute(""" + CREATE INDEX invocation_reasons_user_id_index ON invocation_reasons USING btree (user_id); + """) + + execute(""" + ALTER TABLE ONLY invocation_reasons + ADD CONSTRAINT invocation_reasons_dataclip_id_fkey FOREIGN KEY (dataclip_id) REFERENCES dataclips(id); + """) + + execute(""" + ALTER TABLE ONLY invocation_reasons + ADD CONSTRAINT invocation_reasons_run_id_fkey FOREIGN KEY (run_id) REFERENCES runs(id); + """) + + execute(""" + ALTER TABLE ONLY invocation_reasons + ADD CONSTRAINT invocation_reasons_trigger_id_fkey FOREIGN KEY (trigger_id) REFERENCES triggers(id); + """) + + execute(""" + ALTER TABLE ONLY invocation_reasons + ADD CONSTRAINT invocation_reasons_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id); + """) alter table(:work_orders) do add :reason_id, references(:invocation_reasons, type: :binary_id), null: false diff --git a/test/lightning/accounts_test.exs b/test/lightning/accounts_test.exs index ac12f0c695..6e9f4dabf0 100644 --- a/test/lightning/accounts_test.exs +++ b/test/lightning/accounts_test.exs @@ -13,7 +13,6 @@ defmodule Lightning.AccountsTest do alias Lightning.Accounts.{User, UserBackupCode, UserToken, UserTOTP} import Lightning.AccountsFixtures import Lightning.Factories - import Ecto.Query test "has_activity_in_projects?/1 returns true if user has activity in a project (is associated with an attempt) and false otherwise." 
do user = AccountsFixtures.user_fixture() @@ -508,13 +507,13 @@ defmodule Lightning.AccountsTest do project_users: [%{user_id: user_2.id}] }) - assert 2 == Repo.all(ProjectUser) |> Enum.count() - assert 2 == Repo.all(User) |> Enum.count() + assert count_for(ProjectUser) == 2 + assert count_for(User) == 2 :ok = Accounts.purge_user(user_1.id) - assert 1 == Repo.all(ProjectUser) |> Enum.count() - assert 1 == Repo.all(User) |> Enum.count() + assert count_for(ProjectUser) == 1 + assert count_for(User) == 1 remaining_projs = Repo.all(ProjectUser) remaining_users = Repo.all(User) @@ -587,11 +586,11 @@ defmodule Lightning.AccountsTest do CredentialsFixtures.credential_fixture(user_id: user_1.id) CredentialsFixtures.credential_fixture(user_id: user_2.id) - assert 3 == Repo.all(Credentials.Credential) |> Enum.count() + assert count_for(Credentials.Credential) == 3 :ok = Accounts.purge_user(user_1.id) - assert 1 == Repo.all(Credentials.Credential) |> Enum.count() + assert count_for(Credentials.Credential) == 1 refute Repo.all(Credentials.Credential) |> Enum.any?(fn x -> x.user_id == user_1.id end) @@ -627,18 +626,18 @@ defmodule Lightning.AccountsTest do dataclip: dataclip ) - assert 1 == Repo.all(User) |> Enum.count() + assert count_for(User) == 1 {:ok, %{users_deleted: users_deleted}} = Accounts.perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) - assert 1 == Repo.all(User) |> Enum.count() + assert count_for(User) == 1 - assert 0 == users_deleted |> Enum.count() + assert users_deleted |> Enum.count() == 0 end test "removes all users past deletion date when called with type 'purge_deleted'" do - user_to_delete = + %{id: id_of_deleted} = user_fixture( scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: -10) ) @@ -647,15 +646,8 @@ defmodule Lightning.AccountsTest do scheduled_deletion: DateTime.utc_now() |> Timex.shift(seconds: 10) ) - count_before = Repo.all(User) |> Enum.count() - - {:ok, %{users_deleted: users_deleted}} = + {:ok, %{users_deleted: [%{id: ^id_of_deleted}]}} = Accounts.perform(%Oban.Job{args: %{"type" => "purge_deleted"}}) - - assert count_before - 1 == Repo.all(User) |> Enum.count() - assert 1 == users_deleted |> Enum.count() - - assert user_to_delete.id == users_deleted |> Enum.at(0) |> Map.get(:id) end test "removes user from project users before deleting them" do diff --git a/test/lightning/attempt_service_test.exs b/test/lightning/attempt_service_test.exs deleted file mode 100644 index 9bcac9ede2..0000000000 --- a/test/lightning/attempt_service_test.exs +++ /dev/null @@ -1,397 +0,0 @@ -defmodule Lightning.AttemptServiceTest do - use Lightning.DataCase, async: true - - import Lightning.JobsFixtures - import Lightning.InvocationFixtures - alias Lightning.Attempt - alias Lightning.AttemptService - alias Lightning.Invocation.{Run} - import Lightning.Factories - - describe "attempts" do - @tag skip: "Replaced by Attempts.enqueue/1" - test "create_attempt/3 returns a new Attempt, with a new Run" do - %{job: job, trigger: _trigger} = workflow_job_fixture() - work_order = work_order_fixture(workflow_id: job.workflow_id) - dataclip = dataclip_fixture() - - job_id = job.id - work_order_id = work_order.id - data_clip_id = dataclip.id - - assert {:ok, - %Attempt{ - work_order_id: ^work_order_id, - runs: [%Run{job_id: ^job_id, input_dataclip_id: ^data_clip_id}] - }} = - AttemptService.create_attempt( - work_order, - job, - dataclip - ) - end - end - - @tag :skip - describe "append/2" do - test "adds a run to an existing attempt" do - %{job: job, trigger: trigger} = 
workflow_job_fixture() - work_order = work_order_fixture(workflow_id: job.workflow_id) - dataclip = dataclip_fixture() - - _reason = - reason_fixture( - trigger_id: trigger.id, - dataclip_id: dataclip.id - ) - - attempt = - %Attempt{ - work_order_id: work_order.id - } - |> Repo.insert!() - - new_run = - Run.changeset(%Run{}, %{ - project_id: job.workflow.project_id, - job_id: job.id, - input_dataclip_id: dataclip.id - }) - - {:ok, attempt_run} = AttemptService.append(attempt, new_run) - - assert Ecto.assoc(attempt_run.run, :attempts) |> Repo.all() == [attempt] - end - end - - @tag skip: "Replaced by WorkOrders.retry/3" - describe "retry" do - setup do - workflow_scenario() - end - - test "creates a new attempt starting from an existing run", %{ - jobs: jobs, - workflow: workflow - } do - user = insert(:user) - dataclip = insert(:dataclip) - - work_order = - insert(:workorder, - workflow: workflow, - reason: build(:reason, user: user, type: :manual, dataclip: dataclip) - ) - - # first attempt - attempt_runs = - Enum.map([jobs.a, jobs.b, jobs.c, jobs.e, jobs.f], fn j -> - %{ - job_id: j.id, - input_dataclip_id: dataclip.id - } - end) ++ - [%{job_id: jobs.d.id, input_dataclip_id: dataclip.id}] - - attempt = - insert(:attempt, - work_order: work_order, - runs: attempt_runs, - reason: work_order.reason - ) - - # find the failed run for this attempt - run = - from(r in Run, - join: a in assoc(r, :attempts), - where: a.id == ^attempt.id, - where: r.exit_reason != :success - ) - |> Repo.one() - - reason = Lightning.InvocationReasons.build(:retry, %{user: user, run: run}) - - {:ok, %{attempt_run: attempt_run}} = - AttemptService.retry(attempt, run, reason) - |> Repo.transaction() - - refute attempt_run.attempt_id == attempt.id - - original_runs = - from(r in Run, - join: a in assoc(r, :attempts), - where: a.id == ^attempt.id, - select: r.id - ) - |> Repo.all() - |> MapSet.new() - - new_runs = - from(r in Run, - join: a in assoc(r, :attempts), - where: a.id == ^attempt_run.attempt_id, - select: r.id - ) - |> Repo.all() - |> MapSet.new() - - assert MapSet.intersection(original_runs, new_runs) |> MapSet.size() == 5 - refute MapSet.member?(original_runs, attempt_run.run_id) - assert MapSet.member?(new_runs, attempt_run.run_id) - end - end - - @tag skip: "Replaced by WorkOrders.retry_many/3" - describe "rerun_many/2" do - setup do - workflow_scenario() - end - - test "creates a new attempt starting from an existing run for each attempt run", - %{ - jobs: jobs, - workflow: workflow - } do - work_order = work_order_fixture(workflow_id: workflow.id) - dataclip = dataclip_fixture() - user = insert(:user) - - # first attempt - attempt_runs = - Enum.map([jobs.a, jobs.b, jobs.c, jobs.e, jobs.f], fn j -> - %{ - job_id: j.id, - input_dataclip_id: dataclip.id - } - end) ++ - [%{job_id: jobs.d.id, input_dataclip_id: dataclip.id}] - - attempt = - Lightning.Attempt.new(%{ - work_order_id: work_order.id, - reason_id: work_order.reason_id, - runs: attempt_runs - }) - |> Repo.insert!() - - # find the failed run for this attempt - run = - from(r in Run, - join: a in assoc(r, :attempts), - where: a.id == ^attempt.id, - where: r.exit_reason == :failed - ) - |> Repo.one() - - # find the failed attempt run - attempt_run = - Repo.get_by(Lightning.AttemptRun, run_id: run.id, attempt_id: attempt.id) - - reason = - Lightning.InvocationReasons.build(:retry, %{user: user, run: run}) - |> Repo.insert!() - - {:ok, %{attempt_runs: {1, [new_attempt_run]}}} = - AttemptService.retry_many([attempt_run], [reason]) - |> Repo.transaction() 
- - refute new_attempt_run.attempt_id == attempt.id - - original_runs = - from(r in Run, - join: a in assoc(r, :attempts), - where: a.id == ^attempt.id, - select: r.id - ) - |> Repo.all() - |> MapSet.new() - - new_runs = - from(r in Run, - join: a in assoc(r, :attempts), - where: a.id == ^new_attempt_run.attempt_id, - select: r.id - ) - |> Repo.all() - |> MapSet.new() - - assert MapSet.intersection(original_runs, new_runs) |> MapSet.size() == 5 - refute MapSet.member?(original_runs, new_attempt_run.run_id) - assert MapSet.member?(new_runs, new_attempt_run.run_id) - end - end - - describe "list_for_rerun_from_start/1" do - setup do - workflow_scenario() - end - - @tag :skip - test "only the first attempt (oldest) is listed for each work order, ordered - by workorder creation date, oldest to newest", - %{ - jobs: jobs, - workflow: workflow - } do - work_order_1 = work_order_fixture(workflow_id: workflow.id) - work_order_2 = work_order_fixture(workflow_id: workflow.id) - dataclip = dataclip_fixture() - - now = Timex.now() - - [work_order_1, work_order_2] - |> Enum.each(fn work_order -> - runs = - Enum.map( - [ - {jobs.a, -100}, - {jobs.b, -80}, - {jobs.c, -70}, - {jobs.d, -50}, - {jobs.e, -30}, - {jobs.f, -20} - ], - fn {j, time} -> - %{ - job_id: j.id, - input_dataclip_id: dataclip.id, - started_at: Timex.shift(now, microseconds: time), - finished_at: Timex.shift(now, microseconds: time + 5) - } - end - ) - - Lightning.Attempt.new(%{ - work_order_id: work_order.id, - reason_id: work_order.reason_id, - runs: runs - }) - |> Repo.insert!() - end) - - [ar1, ar2] = - AttemptService.list_for_rerun_from_start([ - work_order_1.id, - work_order_2.id - ]) - - assert Enum.all?([ar1, ar2], fn ar -> ar.run.job_id == jobs.a.id end) - - assert [ar1, ar2] - |> Enum.uniq_by(& &1.attempt_id) - |> Enum.count() == 2 - - assert [ar1, ar2] - |> Enum.uniq_by(& &1.attempt.work_order_id) - |> Enum.count() == 2 - - assert work_order_1.inserted_at < work_order_2.inserted_at - assert ar1.attempt.work_order_id == work_order_1.id - assert ar2.attempt.work_order_id == work_order_2.id - end - end - - describe "list_for_rerun_from_job/2" do - setup do - workflow_scenario() - end - - @tag :skip - test "returns the AttemptRuns for the latest Attempt of each work order - associated with the job, ordered by workorder creation date, oldest to - newest", - %{ - jobs: jobs, - workflow: workflow - } do - work_order_1 = work_order_fixture(workflow_id: workflow.id) - work_order_2 = work_order_fixture(workflow_id: workflow.id) - - dataclip = dataclip_fixture() - - # First Attempts with all Jobs - [_attempt_1_work_order_1, attempt_1_work_order_2] = - Enum.map([work_order_1, work_order_2], fn work_order -> - runs = - Enum.map( - Map.values(jobs), - fn j -> - %{ - job_id: j.id, - input_dataclip_id: dataclip.id, - exit_reason: "success" - } - end - ) - - Lightning.Attempt.new(%{ - work_order_id: work_order.id, - reason_id: work_order.reason_id, - runs: runs - }) - |> Repo.insert!() - end) - - # Second Attempt For Work Order 1 - # Job d is missing - dataclip2 = dataclip_fixture() - - runs = - Enum.map([jobs.a, jobs.b, jobs.c, jobs.e, jobs.f], fn j -> - %{ - job_id: j.id, - input_dataclip_id: dataclip2.id - } - end) - - attempt_2_work_order_1 = - Attempt.new(%{ - work_order_id: work_order_1.id, - reason_id: work_order_1.reason_id, - runs: runs - }) - |> Repo.insert!() - - # Only the attempt for work order2 will be listed - assert [attempt_run] = - AttemptService.list_for_rerun_from_job( - [ - work_order_1.id, - work_order_2.id - ], - 
jobs.d.id - ) - - assert attempt_run.attempt_id == attempt_1_work_order_2.id - - ## create the missing attempt run - attempt_run2 = - Lightning.AttemptRun.new(%{ - attempt_id: attempt_2_work_order_1.id, - run: %{ - job_id: jobs.d.id, - input_dataclip_id: dataclip2.id, - exit_reason: "success" - } - }) - |> Repo.insert!() - - [first_attempt_run_in_list, second_attempt_run_in_list] = - AttemptService.list_for_rerun_from_job( - [ - work_order_1.id, - work_order_2.id - ], - jobs.d.id - ) - - assert work_order_1.inserted_at < work_order_2.inserted_at - - assert first_attempt_run_in_list.attempt.work_order_id == work_order_1.id - assert second_attempt_run_in_list.attempt.work_order_id == work_order_2.id - - assert first_attempt_run_in_list.id == attempt_run2.id - assert second_attempt_run_in_list.id == attempt_run.id - end - end -end diff --git a/test/lightning/workflows/graph_test.exs b/test/lightning/workflows/graph_test.exs deleted file mode 100644 index d856bd087f..0000000000 --- a/test/lightning/workflows/graph_test.exs +++ /dev/null @@ -1,50 +0,0 @@ -defmodule Lightning.Workflows.GraphTest do - use Lightning.DataCase, async: true - - import Lightning.JobsFixtures - - alias Lightning.Workflows.Graph - - describe "new/1" do - setup :workflow_scenario - - test "can create a graph from a workflow with jobs and edges", %{ - workflow: workflow, - jobs: jobs - } do - workflow = Lightning.Repo.preload(workflow, [:jobs, :edges]) - - graph = - Graph.new(workflow) - |> Graph.remove(jobs.e.id) - - remaining_jobs = graph |> Graph.vertices() |> Enum.map(& &1.name) - - for j <- ["job_a", "job_b", "job_c", "job_d"] do - assert j in remaining_jobs - end - - for j <- ["job_e", "job_f", "job_g"] do - refute j in remaining_jobs - end - - graph = - Graph.new(workflow) - |> Graph.remove(jobs.d.id) - - remaining_jobs = graph |> Graph.vertices() |> Enum.map(& &1.name) - - for j <- ["job_a", "job_b", "job_c", "job_e", "job_f"] do - assert j in remaining_jobs - end - - refute jobs.d.id in remaining_jobs - - graph = - Graph.new(workflow) - |> Graph.remove(jobs.a.id) - - assert graph.jobs == [] - end - end -end diff --git a/test/support/model_helpers.ex b/test/support/model_helpers.ex index 6e78f8af07..f9508c7cc5 100644 --- a/test/support/model_helpers.ex +++ b/test/support/model_helpers.ex @@ -44,4 +44,10 @@ defmodule Lightning.ModelHelpers do |> Enum.map(fn field -> {field, model |> Map.get(field)} end) |> Enum.into(%{}) end + + def count_for(query) do + import Ecto.Query + + select(query, count()) |> Lightning.Repo.one!() + end end
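
Usage sketch for the count_for/1 helper added in test/support/model_helpers.ex above — a minimal illustration, assuming the calling test module does `import Lightning.ModelHelpers` (and `import Ecto.Query` for the query form); the narrowed-query call is an assumption, only the bare-schema form appears in the patch itself:

    # count_for/1 runs a COUNT(*) over any Ecto queryable and returns the integer,
    # so a bare schema module works, as in the updated accounts tests:
    assert count_for(User) == 1

    # ...and a narrowed query should work the same way:
    assert count_for(from(u in User, where: is_nil(u.scheduled_deletion))) == 1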