Skip to content

Commit

Permalink
close unnamed portal after each executed extended query (#2081)
Browse files Browse the repository at this point in the history
This allows to free server resources earlier and run the same logic as simple Query does:

"The simple Query message is approximately equivalent to the series Parse, Bind, portal Describe, Execute, Close, Sync,"
  • Loading branch information
DXist authored Sep 3, 2022
1 parent 20877d8 commit 04884f1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
28 changes: 24 additions & 4 deletions sqlx-core/src/postgres/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ impl PgConnection {
message if message.format == MessageFormat::PortalSuspended => {
// there was an open portal
// this can happen if the last time a statement was used it was not fully executed
// such as in [fetch_one]
}

message if message.format == MessageFormat::CloseComplete => {
Expand Down Expand Up @@ -219,8 +218,7 @@ impl PgConnection {
// patch holes created during encoding
arguments.apply_patches(self, &metadata.parameters).await?;

// apply patches use fetch_optional thaht may produce `PortalSuspended` message,
// consume messages til `ReadyForQuery` before bind and execute
// consume messages till `ReadyForQuery` before bind and execute
self.wait_until_ready().await?;

// bind to attach the arguments to the statement and create a portal
Expand All @@ -239,6 +237,16 @@ impl PgConnection {
portal: None,
limit: limit.into(),
});
// From https://www.postgresql.org/docs/current/protocol-flow.html:
//
// "An unnamed portal is destroyed at the end of the transaction, or as
// soon as the next Bind statement specifying the unnamed portal as
// destination is issued. (Note that a simple Query message also
// destroys the unnamed portal."

// we ask the database server to close the unnamed portal and free the associated resources
// earlier - after the execution of the current query.
self.stream.write(message::Close::Portal(None));

// finally, [Sync] asks postgres to process the messages that we sent and respond with
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
Expand Down Expand Up @@ -271,10 +279,17 @@ impl PgConnection {
MessageFormat::BindComplete
| MessageFormat::ParseComplete
| MessageFormat::ParameterDescription
| MessageFormat::NoData => {
| MessageFormat::NoData
// unnamed portal has been closed
| MessageFormat::CloseComplete
=> {
// harmless messages to ignore
}

// "Execute phase is always terminated by the appearance of
// exactly one of these messages: CommandComplete,
// EmptyQueryResponse (if the portal was created from an
// empty query string), ErrorResponse, or PortalSuspended"
MessageFormat::CommandComplete => {
// a SQL command completed normally
let cc: CommandComplete = message.decode()?;
Expand All @@ -290,6 +305,11 @@ impl PgConnection {
// empty query string passed to an unprepared execute
}

// Message::ErrorResponse is handled in self.stream.recv()

// incomplete query execution has finished
MessageFormat::PortalSuspended => {}

MessageFormat::RowDescription => {
// indicates that a *new* set of rows are about to be returned
let (columns, column_names) = self
Expand Down
5 changes: 3 additions & 2 deletions sqlx-core/src/postgres/message/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const CLOSE_STATEMENT: u8 = b'S';
#[allow(dead_code)]
pub enum Close {
Statement(Oid),
Portal(Oid),
// None selects the unnamed portal
Portal(Option<Oid>),
}

impl Encode<'_> for Close {
Expand All @@ -26,7 +27,7 @@ impl Encode<'_> for Close {

Close::Portal(id) => {
buf.push(CLOSE_PORTAL);
buf.put_portal_name(Some(*id));
buf.put_portal_name(*id);
}
})
}
Expand Down

0 comments on commit 04884f1

Please sign in to comment.