Skip to content

Commit

Permalink
ReplicationConnection: allow callback to trigger disconnect (#673)
Browse files Browse the repository at this point in the history
  • Loading branch information
DaemonSnake authored May 3, 2024
1 parent 3af848b commit ef6bd2f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
9 changes: 9 additions & 0 deletions lib/postgrex/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ defmodule Postgrex.ReplicationConnection do
@type state :: term
@type ack :: iodata
@type query :: iodata
@type reason :: String.t()

@typedoc """
The following options configure streaming:
Expand Down Expand Up @@ -192,6 +193,7 @@ defmodule Postgrex.ReplicationConnection do
| {:noreply, ack, state}
| {:query, query, state}
| {:stream, query, stream_opts, state}
| {:disconnect, reason}

@doc """
Invoked after disconnecting.
Expand Down Expand Up @@ -219,6 +221,7 @@ defmodule Postgrex.ReplicationConnection do
| {:noreply, ack, state}
| {:query, query, state}
| {:stream, query, stream_opts, state}
| {:disconnect, reason}

@doc """
Callback for `Kernel.send/2`.
Expand All @@ -228,6 +231,7 @@ defmodule Postgrex.ReplicationConnection do
| {:noreply, ack, state}
| {:query, query, state}
| {:stream, query, stream_opts, state}
| {:disconnect, reason}

@doc """
Callback for `call/3`.
Expand All @@ -246,6 +250,7 @@ defmodule Postgrex.ReplicationConnection do
| {:noreply, ack, state}
| {:query, query, state}
| {:stream, query, stream_opts, state}
| {:disconnect, reason}

@doc """
Callback for `:query` outputs.
Expand All @@ -264,6 +269,7 @@ defmodule Postgrex.ReplicationConnection do
| {:noreply, ack, state}
| {:query, query, state}
| {:stream, query, stream_opts, state}
| {:disconnect, reason}

@optional_callbacks handle_call: 3,
handle_connect: 1,
Expand Down Expand Up @@ -582,6 +588,9 @@ defmodule Postgrex.ReplicationConnection do

{:query, _query, mod_state} ->
stream_in_progress(:query, mod, mod_state, from, s)

{:disconnect, reason} ->
reconnect_or_stop(:disconnect, reason, s.protocol, s)
end
end

Expand Down
21 changes: 21 additions & 0 deletions test/replication_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ defmodule ReplicationTest do
{:query, query, {from, pid}}
end

@impl true
def handle_call({:disconnect, reason}, _, _) do
{:disconnect, reason}
end

@impl true
def handle_result(results, {from, pid}) when is_list(results) do
Postgrex.ReplicationConnection.reply(from, {:ok, results})
Expand Down Expand Up @@ -140,6 +145,22 @@ defmodule ReplicationTest do
assert_received {:disconnect, i2} when i1 < i2
refute_received {:connect, _}
end

test "trigger disconnect from callback", context do
assert_received {:connect, i1}

Process.flag(:trap_exit, true)

{_pid, ref} =
spawn_monitor(fn -> PR.call(context.repl, {:disconnect, "manual disconnect"}) end)

assert_receive {:DOWN, ^ref, _, _, {"manual disconnect", _}}

ref = Process.monitor(context.repl)
assert_receive {:DOWN, ^ref, _, _, _}
assert_received {:disconnect, i2} when i1 < i2
refute_received {:connect, _}
end
end

describe "auto-reconnect" do
Expand Down

0 comments on commit ef6bd2f

Please sign in to comment.