Skip to content

Commit

Permalink
feat(o11y): Inter-process tracing (#8004)
Browse files Browse the repository at this point in the history
Serialize `TraceId` and `SpanId` into a new field of `PeerMessage`. This lets the receiving node link its own traces to the trace that generated the network request.

`TextMapPropagator` is the interface designed to solve a similar problem, but given that:
1) we have to invoke it manually
2) our custom propagator is stateless

... following the `TextMapPropagator` interface doesn't add value. Setting a global text map propagator doesn't add value for the same reason.

Because we want inter-process tracing to be enabled at the debug level:
* The handling of `SendMessage` needs to enable tracing at the debug level.
* The corresponding `PeerManagerActor` tracing needs to be enabled at the debug level as well.

https://pagodaplatform.atlassian.net/browse/ND-172
  • Loading branch information
nikurt committed Nov 11, 2022
1 parent 5e326f5 commit 844297e
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
information: [#7711](https://github.com/near/nearcore/pull/7711).
* Change exporter of tracing information from `opentelemetry-jaeger` to
`opentelemetry-otlp`: [#7563](https://github.com/near/nearcore/pull/7563).
* Tracing of requests across processes:
[#8004](https://github.com/near/nearcore/pull/8004).

## 1.29.0 [2022-08-15]

Expand Down
28 changes: 23 additions & 5 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ mod _proto {

pub use _proto::network as proto;

use crate::network_protocol::proto_conv::trace_context::{
extract_span_context, inject_trace_context,
};
use crate::time;
use borsh::{BorshDeserialize as _, BorshSerialize as _};
use near_crypto::PublicKey;
use near_crypto::Signature;
use near_o11y::OpenTelemetrySpanExt;
use near_primitives::block::{Approval, Block, BlockHeader, GenesisId};
use near_primitives::challenge::Challenge;
use near_primitives::hash::CryptoHash;
Expand All @@ -40,7 +44,9 @@ use near_primitives::views::FinalExecutionOutcomeView;
use protobuf::Message as _;
use std::collections::HashSet;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
use tracing::Span;

#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub struct PeerAddr {
Expand Down Expand Up @@ -281,26 +287,38 @@ pub enum ParsePeerMessageError {
}

impl PeerMessage {
/// Serializes a message in the given encoding.
/// If the encoding is `Proto`, then also attaches current Span's context to the message.
pub(crate) fn serialize(&self, enc: Encoding) -> Vec<u8> {
match enc {
Encoding::Borsh => borsh_::PeerMessage::from(self).try_to_vec().unwrap(),
Encoding::Proto => proto::PeerMessage::from(self).write_to_bytes().unwrap(),
Encoding::Proto => {
let mut msg = proto::PeerMessage::from(self);
let cx = Span::current().context();
msg.trace_context = inject_trace_context(&cx);
msg.write_to_bytes().unwrap()
}
}
}

pub(crate) fn deserialize(
enc: Encoding,
data: &[u8],
) -> Result<PeerMessage, ParsePeerMessageError> {
let span = tracing::trace_span!(target: "network", "deserialize").entered();
Ok(match enc {
Encoding::Borsh => (&borsh_::PeerMessage::try_from_slice(data)
.map_err(ParsePeerMessageError::BorshDecode)?)
.try_into()
.map_err(ParsePeerMessageError::BorshConv)?,
Encoding::Proto => (&proto::PeerMessage::parse_from_bytes(data)
.map_err(ParsePeerMessageError::ProtoDecode)?)
.try_into()
.map_err(ParsePeerMessageError::ProtoConv)?,
Encoding::Proto => {
let proto_msg: proto::PeerMessage = proto::PeerMessage::parse_from_bytes(data)
.map_err(ParsePeerMessageError::ProtoDecode)?;
if let Ok(extracted_span_context) = extract_span_context(&proto_msg.trace_context) {
span.clone().or_current().add_link(extracted_span_context);
}
(&proto_msg).try_into().map_err(|err| ParsePeerMessageError::ProtoConv(err))?
}
})
}

Expand Down
19 changes: 19 additions & 0 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,22 @@ message RoutingSyncV2 {
bytes borsh = 1;
}

// Inter-process tracing information.
// Carries the identifiers of the sender's span so that the receiving node
// can link its own trace to the trace that generated the network request.
message TraceContext {
// Sampling decision attached to the trace by the sender.
enum SamplingPriority {
// Zero value; the receiver treats it as a deferred sampling decision.
UNKNOWN = 0;
// AutoReject and UserReject are both treated by the receiver as "not sampled".
AutoReject = 1;
UserReject = 2;
// AutoKeep and UserKeep are both treated by the receiver as "sampled".
AutoKeep = 3;
UserKeep = 4;
}
// 16 bytes representing TraceId: https://docs.rs/opentelemetry/latest/opentelemetry/trace/struct.TraceId.html
bytes trace_id = 1;
// 8 bytes representing SpanId: https://docs.rs/opentelemetry/latest/opentelemetry/trace/struct.SpanId.html
bytes span_id = 2;
// Sampling decision of the sender's span; see `SamplingPriority`.
SamplingPriority sampling_priority = 3;
}

// PeerMessage is a wrapper of all message types exchanged between NEAR nodes.
// The wire format of a single message M consists of len(M)+4 bytes:
// <len(M)> : 4 bytes : little endian uint32
Expand All @@ -329,6 +345,9 @@ message PeerMessage {
// Deprecated fields.
reserved 9,20,21,22,23,24;

// Inter-process tracing information.
TraceContext trace_context = 26;

oneof message_type {
Handshake handshake = 4;
HandshakeFailure handshake_failure = 5;
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/network_protocol/proto_conv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod handshake;
mod net;
mod peer_message;
mod time;
pub mod trace_context;
/// Contains protobuf <-> network_protocol conversions.
mod util;

Expand Down
77 changes: 77 additions & 0 deletions chain/network/src/network_protocol/proto_conv/trace_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use crate::network_protocol::proto::trace_context::SamplingPriority;
use crate::network_protocol::proto::TraceContext;
use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState};
use opentelemetry::Context;
use protobuf::{EnumOrUnknown, MessageField};

/// Trace flag marking a span context whose sampling decision is deferred,
/// i.e. neither "keep" nor "reject" was communicated by the sender.
///
/// Lowest available value.
/// 0x01 is reserved for `SAMPLED`: https://docs.rs/opentelemetry/latest/opentelemetry/trace/struct.TraceFlags.html#associatedconstant.SAMPLED
const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02);

/// Reasons why a `SpanContext` could not be extracted from a `TraceContext`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ExtractError {
/// The `trace_id` bytes could not be converted to a `TraceId`.
#[error("Malformed or invalid TraceId")]
TraceId,
/// The `span_id` bytes could not be converted to a `SpanId`.
/// Note: `extract_span_context` does not surface this variant; it falls back
/// to `SpanId::INVALID` instead.
#[error("Malformed or invalid SpanId")]
SpanId,
/// The message carried no `TraceContext` at all.
#[error("Missing trace_id or span_id")]
Empty,
}

/// Builds a remote `SpanContext` from the `TraceContext` attached to a message.
///
/// * A missing `TraceContext` yields `ExtractError::Empty`.
/// * A `trace_id` that cannot be converted to a `TraceId` yields `ExtractError::TraceId`.
/// * A `span_id` that cannot be converted is tolerated and replaced with `SpanId::INVALID`.
/// * An unknown or unrecognized sampling priority maps to `TRACE_FLAG_DEFERRED`.
pub(crate) fn extract_span_context(
    trace_context: &MessageField<TraceContext>,
) -> Result<SpanContext, ExtractError> {
    if trace_context.is_none() {
        return Err(ExtractError::Empty);
    }

    let trace_id = extract_trace_id(&trace_context.trace_id)?;
    // A trace_id without a usable parent span is still valuable: fall back to an invalid
    // span id rather than failing, so that the rest of the spans aren't completely lost.
    let span_id = match extract_span_id(&trace_context.span_id) {
        Ok(id) => id,
        Err(_) => SpanId::INVALID,
    };
    let trace_flags = match trace_context.sampling_priority.enum_value() {
        Ok(SamplingPriority::AutoKeep | SamplingPriority::UserKeep) => TraceFlags::SAMPLED,
        Ok(SamplingPriority::AutoReject | SamplingPriority::UserReject) => TraceFlags::default(),
        // Treat the sampling as DEFERRED instead of failing to extract the span context.
        Ok(SamplingPriority::UNKNOWN) | Err(_) => TRACE_FLAG_DEFERRED,
    };
    // The context was received from another process, hence it is always remote.
    let is_remote = true;
    Ok(SpanContext::new(trace_id, span_id, trace_flags, is_remote, TraceState::default()))
}

/// Encodes the span carried by `cx` as a `TraceContext`.
///
/// Returns `MessageField::none()` when the span context of `cx` is not valid,
/// e.g. because no current span is available.
/// When the span's sampling decision is deferred, `sampling_priority` is left
/// at its default value.
pub(crate) fn inject_trace_context(cx: &Context) -> MessageField<TraceContext> {
    let span = cx.span();
    let span_context = span.span_context();
    if !span_context.is_valid() {
        return MessageField::none();
    }

    let mut encoded = TraceContext::new();
    // `TraceId::to_bytes()` uses `u128::to_be_bytes()` internally.
    encoded.trace_id = span_context.trace_id().to_bytes().to_vec();
    // `SpanId::to_bytes()` uses `u64::to_be_bytes()` internally.
    encoded.span_id = span_context.span_id().to_bytes().to_vec();

    let deferred = span_context.trace_flags() & TRACE_FLAG_DEFERRED == TRACE_FLAG_DEFERRED;
    if !deferred {
        let priority = if span_context.is_sampled() {
            SamplingPriority::AutoKeep
        } else {
            SamplingPriority::AutoReject
        };
        encoded.sampling_priority = EnumOrUnknown::new(priority);
    }
    MessageField::some(encoded)
}
/// Converts raw bytes to a `TraceId`.
/// Fails with `ExtractError::TraceId` if the slice does not have the length `TraceId` expects.
fn extract_trace_id(trace_id: &[u8]) -> Result<TraceId, ExtractError> {
    trace_id.try_into().map(TraceId::from_bytes).map_err(|_| ExtractError::TraceId)
}

/// Converts raw bytes to a `SpanId`.
/// Fails with `ExtractError::SpanId` if the slice does not have the length `SpanId` expects.
fn extract_span_id(span_id: &[u8]) -> Result<SpanId, ExtractError> {
    span_id.try_into().map(SpanId::from_bytes).map_err(|_| ExtractError::SpanId)
}
3 changes: 1 addition & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ impl PeerActor {
}

fn parse_message(&mut self, msg: &[u8]) -> Result<PeerMessage, ParsePeerMessageError> {
let _span = tracing::trace_span!(target: "network", "parse_message").entered();
if let Some(e) = self.encoding() {
return PeerMessage::deserialize(e, msg);
}
Expand Down Expand Up @@ -1187,7 +1186,7 @@ impl actix::Handler<stream::Frame> for PeerActor {
type Result = ();
#[perf]
fn handle(&mut self, stream::Frame(msg): stream::Frame, ctx: &mut Self::Context) {
let _span = tracing::trace_span!(
let _span = tracing::debug_span!(
target: "network",
"handle",
handler = "bytes",
Expand Down
7 changes: 5 additions & 2 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use crate::types::{
use actix::fut::future::wrap_future;
use actix::{Actor, AsyncContext, Context, Handler, Running};
use anyhow::Context as _;
use near_o11y::{handler_trace_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt};
use near_o11y::{
handler_debug_span, handler_trace_span, OpenTelemetrySpanExt, WithSpanContext,
WithSpanContextExt,
};
use near_performance_metrics_macros::perf;
use near_primitives::block::GenesisId;
use near_primitives::network::{AnnounceAccount, PeerId};
Expand Down Expand Up @@ -1018,7 +1021,7 @@ impl Handler<WithSpanContext<PeerManagerMessageRequest>> for PeerManagerActor {
ctx: &mut Self::Context,
) -> Self::Result {
let msg_type: &str = (&msg.msg).into();
let (_span, msg) = handler_trace_span!(target: "network", msg, msg_type);
let (_span, msg) = handler_debug_span!(target: "network", msg, msg_type);
let _timer =
metrics::PEER_MANAGER_MESSAGES_TIME.with_label_values(&[(&msg).into()]).start_timer();
self.handle_peer_manager_message(msg, ctx)
Expand Down
11 changes: 8 additions & 3 deletions core/o11y/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,18 @@ where
let (filter, handle) = reload::Layer::<LevelFilter, S>::new(filter);

let mut resource = vec![
KeyValue::new(SERVICE_NAME, "neard"),
KeyValue::new("chain_id", chain_id),
KeyValue::new("node_id", node_public_key.to_string()),
];
if let Some(account_id) = account_id {
// Prefer account name as the node name.
// Fallback to a node public key if a validator key is unavailable.
let service_name = if let Some(account_id) = account_id {
resource.push(KeyValue::new("account_id", account_id.to_string()));
}
format!("neard:{}", account_id)
} else {
format!("neard:{}", node_public_key)
};
resource.push(KeyValue::new(SERVICE_NAME, service_name));

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
Expand Down

0 comments on commit 844297e

Please sign in to comment.