diff --git a/sqlx-core/src/postgres/connection/executor.rs b/sqlx-core/src/postgres/connection/executor.rs index 80a7e9e121..ad8e794c30 100644 --- a/sqlx-core/src/postgres/connection/executor.rs +++ b/sqlx-core/src/postgres/connection/executor.rs @@ -134,7 +134,6 @@ impl PgConnection { message if message.format == MessageFormat::PortalSuspended => { // there was an open portal // this can happen if the last time a statement was used it was not fully executed - // such as in [fetch_one] } message if message.format == MessageFormat::CloseComplete => { @@ -219,8 +218,7 @@ impl PgConnection { // patch holes created during encoding arguments.apply_patches(self, &metadata.parameters).await?; - // apply patches use fetch_optional thaht may produce `PortalSuspended` message, - // consume messages til `ReadyForQuery` before bind and execute + // consume messages till `ReadyForQuery` before bind and execute self.wait_until_ready().await?; // bind to attach the arguments to the statement and create a portal @@ -239,6 +237,16 @@ impl PgConnection { portal: None, limit: limit.into(), }); + // From https://www.postgresql.org/docs/current/protocol-flow.html: + // + // "An unnamed portal is destroyed at the end of the transaction, or as + // soon as the next Bind statement specifying the unnamed portal as + // destination is issued. (Note that a simple Query message also + // destroys the unnamed portal." + + // we ask the database server to close the unnamed portal and free the associated resources + // earlier - after the execution of the current query. + self.stream.write(message::Close::Portal(None)); // finally, [Sync] asks postgres to process the messages that we sent and respond with // a [ReadyForQuery] message when it's completely done. Theoretically, we could send @@ -271,10 +279,17 @@ impl PgConnection { MessageFormat::BindComplete | MessageFormat::ParseComplete | MessageFormat::ParameterDescription - | MessageFormat::NoData => { + | MessageFormat::NoData + // unnamed portal has been closed + | MessageFormat::CloseComplete + => { // harmless messages to ignore } + // "Execute phase is always terminated by the appearance of + // exactly one of these messages: CommandComplete, + // EmptyQueryResponse (if the portal was created from an + // empty query string), ErrorResponse, or PortalSuspended" MessageFormat::CommandComplete => { // a SQL command completed normally let cc: CommandComplete = message.decode()?; @@ -290,6 +305,11 @@ impl PgConnection { // empty query string passed to an unprepared execute } + // Message::ErrorResponse is handled in self.stream.recv() + + // incomplete query execution has finished + MessageFormat::PortalSuspended => {} + MessageFormat::RowDescription => { // indicates that a *new* set of rows are about to be returned let (columns, column_names) = self diff --git a/sqlx-core/src/postgres/message/close.rs b/sqlx-core/src/postgres/message/close.rs index a073979063..ed5de761c8 100644 --- a/sqlx-core/src/postgres/message/close.rs +++ b/sqlx-core/src/postgres/message/close.rs @@ -9,7 +9,8 @@ const CLOSE_STATEMENT: u8 = b'S'; #[allow(dead_code)] pub enum Close { Statement(Oid), - Portal(Oid), + // None selects the unnamed portal + Portal(Option), } impl Encode<'_> for Close { @@ -26,7 +27,7 @@ impl Encode<'_> for Close { Close::Portal(id) => { buf.push(CLOSE_PORTAL); - buf.put_portal_name(Some(*id)); + buf.put_portal_name(*id); } }) }