diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts index 1b36be23e0d0..d40f92f405df 100644 --- a/dashboard/lib/api/metric.ts +++ b/dashboard/lib/api/metric.ts @@ -17,13 +17,12 @@ import { Metrics, MetricsSample } from "../../components/metrics" import api from "./api" -export const INTERVAL = 5000 export interface BackPressuresMetrics { outputBufferBlockingDuration: Metrics[] } -// Get back pressure from meta node -> prometheus -export async function getActorBackPressures() { +// Get back pressure from Prometheus +export async function fetchPrometheusBackPressure() { const res: BackPressuresMetrics = await api.get( "/metrics/fragment/prometheus_back_pressures" ) @@ -114,7 +113,8 @@ function convertToBackPressureMetrics( export function calculateBPRate( backPressureNew: BackPressureInfo[], - backPressureOld: BackPressureInfo[] + backPressureOld: BackPressureInfo[], + intervalMs: number ): BackPressuresMetrics { let mapNew = convertToMapAndAgg(backPressureNew) let mapOld = convertToMapAndAgg(backPressureOld) @@ -124,7 +124,8 @@ export function calculateBPRate( result.set( key, // The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing - ((value - (mapOld.get(key) || 0)) / ((INTERVAL / 1000) * 1000000000)) * + ((value - (mapOld.get(key) || 0)) / + ((intervalMs / 1000) * 1000000000)) * 100 ) } else { @@ -149,7 +150,7 @@ export const BackPressureInfo = { } // Get back pressure from meta node -> compute node -export async function getBackPressureWithoutPrometheus() { +export async function fetchEmbeddedBackPressure() { const response = await api.get("/metrics/fragment/embedded_back_pressures") let backPressureInfos: BackPressureInfo[] = response.backPressureInfos.map( BackPressureInfo.fromJSON diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 926ed767b93e..e72ca7ce3e0c 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -29,7 +29,7 @@ import { } from "@chakra-ui/react" import * as d3 from "d3" import { dagStratify } from "d3-dag" -import _, { sortBy } from "lodash" +import _ from "lodash" import Head from "next/head" import { parseAsInteger, useQueryState } from "nuqs" import { Fragment, useCallback, useEffect, useMemo, useState } from "react" @@ -40,11 +40,9 @@ import useErrorToast from "../hook/useErrorToast" import useFetch from "../lib/api/fetch" import { BackPressureInfo, - BackPressuresMetrics, - INTERVAL, calculateBPRate, - getActorBackPressures, - getBackPressureWithoutPrometheus, + fetchEmbeddedBackPressure, + fetchPrometheusBackPressure, } from "../lib/api/metric" import { getFragments, getStreamingJobs } from "../lib/api/streaming" import { FragmentBox } from "../lib/layout" @@ -55,6 +53,9 @@ interface DispatcherNode { [actorId: number]: Dispatcher[] } +// Refresh interval (ms) for back pressure stats +const INTERVAL = 5000 + /** Associated data of each plan node in the fragment graph, including the dispatchers. */ export interface PlanNodeDatum { name: string @@ -181,21 +182,21 @@ const backPressureDataSources: BackPressureDataSource[] = [ "Prometheus", ] +// The state of the embedded back pressure metrics. +// The metrics from previous fetch are stored here to calculate the rate. +interface EmbeddedBackPressureInfo { + previous: BackPressureInfo[] + current: BackPressureInfo[] +} + export default function Streaming() { const { response: relationList } = useFetch(getStreamingJobs) const { response: fragmentList } = useFetch(getFragments) const [relationId, setRelationId] = useQueryState("id", parseAsInteger) const [selectedFragmentId, setSelectedFragmentId] = useState() - // used to get the data source - const [backPressureDataSource, setBackPressureDataSource] = - useState("Embedded") - const { response: actorBackPressures } = useFetch( - getActorBackPressures, - INTERVAL, - backPressureDataSource === "Prometheus" - ) + const toast = useErrorToast() const fragmentDependencyCallback = useCallback(() => { if (fragmentList) { @@ -211,7 +212,6 @@ export default function Streaming() { } } } - return undefined }, [fragmentList, relationId]) useEffect(() => { @@ -222,44 +222,8 @@ export default function Streaming() { } } } - return () => {} }, [relationId, relationList, setRelationId]) - // get back pressure rate without prometheus - // TODO(bugen): extract the following logic to a hook and unify the interface - // with Prometheus data source. - const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = - useState() - const [previousBP, setPreviousBP] = useState([]) - const [currentBP, setCurrentBP] = useState([]) - const toast = useErrorToast() - - useEffect(() => { - if (backPressureDataSource === "Embedded") { - const interval = setInterval(() => { - const fetchNewBP = async () => { - const newBP = await getBackPressureWithoutPrometheus() - setPreviousBP(currentBP) - setCurrentBP(newBP) - } - - fetchNewBP().catch(console.error) - }, INTERVAL) - return () => clearInterval(interval) - } - }, [currentBP, backPressureDataSource]) - - useEffect(() => { - if (currentBP !== null && previousBP !== null) { - const metrics = calculateBPRate(currentBP, previousBP) - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragmentId, m.metric.downstreamFragmentId) - ) - setBackPressuresMetrics(metrics) - } - }, [currentBP, previousBP]) - const fragmentDependency = fragmentDependencyCallback()?.fragmentDep const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag const fragments = fragmentDependencyCallback()?.fragments @@ -323,49 +287,86 @@ export default function Streaming() { toast(new Error(`Actor ${searchActorIdInt} not found`)) } + const [backPressureDataSource, setBackPressureDataSource] = + useState("Embedded") + + // Periodically fetch Prometheus back-pressure from Meta node + const { response: promethusMetrics } = useFetch( + fetchPrometheusBackPressure, + INTERVAL, + backPressureDataSource === "Prometheus" + ) + + // Periodically fetch embedded back-pressure from Meta node + // Didn't call `useFetch()` because the `setState` way is special. + const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] = + useState() + useEffect(() => { + if (backPressureDataSource === "Embedded") { + const interval = setInterval(() => { + fetchEmbeddedBackPressure().then( + (newBP) => { + console.log(newBP) + setEmbeddedBackPressureInfo((prev) => + prev + ? { + previous: prev.current, + current: newBP, + } + : { + previous: newBP, // Use current value to show zero rate, but it's fine + current: newBP, + } + ) + }, + (e) => { + console.error(e) + toast(e, "error") + } + ) + }, INTERVAL) + return () => { + clearInterval(interval) + } + } + }, [backPressureDataSource]) + const backPressures = useMemo(() => { - if (actorBackPressures || backPressuresMetricsWithoutPromtheus) { + if (promethusMetrics || embeddedBackPressureInfo) { let map = new Map() - if ( - backPressureDataSource === "Embedded" && - backPressuresMetricsWithoutPromtheus - ) { - for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { + if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) { + const metrics = calculateBPRate( + embeddedBackPressureInfo.current, + embeddedBackPressureInfo.previous, + INTERVAL + ) + for (const m of metrics.outputBufferBlockingDuration) { map.set( `${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`, m.sample[0].value ) } - } else if ( - backPressureDataSource === "Prometheus" && - actorBackPressures - ) { - if (actorBackPressures) { - for (const m of actorBackPressures.outputBufferBlockingDuration) { - if (m.sample.length > 0) { - // Note: We issue an instant query to Prometheus to get the most recent value. - // So there should be only one sample here. - // - // Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still - // possible that an old version of meta service returns a range-query result. - // So we take the one with the latest timestamp here. - const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100 - map.set( - `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, - value - ) - } + } else if (backPressureDataSource === "Prometheus" && promethusMetrics) { + for (const m of promethusMetrics.outputBufferBlockingDuration) { + if (m.sample.length > 0) { + // Note: We issue an instant query to Prometheus to get the most recent value. + // So there should be only one sample here. + // + // Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still + // possible that an old version of meta service returns a range-query result. + // So we take the one with the latest timestamp here. + const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100 + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + value + ) } } } return map } - }, [ - backPressureDataSource, - actorBackPressures, - backPressuresMetricsWithoutPromtheus, - ]) + }, [backPressureDataSource, promethusMetrics, embeddedBackPressureInfo]) const retVal = ( @@ -444,12 +445,14 @@ export default function Streaming() { event.target.value as BackPressureDataSource ) } + defaultValue="Embedded" > - {backPressureDataSources.map((algo) => ( - - ))} + + diff --git a/src/meta/src/dashboard/prometheus.rs b/src/meta/src/dashboard/prometheus.rs index 435dd6896e6e..8b6ebe445de9 100644 --- a/src/meta/src/dashboard/prometheus.rs +++ b/src/meta/src/dashboard/prometheus.rs @@ -54,6 +54,8 @@ impl From<&RangeVector> for PrometheusVector { } } +// Note(eric): For backward compatibility, we store the `InstantVector` as a single sample, +// instead of defining a new struct. impl From<&InstantVector> for PrometheusVector { fn from(value: &InstantVector) -> Self { PrometheusVector {