Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REPLICATION: Allow starting one stream right after another #693

Merged

Conversation

zenchild
Copy link
Contributor

This change is in relation to Issue #692. It now allows a new stream to be created right after a COPY command. The test took a little bit of configuring to get the chain of events to fire off in the expected way. If there is a better way to handle this, any candid feedback is welcome.

Example Repl Module:

defmodule MyApp.Repl do
  use Postgrex.ReplicationConnection

  def start_link(opts) do
    # Automatically reconnect if we lose connection.
    extra_opts = [
      auto_reconnect: true
    ]

    Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
  end

  @impl true
  def init(:ok) do
    {:ok, %{step: :disconnected, transaction: nil}}
  end

  @impl true
  def handle_connect(state) do
    query = "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY"

    {:query, query, %{state | step: :start_transaction}}
  end

  @impl true
  def handle_result([%{command: :begin} | _] = _, %{step: :start_transaction} = state) do
    query = """
    CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput USE_SNAPSHOT;
    """

    {:query, query, %{state | step: :create_slot}}
  end

  @impl true
  def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
    query = """
    COPY contacts TO STDOUT;
    COMMIT;
    """

    {:stream, query, [], %{state | step: :copy_table}}
  end

  @impl true
  def handle_data(:done, %{step: :copy_table} = state) do
    query = "START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'events')"

    {:stream, query, [], %{state | step: :streaming}}
  end

  @impl true
  def handle_data(results, %{step: :copy_table} = state) do
    # TODO handle COPY TABLE results
    {:noreply, state}
  end

  @impl true
  # https://www.postgresql.org/docs/14/protocol-replication.html
  def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
    {:noreply, state}
  end

  def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
    messages =
      case reply do
        1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
        0 -> []
      end

    {:noreply, messages, state}
  end

  def handle_data(<<?r, _wal_end::64, _clock::64, _reply::64, _timestamp::64, _lts::64>>, state) do
    {:noreply, state}
  end

  @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
  defp current_time(), do: System.os_time(:microsecond) - @epoch
end

@josevalim josevalim merged commit b4b3211 into elixir-ecto:master Jul 18, 2024
8 of 9 checks passed
@josevalim
Copy link
Member

💚 💙 💜 💛 ❤️

@zenchild
Copy link
Contributor Author

Thanks for your guidance @josevalim! Much appreciated.

Loving Elixir BTW ❤️. I'm a long-time Rubyist and I feel right at home.

@zenchild zenchild deleted the task/repl-close-stream-after-copy branch July 19, 2024 13:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants