build: clippy disallow futures::pin_mut macro #7016

Merged 5 commits on Mar 5, 2024
7 changes: 7 additions & 0 deletions clippy.toml
@@ -3,3 +3,10 @@ disallowed-methods = [
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
]

+disallowed-macros = [
+    # use std::pin::pin
+    "futures::pin_mut",
+    # cannot disallow this, because clippy flags uses expanded from tokio macros
+    #"tokio::pin",
+]
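The sanctioned replacement is the standard-library `std::pin::pin!` macro (stable since Rust 1.68). A minimal sketch of the migration the diffs below perform, with a hypothetical `some_async_op` that is not part of this PR:

```rust
use std::pin::pin;

async fn some_async_op() {}

async fn example() {
    // Before (now rejected by the disallowed-macros lint):
    //   let fut = some_async_op();
    //   futures::pin_mut!(fut);
    // After: pin the future to the enclosing scope with the std macro.
    let fut = pin!(some_async_op());
    fut.await;
}
```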
2 changes: 1 addition & 1 deletion control_plane/src/pageserver.rs
@@ -605,7 +605,7 @@ impl PageServerNode {
eprintln!("connection error: {}", e);
}
});
-tokio::pin!(client);
+let client = std::pin::pin!(client);

// Init base reader
let (start_lsn, base_tarfile_path) = base;
4 changes: 1 addition & 3 deletions libs/postgres_backend/src/lib.rs
@@ -6,7 +6,6 @@
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::Context;
use bytes::Bytes;
-use futures::pin_mut;
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;
use std::net::SocketAddr;
@@ -378,8 +377,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
-let flush_fut = self.flush();
-pin_mut!(flush_fut);
+let flush_fut = std::pin::pin!(self.flush());
flush_fut.poll(cx)
}
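This works because `Future::poll` takes `Pin<&mut Self>`; `std::pin::pin!` pins the freshly built flush future on the stack exactly where `futures::pin_mut!` used to. A standalone sketch of the same idea, with a hypothetical `poll_once` helper that is not in this PR:

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll};

/// Poll a future exactly once, pinning it on the stack first.
fn poll_once<F: Future>(fut: F, cx: &mut Context<'_>) -> Poll<F::Output> {
    let mut fut = pin!(fut);
    fut.as_mut().poll(cx)
}
```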

4 changes: 1 addition & 3 deletions proxy/src/serverless/sql_over_http.rs
@@ -1,7 +1,6 @@
use std::sync::Arc;

use anyhow::bail;
-use futures::pin_mut;
use futures::StreamExt;
use hyper::body::HttpBody;
use hyper::header;
@@ -531,13 +530,12 @@ async fn query_to_json<T: GenericClient>(
) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
info!("executing query");
let query_params = data.params;
-let row_stream = client.query_raw_txt(&data.query, query_params).await?;
+let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?);
info!("finished executing query");

// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
// big.
-pin_mut!(row_stream);
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
while let Some(row) = row_stream.next().await {
let row = row?;
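The same pattern applies to the stream cases here and below: `StreamExt::next` requires the stream to be `Unpin`, so a stream that is not `Unpin` has to be pinned first, and `std::pin::pin!` does that on the stack. A generic sketch (the `drain` helper and the `u32` item type are illustrative only):

```rust
use futures_util::{Stream, StreamExt};
use std::pin::pin;

async fn drain<S: Stream<Item = u32>>(stream: S) -> Vec<u32> {
    // Pin the (possibly !Unpin) stream on the stack so that
    // StreamExt::next, which requires Unpin, can be called on it.
    let mut stream = pin!(stream);
    let mut out = Vec::new();
    while let Some(item) = stream.next().await {
        out.push(item);
    }
    out
}
```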
5 changes: 2 additions & 3 deletions s3_scrubber/src/checks.rs
@@ -11,7 +11,7 @@ use utils::id::TimelineId;
use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
-use futures_util::{pin_mut, StreamExt};
+use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
@@ -285,8 +285,7 @@ pub(crate) async fn list_timeline_blobs(
let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
let mut initdb_archive: bool = false;

-let stream = stream_listing(s3_client, &timeline_dir_target);
-pin_mut!(stream);
+let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let obj = obj?;
let key = obj.key();
14 changes: 7 additions & 7 deletions s3_scrubber/src/garbage.rs
@@ -12,7 +12,7 @@ use aws_sdk_s3::{
types::{Delete, ObjectIdentifier},
Client,
};
-use futures_util::{pin_mut, TryStreamExt};
+use futures_util::TryStreamExt;
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
@@ -199,12 +199,12 @@ async fn find_garbage_inner(
}
}
});
-let tenants_checked = tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
+let mut tenants_checked =
+    std::pin::pin!(tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));

// Process the results of Tenant checks. If a Tenant is garbage, it goes into
// the `GarbageList`. Else it goes into `active_tenants` for more detailed timeline
// checks if they are enabled by the `depth` parameter.
-pin_mut!(tenants_checked);
let mut garbage = GarbageList::new(node_kind, bucket_config);
let mut active_tenants: Vec<TenantShardId> = vec![];
let mut counter = 0;
@@ -267,10 +267,10 @@
.map(|r| (ttid, r))
}
});
-let timelines_checked = timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
+let mut timelines_checked =
+    std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));

// Update the GarbageList with any timelines which appear not to exist.
-pin_mut!(timelines_checked);
while let Some(result) = timelines_checked.next().await {
let (ttid, console_result) = result?;
if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
@@ -425,9 +425,9 @@ pub async fn purge_garbage(
}
}
});
-let get_objects_results = get_objects_results.try_buffer_unordered(S3_CONCURRENCY);
+let mut get_objects_results =
+    std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));

-pin_mut!(get_objects_results);
let mut objects_to_delete = Vec::new();
while let Some(result) = get_objects_results.next().await {
let mut object_list = result?;
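All four hunks in garbage.rs follow one pattern: `try_buffer_unordered` returns a stream that is not `Unpin`, so it is now pinned at the point of creation instead of via `pin_mut!`. A self-contained sketch of that pattern, with a made-up concurrency limit and item type, using the `anyhow` crate for errors as the surrounding code does:

```rust
use futures_util::{stream, StreamExt, TryStreamExt};
use std::pin::pin;

async fn check_all(ids: Vec<u32>) -> anyhow::Result<Vec<u32>> {
    // Each id becomes a fallible future, standing in for a console API call.
    let checks = stream::iter(ids)
        .map(|id| Ok::<_, anyhow::Error>(async move { Ok::<_, anyhow::Error>(id * 10) }));
    // Run up to 4 checks concurrently; the buffered stream is !Unpin,
    // so pin it on the stack before draining it with next().
    let mut checks = pin!(checks.try_buffer_unordered(4));
    let mut results = Vec::new();
    while let Some(result) = checks.next().await {
        results.push(result?);
    }
    Ok(results)
}
```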
5 changes: 2 additions & 3 deletions s3_scrubber/src/scan_metadata.rs
@@ -7,7 +7,7 @@ use crate::checks::{
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
-use futures_util::{pin_mut, StreamExt, TryStreamExt};
+use futures_util::{StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::IndexPart;
@@ -226,7 +226,7 @@ pub async fn scan_metadata(
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
-let timelines = timelines.try_buffered(CONCURRENCY);
+let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));

// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
@@ -309,7 +309,6 @@ pub async fn scan_metadata(
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut summary = MetadataSummary::new();
-pin_mut!(timelines);
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
summary.update_data(&data);
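scan_metadata.rs uses the order-preserving `try_buffered` rather than `try_buffer_unordered`, since the accumulate-then-flush logic above relies on all timelines of one tenant arriving adjacently. A small illustrative sketch of that ordering guarantee (made-up data, not from this PR):

```rust
use futures_util::{stream, StreamExt, TryStreamExt};
use std::convert::Infallible;
use std::pin::pin;

async fn ordered_demo() -> Result<(), Infallible> {
    let items = stream::iter([1u32, 2, 3])
        .map(|i| Ok::<_, Infallible>(async move { Ok::<_, Infallible>(i) }));
    // Unlike try_buffer_unordered, try_buffered yields results in input order
    // even if the futures complete out of order, so results for one tenant
    // stay adjacent and can be accumulated and then flushed.
    let mut items = pin!(items.try_buffered(2));
    while let Some(i) = items.next().await {
        println!("timeline {}", i?);
    }
    Ok(())
}
```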
2 changes: 1 addition & 1 deletion safekeeper/src/wal_service.rs
@@ -68,7 +68,7 @@ async fn handle_socket(
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader
// shouldn't be moved.
-tokio::pin!(socket);
+let socket = std::pin::pin!(socket);

let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() {