Skip to content

Commit

Permalink
fix(dashboard): fix back-pressure dashboard & some refactorings (#15389)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh committed Mar 5, 2024
1 parent f3f7271 commit e8513d4
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 91 deletions.
13 changes: 7 additions & 6 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
import { Metrics, MetricsSample } from "../../components/metrics"
import api from "./api"

export const INTERVAL = 5000
export interface BackPressuresMetrics {
outputBufferBlockingDuration: Metrics[]
}

// Get back pressure from meta node -> prometheus
export async function getActorBackPressures() {
// Get back pressure from Prometheus
export async function fetchPrometheusBackPressure() {
const res: BackPressuresMetrics = await api.get(
"/metrics/fragment/prometheus_back_pressures"
)
Expand Down Expand Up @@ -114,7 +113,8 @@ function convertToBackPressureMetrics(

export function calculateBPRate(
backPressureNew: BackPressureInfo[],
backPressureOld: BackPressureInfo[]
backPressureOld: BackPressureInfo[],
intervalMs: number
): BackPressuresMetrics {
let mapNew = convertToMapAndAgg(backPressureNew)
let mapOld = convertToMapAndAgg(backPressureOld)
Expand All @@ -124,7 +124,8 @@ export function calculateBPRate(
result.set(
key,
// The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing
((value - (mapOld.get(key) || 0)) / ((INTERVAL / 1000) * 1000000000)) *
((value - (mapOld.get(key) || 0)) /
((intervalMs / 1000) * 1000000000)) *
100
)
} else {
Expand All @@ -149,7 +150,7 @@ export const BackPressureInfo = {
}

// Get back pressure from meta node -> compute node
export async function getBackPressureWithoutPrometheus() {
export async function fetchEmbeddedBackPressure() {
const response = await api.get("/metrics/fragment/embedded_back_pressures")
let backPressureInfos: BackPressureInfo[] = response.backPressureInfos.map(
BackPressureInfo.fromJSON
Expand Down
173 changes: 88 additions & 85 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
} from "@chakra-ui/react"
import * as d3 from "d3"
import { dagStratify } from "d3-dag"
import _, { sortBy } from "lodash"
import _ from "lodash"
import Head from "next/head"
import { parseAsInteger, useQueryState } from "nuqs"
import { Fragment, useCallback, useEffect, useMemo, useState } from "react"
Expand All @@ -40,11 +40,9 @@ import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
BackPressuresMetrics,
INTERVAL,
calculateBPRate,
getActorBackPressures,
getBackPressureWithoutPrometheus,
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
Expand All @@ -55,6 +53,9 @@ interface DispatcherNode {
[actorId: number]: Dispatcher[]
}

// Refresh interval (ms) for back pressure stats
const INTERVAL = 5000

/** Associated data of each plan node in the fragment graph, including the dispatchers. */
export interface PlanNodeDatum {
name: string
Expand Down Expand Up @@ -181,21 +182,21 @@ const backPressureDataSources: BackPressureDataSource[] = [
"Prometheus",
]

// The state of the embedded back pressure metrics.
// The metrics from previous fetch are stored here to calculate the rate.
interface EmbeddedBackPressureInfo {
previous: BackPressureInfo[]
current: BackPressureInfo[]
}

export default function Streaming() {
const { response: relationList } = useFetch(getStreamingJobs)
const { response: fragmentList } = useFetch(getFragments)

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
// used to get the data source
const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")

const { response: actorBackPressures } = useFetch(
getActorBackPressures,
INTERVAL,
backPressureDataSource === "Prometheus"
)
const toast = useErrorToast()

const fragmentDependencyCallback = useCallback(() => {
if (fragmentList) {
Expand All @@ -211,7 +212,6 @@ export default function Streaming() {
}
}
}
return undefined
}, [fragmentList, relationId])

useEffect(() => {
Expand All @@ -222,44 +222,8 @@ export default function Streaming() {
}
}
}
return () => {}
}, [relationId, relationList, setRelationId])

// get back pressure rate without prometheus
// TODO(bugen): extract the following logic to a hook and unify the interface
// with Prometheus data source.
const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] =
useState<BackPressuresMetrics>()
const [previousBP, setPreviousBP] = useState<BackPressureInfo[]>([])
const [currentBP, setCurrentBP] = useState<BackPressureInfo[]>([])
const toast = useErrorToast()

useEffect(() => {
if (backPressureDataSource === "Embedded") {
const interval = setInterval(() => {
const fetchNewBP = async () => {
const newBP = await getBackPressureWithoutPrometheus()
setPreviousBP(currentBP)
setCurrentBP(newBP)
}

fetchNewBP().catch(console.error)
}, INTERVAL)
return () => clearInterval(interval)
}
}, [currentBP, backPressureDataSource])

useEffect(() => {
if (currentBP !== null && previousBP !== null) {
const metrics = calculateBPRate(currentBP, previousBP)
metrics.outputBufferBlockingDuration = sortBy(
metrics.outputBufferBlockingDuration,
(m) => (m.metric.fragmentId, m.metric.downstreamFragmentId)
)
setBackPressuresMetrics(metrics)
}
}, [currentBP, previousBP])

const fragmentDependency = fragmentDependencyCallback()?.fragmentDep
const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag
const fragments = fragmentDependencyCallback()?.fragments
Expand Down Expand Up @@ -323,49 +287,86 @@ export default function Streaming() {
toast(new Error(`Actor ${searchActorIdInt} not found`))
}

const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")

// Periodically fetch Prometheus back-pressure from Meta node
const { response: promethusMetrics } = useFetch(
fetchPrometheusBackPressure,
INTERVAL,
backPressureDataSource === "Prometheus"
)

// Periodically fetch embedded back-pressure from Meta node
// Didn't call `useFetch()` because the `setState` way is special.
const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] =
useState<EmbeddedBackPressureInfo>()
useEffect(() => {
if (backPressureDataSource === "Embedded") {
const interval = setInterval(() => {
fetchEmbeddedBackPressure().then(
(newBP) => {
console.log(newBP)
setEmbeddedBackPressureInfo((prev) =>
prev
? {
previous: prev.current,
current: newBP,
}
: {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
}
)
},
(e) => {
console.error(e)
toast(e, "error")
}
)
}, INTERVAL)
return () => {
clearInterval(interval)
}
}
}, [backPressureDataSource])

const backPressures = useMemo(() => {
if (actorBackPressures || backPressuresMetricsWithoutPromtheus) {
if (promethusMetrics || embeddedBackPressureInfo) {
let map = new Map()

if (
backPressureDataSource === "Embedded" &&
backPressuresMetricsWithoutPromtheus
) {
for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) {
if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) {
const metrics = calculateBPRate(
embeddedBackPressureInfo.current,
embeddedBackPressureInfo.previous,
INTERVAL
)
for (const m of metrics.outputBufferBlockingDuration) {
map.set(
`${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`,
m.sample[0].value
)
}
} else if (
backPressureDataSource === "Prometheus" &&
actorBackPressures
) {
if (actorBackPressures) {
for (const m of actorBackPressures.outputBufferBlockingDuration) {
if (m.sample.length > 0) {
// Note: We issue an instant query to Prometheus to get the most recent value.
// So there should be only one sample here.
//
// Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
// possible that an old version of meta service returns a range-query result.
// So we take the one with the latest timestamp here.
const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
} else if (backPressureDataSource === "Prometheus" && promethusMetrics) {
for (const m of promethusMetrics.outputBufferBlockingDuration) {
if (m.sample.length > 0) {
// Note: We issue an instant query to Prometheus to get the most recent value.
// So there should be only one sample here.
//
// Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
// possible that an old version of meta service returns a range-query result.
// So we take the one with the latest timestamp here.
const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
}
}
return map
}
}, [
backPressureDataSource,
actorBackPressures,
backPressuresMetricsWithoutPromtheus,
])
}, [backPressureDataSource, promethusMetrics, embeddedBackPressureInfo])

const retVal = (
<Flex p={3} height="calc(100vh - 20px)" flexDirection="column">
Expand Down Expand Up @@ -444,12 +445,14 @@ export default function Streaming() {
event.target.value as BackPressureDataSource
)
}
defaultValue="Embedded"
>
{backPressureDataSources.map((algo) => (
<option value={algo} key={algo}>
{algo}
</option>
))}
<option value="Embedded" key="Embedded">
Embedded (5 secs)
</option>
<option value="Prometheus" key="Prometheus">
Prometheus (1 min)
</option>
</Select>
</FormControl>
<Flex height="full" width="full" flexDirection="column">
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/dashboard/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ impl From<&RangeVector> for PrometheusVector {
}
}

// Note(eric): For backward compatibility, we store the `InstantVector` as a single sample,
// instead of defining a new struct.
impl From<&InstantVector> for PrometheusVector {
fn from(value: &InstantVector) -> Self {
PrometheusVector {
Expand Down

0 comments on commit e8513d4

Please sign in to comment.