Skip to content

Commit

Permalink
Sync delegate api changes + spire-api-sdk bump to 1.10.2 (#96)
Browse files Browse the repository at this point in the history
Sync delegate api changes + `spire-api-sdk` bump to 1.10.2 (#96)

Signed-off-by: Benjamin Leggett <benjamin.leggett@solo.io>
  • Loading branch information
bleggett committed Sep 12, 2024
1 parent ceef673 commit c6a44ea
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 28 deletions.
2 changes: 1 addition & 1 deletion spire-api/spire-api-sdk
52 changes: 42 additions & 10 deletions spire-api/src/agent/delegated_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ pub struct DelegatedIdentityClient {
client: DelegatedIdentityApiClient<tonic::transport::Channel>,
}

/// Represents that a delegate attestation request can have one-of
/// PID (let agent attest PID->selectors) or selectors (delegate has already attested a PID)
#[derive(Debug, Clone)]
pub enum DelegateAttestationRequest {
/// PID (let agent attest PID->selectors)
Pid(i32),
/// selectors (delegate has already attested a PID and generated full set of selectors)
Selectors(Vec<Selector>),
}

/// Constructors
impl DelegatedIdentityClient {
const UNIX_PREFIX: &'static str = "unix:";
Expand Down Expand Up @@ -135,10 +145,17 @@ impl DelegatedIdentityClient {
/// Returns [`GrpcClientError`] if the gRPC call fails or if the SVID could not be parsed from the gRPC response.
pub async fn fetch_x509_svid(
&mut self,
selectors: Vec<Selector>,
attest_type: DelegateAttestationRequest,
) -> Result<X509Svid, GrpcClientError> {
let request = SubscribeToX509sviDsRequest {
selectors: selectors.into_iter().map(|s| s.into()).collect(),
let request = match attest_type {
DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
selectors: selectors.into_iter().map(|s| s.into()).collect(),
pid: 0,
},
DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
selectors: Vec::default(),
pid,
},
};

self.client
Expand Down Expand Up @@ -175,10 +192,17 @@ impl DelegatedIdentityClient {
/// Individual stream items might also be errors if there's an issue processing the response for a specific update.
pub async fn stream_x509_svids(
&mut self,
selectors: Vec<Selector>,
attest_type: DelegateAttestationRequest,
) -> Result<impl Stream<Item = Result<X509Svid, GrpcClientError>>, GrpcClientError> {
let request = SubscribeToX509sviDsRequest {
selectors: selectors.into_iter().map(|s| s.into()).collect(),
let request = match attest_type {
DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
selectors: selectors.into_iter().map(|s| s.into()).collect(),
pid: 0,
},
DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
selectors: Vec::default(),
pid,
},
};

let response: tonic::Response<tonic::Streaming<SubscribeToX509sviDsResponse>> =
Expand Down Expand Up @@ -259,11 +283,19 @@ impl DelegatedIdentityClient {
pub async fn fetch_jwt_svids<T: AsRef<str> + ToString>(
&mut self,
audience: &[T],
selectors: Vec<Selector>,
attest_type: DelegateAttestationRequest,
) -> Result<Vec<JwtSvid>, GrpcClientError> {
let request = FetchJwtsviDsRequest {
audience: audience.iter().map(|s| s.to_string()).collect(),
selectors: selectors.into_iter().map(|s| s.into()).collect(),
let request = match attest_type {
DelegateAttestationRequest::Selectors(selectors) => FetchJwtsviDsRequest {
audience: audience.iter().map(|s| s.to_string()).collect(),
selectors: selectors.into_iter().map(|s| s.into()).collect(),
pid: 0,
},
DelegateAttestationRequest::Pid(pid) => FetchJwtsviDsRequest {
audience: audience.iter().map(|s| s.to_string()).collect(),
selectors: Vec::default(),
pid,
},
};

DelegatedIdentityClient::parse_jwt_svid_from_grpc_response(
Expand Down
2 changes: 1 addition & 1 deletion spire-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ pub mod agent;
pub mod selectors;

// Core spire-api crate type re-exported for simplified access.
pub use agent::delegated_identity::DelegatedIdentityClient;
pub use agent::delegated_identity::{DelegateAttestationRequest, DelegatedIdentityClient};
43 changes: 40 additions & 3 deletions spire-api/src/proto/spire.api.agent.delegatedidentity.v1.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// X.509 SPIFFE Verifiable Identity Document with the private key.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand All @@ -11,13 +12,27 @@ pub struct X509svidWithKey {
}
/// SubscribeToX509SVIDsRequest is used by clients to subscribe the set of SVIDs that
/// any given workload is entitled to. Clients subscribe to a workload's SVIDs by providing
/// a set of selectors describing the workload.
/// one-of
/// - a set of selectors describing the workload.
/// - a PID of a workload process.
/// Specifying both at the same time is not allowed.
///
/// Subscribers are expected to ensure that the PID they use is not recycled
/// for the lifetime of the stream, and in the event that it is, are expected
/// to immediately close the stream.
///
/// TODO we should use `oneof` here but you currently cannot use `repeated`
/// in a `oneof` without creating and nesting an intermediate `message` type, which would break
/// back compat - so we accept both and check for mutual exclusion in the handler
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeToX509sviDsRequest {
/// Required. Selectors describing the workload to subscribe to.
/// Selectors describing the workload to subscribe to. Mutually exclusive with `pid`.
#[prost(message, repeated, tag = "1")]
pub selectors: ::prost::alloc::vec::Vec<super::super::super::types::Selector>,
/// PID for the workload to subscribe to. Mutually exclusive with `selectors`
#[prost(int32, tag = "2")]
pub pid: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -45,15 +60,31 @@ pub struct SubscribeToX509BundlesResponse {
::prost::bytes::Bytes,
>,
}
/// FetchJWTSVIDsRequest is used by clients to fetch a JWT-SVID for a workload.
/// Clients may provide one-of
/// - a set of selectors describing the workload.
/// - a PID of a workload process.
/// Specifying both at the same time is not allowed.
///
/// Callers are expected to ensure that the PID they use is not recycled
/// until obtaining a response, and in the event that it is, are expected
/// to discard the response of this call.
///
/// TODO we should use `oneof` here but you currently cannot use `repeated`
/// in a `oneof` without creating and nesting an intermediate `message` type, which would break
/// back compat - so we accept both and check for mutual exclusion in the handler
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchJwtsviDsRequest {
/// Required. The audience(s) the workload intends to authenticate against.
#[prost(string, repeated, tag = "1")]
pub audience: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// Required. Selectors describing the workload to fetch.
/// Selectors describing the workload to subscribe to. Mutually exclusive with `pid`
#[prost(message, repeated, tag = "2")]
pub selectors: ::prost::alloc::vec::Vec<super::super::super::types::Selector>,
/// PID for the workload to subscribe to. Mutually exclusive with `selectors`.
#[prost(int32, tag = "3")]
pub pid: i32,
}
/// The FetchJWTSVIDsResponse message conveys JWT-SVIDs.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -197,6 +228,9 @@ pub mod delegated_identity_client {
);
self.inner.server_streaming(req, path, codec).await
}
/// Subscribe to get X.509-SVIDs for workloads that match the given selectors.
/// The lifetime of the subscription aligns to the lifetime of the stream.
///
/// Subscribe to get local and all federated bundles.
/// The lifetime of the subscription aligns to the lifetime of the stream.
pub async fn subscribe_to_x509_bundles(
Expand Down Expand Up @@ -333,6 +367,9 @@ pub mod delegated_identity_server {
>
+ Send
+ 'static;
/// Subscribe to get X.509-SVIDs for workloads that match the given selectors.
/// The lifetime of the subscription aligns to the lifetime of the stream.
///
/// Subscribe to get local and all federated bundles.
/// The lifetime of the subscription aligns to the lifetime of the stream.
async fn subscribe_to_x509_bundles(
Expand Down
1 change: 1 addition & 0 deletions spire-api/src/proto/spire.api.types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Selector {
Expand Down
26 changes: 13 additions & 13 deletions spire-api/tests/delegated_identity_api_client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod integration_tests_delegate_identity_api_client {
use once_cell::sync::Lazy;
use spiffe::bundle::BundleRefSource;
use spiffe::{JwtBundleSet, TrustDomain};
use spire_api::{selectors, DelegatedIdentityClient};
use spire_api::{selectors, DelegateAttestationRequest, DelegatedIdentityClient};
use std::process::Command;
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -38,9 +38,9 @@ mod integration_tests_delegate_identity_api_client {
let svid = client
.fetch_jwt_svids(
&["my_audience"],
vec![selectors::Selector::Unix(selectors::Unix::Uid(
get_uid() + 1,
))],
DelegateAttestationRequest::Selectors(vec![selectors::Selector::Unix(
selectors::Unix::Uid(get_uid() + 1),
)]),
)
.await
.expect("Failed to fetch JWT SVID");
Expand All @@ -52,9 +52,9 @@ mod integration_tests_delegate_identity_api_client {
async fn fetch_delegate_x509_svid() {
let mut client = get_client().await;
let response: spiffe::svid::x509::X509Svid = client
.fetch_x509_svid(vec![selectors::Selector::Unix(selectors::Unix::Uid(
get_uid() + 1,
))])
.fetch_x509_svid(DelegateAttestationRequest::Selectors(vec![
selectors::Selector::Unix(selectors::Unix::Uid(get_uid() + 1)),
]))
.await
.expect("Failed to fetch delegate SVID");
// Not checking the chain as the root is generated by spire.
Expand All @@ -71,9 +71,9 @@ mod integration_tests_delegate_identity_api_client {
let test_duration = std::time::Duration::from_secs(60);
let mut client = get_client().await;
let mut stream = client
.stream_x509_svids(vec![selectors::Selector::Unix(selectors::Unix::Uid(
get_uid() + 1,
))])
.stream_x509_svids(DelegateAttestationRequest::Selectors(vec![
selectors::Selector::Unix(selectors::Unix::Uid(get_uid() + 1)),
]))
.await
.expect("Failed to fetch delegate SVID");

Expand Down Expand Up @@ -124,9 +124,9 @@ mod integration_tests_delegate_identity_api_client {
let svids = client
.fetch_jwt_svids(
&["my_audience"],
vec![selectors::Selector::Unix(selectors::Unix::Uid(
get_uid() + 1,
))],
DelegateAttestationRequest::Selectors(vec![selectors::Selector::Unix(
selectors::Unix::Uid(get_uid() + 1),
)]),
)
.await
.expect("Failed to fetch JWT SVID");
Expand Down

0 comments on commit c6a44ea

Please sign in to comment.