From 1ea9158a50ecb3c6e928c8bbc85bdade9ab80ed6 Mon Sep 17 00:00:00 2001 From: Hardik Bajaj <58038410+hardikbajaj@users.noreply.github.com> Date: Tue, 20 Jun 2023 20:57:58 +0530 Subject: [PATCH] Added new SysMonitorOshi v0 using Oshi library (#14359) Added a new monitor SysMonitorOshi to replace SysMonitor. The new monitor has a wider support for different machine architectures including ARM instances. Please switch to SysMonitorOshi as SysMonitor is now deprecated and will be removed in future releases. --- LICENSE | 3 + distribution/bin/check-licenses.py | 2 +- licenses.yaml | 15 +- licenses/bin/oshi.MIT | 21 + pom.xml | 9 +- processing/pom.xml | 6 + .../java/util/metrics/NoopOshiSysMonitor.java | 36 ++ .../java/util/metrics/OshiSysMonitor.java | 466 +++++++++++++ .../druid/java/util/metrics/SysMonitor.java | 8 + .../util/metrics/NoopOshiSysMonitorTest.java | 38 ++ .../java/util/metrics/OshiSysMonitorTest.java | 611 ++++++++++++++++++ .../druid/server/metrics/MetricsModule.java | 17 + .../server/metrics/MetricsModuleTest.java | 26 + 13 files changed, 1254 insertions(+), 4 deletions(-) create mode 100644 licenses/bin/oshi.MIT create mode 100644 processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java create mode 100644 processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java create mode 100644 processing/src/test/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitorTest.java create mode 100644 processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java diff --git a/LICENSE b/LICENSE index 68531acb2de2..4b63a77502ed 100644 --- a/LICENSE +++ b/LICENSE @@ -279,6 +279,9 @@ SOURCE/JAVA-CORE This product contains lpad and rpad methods adapted from Apache Flink. * processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java + This product contains SystemInfo methods adapted from oshi + * processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java + MIT License ================================ diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index ff77eeace0bf..5b059fd23afb 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -292,6 +292,7 @@ def build_compatible_license_names(): compatible_licenses['MIT License'] = 'MIT License' compatible_licenses['The MIT License (MIT)'] = 'MIT License' compatible_licenses['Bouncy Castle Licence'] = 'MIT License' + compatible_licenses['SPDX-License-Identifier: MIT'] = 'MIT License' compatible_licenses['The Go license'] = 'The Go license' @@ -435,7 +436,6 @@ def check_licenses(license_yaml, dependency_reports_root): license_yaml = args.license_yaml dependency_reports_root = args.dependency_reports_root - check_licenses(license_yaml, dependency_reports_root) except KeyboardInterrupt: diff --git a/licenses.yaml b/licenses.yaml index a16bdc80e9f6..843fc6857473 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -1591,7 +1591,7 @@ name: Java Native Access (JNA) license_category: binary module: java-core license_name: Apache License version 2.0 -version: 4.5.1 +version: 5.13.0 libraries: - net.java.dev.jna: jna @@ -2341,6 +2341,17 @@ notices: --- +name: OSHI +license_category: binary +module: java-core +license_name: MIT License +version: 6.4.2 +libraries: + - com.github.oshi: oshi-core +license_file_path: licenses/bin/oshi.MIT + +--- + name: JBoss Logging 3 license_category: binary module: java-core @@ -4914,7 +4925,7 @@ libraries: name: net.java.dev.jna jna-platform license_category: binary -version: 5.2.0 +version: 5.13.0 module: druid-ranger-security license_name: Apache License version 2.0 libraries: diff --git a/licenses/bin/oshi.MIT b/licenses/bin/oshi.MIT new file mode 100644 index 000000000000..5fa8e283344d --- /dev/null +++ b/licenses/bin/oshi.MIT @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2010-2023 The OSHI Project Contributors: https://github.com/oshi/oshi/graphs/contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7e50bac3a588..c495ea3255aa 100644 --- a/pom.xml +++ b/pom.xml @@ -109,6 +109,8 @@ 3.21.7 1.3.1 1.7.36 + 5.13.0 + 5.13.0 3.3.5 4.3.1 1.12.317 @@ -882,7 +884,12 @@ net.java.dev.jna jna - 4.5.1 + ${jna.version} + + + net.java.dev.jna + jna-platform + ${jna-platform.version} org.apache.commons diff --git a/processing/pom.xml b/processing/pom.xml index 290523b27051..c41622d9c1ec 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -37,6 +37,7 @@ 1.6.5 ${sigar.base.version}.132 5.3.4 + 6.4.2 @@ -335,6 +336,11 @@ dependency are copied as resources. See maven-dependency-plugin configuration and below. --> provided + + com.github.oshi + oshi-core + ${oshi.version} + diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java new file mode 100644 index 000000000000..d44390ac8c13 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +public class NoopOshiSysMonitor extends OshiSysMonitor +{ + public NoopOshiSysMonitor() + { + super(); + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + return false; + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java new file mode 100644 index 000000000000..40d97b57c7ee --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import oshi.SystemInfo; +import oshi.hardware.CentralProcessor; +import oshi.hardware.CentralProcessor.TickType; +import oshi.hardware.GlobalMemory; +import oshi.hardware.HWDiskStore; +import oshi.hardware.HardwareAbstractionLayer; +import oshi.hardware.NetworkIF; +import oshi.hardware.VirtualMemory; +import oshi.software.os.FileSystem; +import oshi.software.os.InternetProtocolStats; +import oshi.software.os.OSFileStore; +import oshi.software.os.OperatingSystem; + +import java.util.List; +import java.util.Map; + +/** + * SysMonitor implemented using {@link oshi} + *

+ * Following stats are emitted: + *

+ */ +public class OshiSysMonitor extends FeedDefiningMonitor +{ + + private final SystemInfo si; + private final HardwareAbstractionLayer hal; + private final OperatingSystem os; + private static final List NET_ADDRESS_BLACKLIST = ImmutableList.of("0.0.0.0", "127.0.0.1"); + private final MemStats memStats; + private final SwapStats swapStats; + private final FsStats fsStats; + private final DiskStats diskStats; + private final NetStats netStats; + private final CpuStats cpuStats; + private final SysStats sysStats; + private final TcpStats tcpStats; + + private final Map dimensions; + + public OshiSysMonitor() + { + this(ImmutableMap.of()); + } + + public OshiSysMonitor(Map dimensions) + { + this(dimensions, DEFAULT_METRICS_FEED); + } + + public OshiSysMonitor(Map dimensions, String feed) + { + super(feed); + Preconditions.checkNotNull(dimensions); + this.dimensions = ImmutableMap.copyOf(dimensions); + + this.si = new SystemInfo(); + this.hal = si.getHardware(); + this.os = si.getOperatingSystem(); + + this.memStats = new MemStats(); + this.swapStats = new SwapStats(); + this.fsStats = new FsStats(); + this.diskStats = new DiskStats(); + this.netStats = new NetStats(); + this.cpuStats = new CpuStats(); + this.sysStats = new SysStats(); + this.tcpStats = new TcpStats(); + + } + + // Create an object with mocked systemInfo for testing purposes + public OshiSysMonitor(SystemInfo systemInfo) + { + super("metrics"); + this.dimensions = ImmutableMap.of(); + + this.si = systemInfo; + this.hal = si.getHardware(); + this.os = si.getOperatingSystem(); + + this.memStats = new MemStats(); + this.swapStats = new SwapStats(); + this.fsStats = new FsStats(); + this.diskStats = new DiskStats(); + this.netStats = new NetStats(); + this.cpuStats = new CpuStats(); + this.sysStats = new SysStats(); + this.tcpStats = new TcpStats(); + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + monitorMemStats(emitter); + monitorSwapStats(emitter); + monitorFsStats(emitter); + monitorDiskStats(emitter); + monitorNetStats(emitter); + monitorCpuStats(emitter); + monitorSysStats(emitter); + monitorTcpStats(emitter); + return true; + } + + // Emit stats for a particular stat(mem, swap, filestore, etc) from statsList for testing + public void monitorMemStats(ServiceEmitter emitter) + { + memStats.emit(emitter); + } + + public void monitorSwapStats(ServiceEmitter emitter) + { + swapStats.emit(emitter); + } + + public void monitorFsStats(ServiceEmitter emitter) + { + fsStats.emit(emitter); + } + + public void monitorDiskStats(ServiceEmitter emitter) + { + diskStats.emit(emitter); + } + + public void monitorNetStats(ServiceEmitter emitter) + { + netStats.emit(emitter); + } + + public void monitorCpuStats(ServiceEmitter emitter) + { + cpuStats.emit(emitter); + } + + public void monitorSysStats(ServiceEmitter emitter) + { + sysStats.emit(emitter); + } + + public void monitorTcpStats(ServiceEmitter emitter) + { + tcpStats.emit(emitter); + } + + /** + * Implementation of Memstats + *

+ * Define a method {@link #emit(ServiceEmitter)} to emit metrices in emiters + */ + + private class MemStats + { + public void emit(ServiceEmitter emitter) + { + GlobalMemory mem = hal.getMemory(); + if (mem != null) { + final Map stats = ImmutableMap.of( + "sys/mem/max", + mem.getTotal(), + "sys/mem/used", + mem.getTotal() - mem.getAvailable(), + // This is total actual memory used, not including cache and buffer memory + "sys/mem/free", + mem.getAvailable() + ); + final ServiceMetricEvent.Builder builder = builder(); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + for (Map.Entry entry : stats.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + } + } + } + + private class SwapStats + { + private long prevPageIn = 0; + private long prevPageOut = 0; + + public void emit(ServiceEmitter emitter) + { + VirtualMemory swap = hal.getMemory().getVirtualMemory(); + + if (swap != null) { + long currPageIn = swap.getSwapPagesIn(); + long currPageOut = swap.getSwapPagesOut(); + final Map stats = ImmutableMap.of( + "sys/swap/pageIn", currPageIn - prevPageIn, + "sys/swap/pageOut", currPageOut - prevPageOut, + "sys/swap/max", swap.getSwapTotal(), + "sys/swap/free", swap.getSwapTotal() - swap.getSwapUsed() + ); + + final ServiceMetricEvent.Builder builder = builder(); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + for (Map.Entry entry : stats.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + + this.prevPageIn = currPageIn; + this.prevPageOut = currPageOut; + } + } + } + + private class FsStats + { + public void emit(ServiceEmitter emitter) + { + FileSystem fileSystem = os.getFileSystem(); + for (OSFileStore fs : fileSystem.getFileStores(true)) { // get only local file store : true + + final Map stats = ImmutableMap.builder() + .put("sys/fs/max", fs.getTotalSpace()) + .put("sys/fs/used", fs.getTotalSpace() - fs.getUsableSpace()) + .put("sys/fs/files/count", fs.getTotalInodes()) + .put("sys/fs/files/free", fs.getFreeInodes()) + .build(); + final ServiceMetricEvent.Builder builder = builder() + .setDimension("fsDevName", fs.getVolume()) + .setDimension("fsDirName", fs.getMount()); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + for (Map.Entry entry : stats.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + } + } + } + + private class DiskStats + { + // Difference b/w metrics of two consecutive values. It tells Δmetric (increase/decrease in metrics value) + private final KeyedDiff diff = new KeyedDiff(); + + public void emit(ServiceEmitter emitter) + { + List disks = hal.getDiskStores(); + // disk partitions can be mapped to file system but no inbuilt method is there to find relation b/w disks and file system + // Will have to add logic for that + for (HWDiskStore disk : disks) { + + final Map stats = diff.to( + disk.getName(), + ImmutableMap.builder() + .put("sys/disk/read/size", disk.getReadBytes()) + .put("sys/disk/read/count", disk.getReads()) + .put("sys/disk/write/size", disk.getWriteBytes()) + .put("sys/disk/write/count", disk.getWrites()) + .put("sys/disk/queue", disk.getCurrentQueueLength()) + .put("sys/disk/transferTime", disk.getTransferTime()) + .build() + ); + if (stats != null) { + final ServiceMetricEvent.Builder builder = builder() + .setDimension("diskName", disk.getName()); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + for (Map.Entry entry : stats.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + } + } + } + } + + private class NetStats + { + private final KeyedDiff diff = new KeyedDiff(); + + public void emit(ServiceEmitter emitter) + { + List networkIFS = hal.getNetworkIFs(); + for (NetworkIF net : networkIFS) { + final String name = net.getName(); + for (String addr : net.getIPv4addr()) { + if (!NET_ADDRESS_BLACKLIST.contains(addr)) { + // Only emit metrics for non black-listed ip addresses + String mapKey = name + + "_" + + addr; // Network_Name_IPV4 address as key, ex: wifi_192.1.0.1 to uniquely identify the dimension + final Map stats = diff.to( + mapKey, + ImmutableMap.builder() + .put("sys/net/read/size", net.getBytesRecv()) + .put("sys/net/read/packets", net.getPacketsRecv()) + .put("sys/net/read/errors", net.getInErrors()) + .put("sys/net/read/dropped", net.getInDrops()) + .put("sys/net/write/size", net.getBytesSent()) + .put("sys/net/write/packets", net.getPacketsSent()) + .put("sys/net/write/errors", net.getOutErrors()) + .put("sys/net/write/collisions", net.getCollisions()) + .build() + ); + if (stats != null) { + final ServiceMetricEvent.Builder builder = builder() + .setDimension("netName", net.getName()) + .setDimension("netAddress", addr) + .setDimension("netHwaddr", net.getMacaddr()); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + for (Map.Entry entry : stats.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + } + } + } + } + } + } + + private class CpuStats + { + private final KeyedDiff diff = new KeyedDiff(); + + + public void emit(ServiceEmitter emitter) + { + CentralProcessor processor = hal.getProcessor(); + long[][] procTicks = processor.getProcessorCpuLoadTicks(); + for (int i = 0; i < procTicks.length; ++i) { + final String name = Integer.toString(i); + long[] ticks = procTicks[i]; + long user = ticks[TickType.USER.getIndex()]; + long nice = ticks[TickType.NICE.getIndex()]; + long sys = ticks[TickType.SYSTEM.getIndex()]; + long idle = ticks[TickType.IDLE.getIndex()]; + long iowait = ticks[TickType.IOWAIT.getIndex()]; + long irq = ticks[TickType.IRQ.getIndex()]; + long softirq = ticks[TickType.SOFTIRQ.getIndex()]; + long steal = ticks[TickType.STEAL.getIndex()]; + long totalCpu = user + nice + sys + idle + iowait + irq + softirq + steal; + final Map stats = diff.to( + name, + ImmutableMap.builder() + .put("user", user) // user = Δuser / Δtotal + .put("sys", sys) // sys = Δsys / Δtotal + .put("nice", nice) // nice = Δnice / Δtotal + .put("wait", iowait) // wait = Δwait / Δtotal + .put("irq", irq) // irq = Δirq / Δtotal + .put("softIrq", softirq) // softIrq = ΔsoftIrq / Δtotal + .put("stolen", steal) // stolen = Δstolen / Δtotal + .put("idle", idle) // idle = Δidle / Δtotal + .put("_total", totalCpu) // (not reported) + .build() + ); + if (stats != null) { + final long total = stats.remove("_total"); + for (Map.Entry entry : stats.entrySet()) { + final ServiceMetricEvent.Builder builder = builder() + .setDimension("cpuName", name) + .setDimension("cpuTime", entry.getKey()); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + if (total != 0) { + // prevent divide by 0 exception and don't emit such events + emitter.emit(builder.build("sys/cpu", entry.getValue() * 100 / total)); // [0,100] + } + + } + } + } + } + } + + private class SysStats + { + + public void emit(ServiceEmitter emitter) + { + final ServiceMetricEvent.Builder builder = builder(); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + + long uptime = os.getSystemUptime(); + + final Map stats = ImmutableMap.of( + "sys/uptime", uptime + ); + for (Map.Entry entry : stats.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + CentralProcessor processor = hal.getProcessor(); + double[] la = processor.getSystemLoadAverage(3); + + if (la != null) { + final Map statsCpuLoadAverage = ImmutableMap.of( + "sys/la/1", la[0], + "sys/la/5", la[1], + "sys/la/15", la[2] + ); + for (Map.Entry entry : statsCpuLoadAverage.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + } + } + } + + private class TcpStats + { + private final KeyedDiff diff = new KeyedDiff(); + + public void emit(ServiceEmitter emitter) + { + final ServiceMetricEvent.Builder builder = builder(); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + + InternetProtocolStats ipstats = os.getInternetProtocolStats(); + InternetProtocolStats.TcpStats tcpv4 = ipstats.getTCPv4Stats(); + + if (tcpv4 != null) { + final Map stats = diff.to( + "tcpv4", ImmutableMap.builder() + .put("sys/tcpv4/activeOpens", tcpv4.getConnectionsActive()) + .put("sys/tcpv4/passiveOpens", tcpv4.getConnectionsPassive()) + .put("sys/tcpv4/attemptFails", tcpv4.getConnectionFailures()) + .put("sys/tcpv4/estabResets", tcpv4.getConnectionsReset()) + .put("sys/tcpv4/in/segs", tcpv4.getSegmentsReceived()) + .put("sys/tcpv4/in/errs", tcpv4.getInErrors()) + .put("sys/tcpv4/out/segs", tcpv4.getSegmentsSent()) + .put("sys/tcpv4/out/rsts", tcpv4.getOutResets()) + .put("sys/tcpv4/retrans/segs", tcpv4.getSegmentsRetransmitted()) + .build() + ); + if (stats != null) { + for (Map.Entry entry : stats.entrySet()) { + emitter.emit(builder.build(entry.getKey(), entry.getValue())); + } + } + } + } + } + +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java index d161bc1ad055..c8ce4cfb90b7 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java @@ -46,6 +46,14 @@ import java.util.List; import java.util.Map; + +/** + * Deprecated, SysMonitor will now be maintained in {@link OshiSysMonitor} + * + * Sys monitor was implemented using @link org.hyperic.sigar which is no longer maintained. + * {@link oshi} based SysMonitor will be maintained and used from now on, and is implemented in org.apache.druid.java.util.metrics.OshiSysMonitor + */ +@Deprecated public class SysMonitor extends FeedDefiningMonitor { private static final Logger log = new Logger(SysMonitor.class); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitorTest.java new file mode 100644 index 000000000000..d07f160e218b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitorTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class NoopOshiSysMonitorTest +{ + @Test + public void testDoMonitor() + { + + ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class); + NoopOshiSysMonitor noopOshiSysMonitor = new NoopOshiSysMonitor(); + + Assert.assertFalse(noopOshiSysMonitor.doMonitor(serviceEmitter)); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java new file mode 100644 index 000000000000..3b86b1efd765 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import oshi.SystemInfo; +import oshi.hardware.CentralProcessor; +import oshi.hardware.GlobalMemory; +import oshi.hardware.HWDiskStore; +import oshi.hardware.HardwareAbstractionLayer; +import oshi.hardware.NetworkIF; +import oshi.hardware.VirtualMemory; +import oshi.software.os.FileSystem; +import oshi.software.os.InternetProtocolStats; +import oshi.software.os.OSFileStore; +import oshi.software.os.OperatingSystem; +import oshi.util.Util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class OshiSysMonitorTest +{ + + private SystemInfo si; + private HardwareAbstractionLayer hal; + private OperatingSystem os; + + private enum STATS + { + MEM, SWAP, FS, DISK, NET, CPU, SYS, TCP + } + + @Before + public void setUp() + { + si = Mockito.mock(SystemInfo.class); + hal = Mockito.mock(HardwareAbstractionLayer.class); + os = Mockito.mock(OperatingSystem.class); + Mockito.when(si.getHardware()).thenReturn(hal); + Mockito.when(si.getOperatingSystem()).thenReturn(os); + } + + @Test + public void testDoMonitor() + { + + ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class); + OshiSysMonitor sysMonitorOshi = new OshiSysMonitor(); + serviceEmitter.start(); + sysMonitorOshi.monitor(serviceEmitter); + + Assert.assertTrue(sysMonitorOshi.doMonitor(serviceEmitter)); + + } + + @Test + public void testDefaultFeedSysMonitorOshi() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + OshiSysMonitor m = new OshiSysMonitor(); + m.start(); + m.monitor(emitter); + // Sleep for 2 sec to get all metrics which are difference of prev and now metrics + Util.sleep(2000); + m.monitor(emitter); + m.stop(); + checkEvents(emitter.getEvents(), "metrics"); + } + + @Test + public void testMemStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + GlobalMemory mem = Mockito.mock(GlobalMemory.class); + Mockito.when(mem.getTotal()).thenReturn(64L); + Mockito.when(mem.getAvailable()).thenReturn(16L); + Mockito.when(hal.getMemory()).thenReturn(mem); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorMemStats(emitter); + m.stop(); + Assert.assertEquals(3, emitter.getEvents().size()); + emitter.verifyEmitted("sys/mem/max", 1); + emitter.verifyEmitted("sys/mem/used", 1); + emitter.verifyEmitted("sys/mem/free", 1); + emitter.verifyValue("sys/mem/max", 64L); + emitter.verifyValue("sys/mem/used", 48L); + emitter.verifyValue("sys/mem/free", 16L); + } + + @Test + public void testSwapStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + GlobalMemory mem = Mockito.mock(GlobalMemory.class); + VirtualMemory swap = Mockito.mock(VirtualMemory.class); + Mockito.when(swap.getSwapPagesIn()).thenReturn(300L); + Mockito.when(swap.getSwapPagesOut()).thenReturn(200L); + Mockito.when(swap.getSwapTotal()).thenReturn(1000L); + Mockito.when(swap.getSwapUsed()).thenReturn(700L); + Mockito.when(mem.getVirtualMemory()).thenReturn(swap); + Mockito.when(hal.getMemory()).thenReturn(mem); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorSwapStats(emitter); + Assert.assertEquals(4, emitter.getEvents().size()); + emitter.verifyEmitted("sys/swap/pageIn", 1); + emitter.verifyEmitted("sys/swap/pageOut", 1); + emitter.verifyEmitted("sys/swap/max", 1); + emitter.verifyEmitted("sys/swap/free", 1); + emitter.verifyValue("sys/swap/pageIn", 300L); + emitter.verifyValue("sys/swap/pageOut", 200L); + emitter.verifyValue("sys/swap/max", 1000L); + emitter.verifyValue("sys/swap/free", 300L); + // Emit again to assert diff in pageIn stats + Mockito.when(swap.getSwapPagesIn()).thenReturn(400L); + Mockito.when(swap.getSwapPagesOut()).thenReturn(250L); + Mockito.when(swap.getSwapUsed()).thenReturn(500L); + emitter.flush(); + m.monitorSwapStats(emitter); + emitter.verifyValue("sys/swap/pageIn", 100L); + emitter.verifyValue("sys/swap/pageOut", 50L); + emitter.verifyValue("sys/swap/max", 1000L); + emitter.verifyValue("sys/swap/free", 500L); + m.stop(); + } + + @Test + public void testFsStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + FileSystem fileSystem = Mockito.mock(FileSystem.class); + OSFileStore fs1 = Mockito.mock(OSFileStore.class); + OSFileStore fs2 = Mockito.mock(OSFileStore.class); + Mockito.when(fs1.getTotalSpace()).thenReturn(300L); + Mockito.when(fs1.getUsableSpace()).thenReturn(200L); + Mockito.when(fs1.getTotalInodes()).thenReturn(1000L); + Mockito.when(fs1.getFreeInodes()).thenReturn(700L); + Mockito.when(fs1.getVolume()).thenReturn("/dev/disk1"); + Mockito.when(fs1.getMount()).thenReturn("/System/Volumes/boot1"); + Mockito.when(fs2.getTotalSpace()).thenReturn(400L); + Mockito.when(fs2.getUsableSpace()).thenReturn(320L); + Mockito.when(fs2.getTotalInodes()).thenReturn(800L); + Mockito.when(fs2.getFreeInodes()).thenReturn(600L); + Mockito.when(fs2.getVolume()).thenReturn("/dev/disk2"); + Mockito.when(fs2.getMount()).thenReturn("/System/Volumes/boot2"); + List osFileStores = ImmutableList.of(fs1, fs2); + Mockito.when(fileSystem.getFileStores(true)).thenReturn(osFileStores); + Mockito.when(os.getFileSystem()).thenReturn(fileSystem); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorFsStats(emitter); + Assert.assertEquals(8, emitter.getEvents().size()); + emitter.verifyEmitted("sys/fs/max", 2); + emitter.verifyEmitted("sys/fs/used", 2); + emitter.verifyEmitted("sys/fs/files/count", 2); + emitter.verifyEmitted("sys/fs/files/free", 2); + Map userDims1 = ImmutableMap.of( + "fsDevName", + "/dev/disk1", + "fsDirName", + "/System/Volumes/boot1" + ); + List metricValues1 = emitter.getMetricValues("sys/fs/max", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(300L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/fs/used", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(100L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/fs/files/count", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(1000L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/fs/files/free", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(700L, metricValues1.get(0)); + + Map userDims2 = ImmutableMap.of( + "fsDevName", + "/dev/disk2", + "fsDirName", + "/System/Volumes/boot2" + ); + List metricValues2 = emitter.getMetricValues("sys/fs/max", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(400L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/fs/used", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(80L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/fs/files/count", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(800L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/fs/files/free", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(600L, metricValues2.get(0)); + m.stop(); + } + + @Test + public void testDiskStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + HWDiskStore disk1 = Mockito.mock(HWDiskStore.class); + HWDiskStore disk2 = Mockito.mock(HWDiskStore.class); + Mockito.when(disk1.getReadBytes()).thenReturn(300L); + Mockito.when(disk1.getReads()).thenReturn(200L); + Mockito.when(disk1.getWriteBytes()).thenReturn(400L); + Mockito.when(disk1.getWrites()).thenReturn(500L); + Mockito.when(disk1.getCurrentQueueLength()).thenReturn(100L); + Mockito.when(disk1.getTransferTime()).thenReturn(150L); + Mockito.when(disk1.getName()).thenReturn("disk1"); + Mockito.when(disk2.getReadBytes()).thenReturn(2000L); + Mockito.when(disk2.getReads()).thenReturn(3000L); + Mockito.when(disk2.getWriteBytes()).thenReturn(1000L); + Mockito.when(disk2.getWrites()).thenReturn(4000L); + Mockito.when(disk2.getCurrentQueueLength()).thenReturn(750L); + Mockito.when(disk2.getTransferTime()).thenReturn(800L); + Mockito.when(disk2.getName()).thenReturn("disk2"); + List hwDiskStores = ImmutableList.of(disk1, disk2); + Mockito.when(hal.getDiskStores()).thenReturn(hwDiskStores); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorDiskStats(emitter); + Assert.assertEquals(0, emitter.getEvents().size()); + + Mockito.when(disk1.getReadBytes()).thenReturn(400L); + Mockito.when(disk1.getReads()).thenReturn(220L); + Mockito.when(disk1.getWriteBytes()).thenReturn(600L); + Mockito.when(disk1.getWrites()).thenReturn(580L); + Mockito.when(disk1.getCurrentQueueLength()).thenReturn(300L); + Mockito.when(disk1.getTransferTime()).thenReturn(250L); + Mockito.when(disk2.getReadBytes()).thenReturn(4500L); + Mockito.when(disk2.getReads()).thenReturn(3500L); + Mockito.when(disk2.getWriteBytes()).thenReturn(2300L); + Mockito.when(disk2.getWrites()).thenReturn(5000L); + Mockito.when(disk2.getCurrentQueueLength()).thenReturn(900L); + Mockito.when(disk2.getTransferTime()).thenReturn(1100L); + + m.monitorDiskStats(emitter); + Assert.assertEquals(12, emitter.getEvents().size()); + + Map userDims1 = ImmutableMap.of( + "diskName", + "disk1" + ); + List metricValues1 = emitter.getMetricValues("sys/disk/read/size", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(100L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/disk/read/count", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(20L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/disk/write/size", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(200L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/disk/write/count", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(80L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/disk/queue", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(200L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/disk/transferTime", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(100L, metricValues1.get(0)); + + Map userDims2 = ImmutableMap.of( + "diskName", + "disk2" + ); + List metricValues2 = emitter.getMetricValues("sys/disk/read/size", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(2500L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/disk/read/count", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(500L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/disk/write/size", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(1300L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/disk/write/count", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(1000L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/disk/queue", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(150L, metricValues2.get(0)); + metricValues2 = emitter.getMetricValues("sys/disk/transferTime", userDims2); + Assert.assertEquals(1, metricValues2.size()); + Assert.assertEquals(300L, metricValues2.get(0)); + + m.stop(); + } + + @Test + public void testNetStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + NetworkIF net1 = Mockito.mock(NetworkIF.class); + Mockito.when(net1.getBytesRecv()).thenReturn(300L); + Mockito.when(net1.getPacketsRecv()).thenReturn(200L); + Mockito.when(net1.getInErrors()).thenReturn(400L); + Mockito.when(net1.getInDrops()).thenReturn(500L); + Mockito.when(net1.getBytesSent()).thenReturn(100L); + Mockito.when(net1.getPacketsSent()).thenReturn(150L); + Mockito.when(net1.getOutErrors()).thenReturn(200L); + Mockito.when(net1.getCollisions()).thenReturn(20L); + Mockito.when(net1.getName()).thenReturn("Wifi"); + Mockito.when(net1.getIPv4addr()).thenReturn(new String[]{"123.456.7.8", "0.0.0.0", "192.1.2.3"}); + Mockito.when(net1.getMacaddr()).thenReturn("ha:rd:wa:re:add"); + + List networkIFS = ImmutableList.of(net1); + Mockito.when(hal.getNetworkIFs()).thenReturn(networkIFS); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorNetStats(emitter); + Assert.assertEquals(0, emitter.getEvents().size()); + + Mockito.when(net1.getBytesRecv()).thenReturn(400L); + Mockito.when(net1.getPacketsRecv()).thenReturn(220L); + Mockito.when(net1.getInErrors()).thenReturn(600L); + Mockito.when(net1.getInDrops()).thenReturn(580L); + Mockito.when(net1.getBytesSent()).thenReturn(300L); + Mockito.when(net1.getPacketsSent()).thenReturn(250L); + Mockito.when(net1.getOutErrors()).thenReturn(330L); + Mockito.when(net1.getCollisions()).thenReturn(240L); + + + m.monitorNetStats(emitter); + Assert.assertEquals(16, emitter.getEvents().size()); // 8 * 2 whitelisted ips + + Map userDims1 = ImmutableMap.of( + "netName", + "Wifi", + "netAddress", + "123.456.7.8", + "netHwaddr", + "ha:rd:wa:re:add" + ); + Map userDims2 = ImmutableMap.of( + "netName", + "Wifi", + "netAddress", + "192.1.2.3", + "netHwaddr", + "ha:rd:wa:re:add" + ); + List metricValues1 = emitter.getMetricValues("sys/net/read/size", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(100L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/read/packets", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(20L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/read/errors", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(200L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/read/dropped", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(80L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/size", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(200L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/packets", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(100L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/errors", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(130L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/collisions", userDims1); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(220L, metricValues1.get(0)); + + metricValues1 = emitter.getMetricValues("sys/net/read/size", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(100L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/read/packets", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(20L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/read/errors", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(200L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/read/dropped", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(80L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/size", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(200L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/packets", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(100L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/errors", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(130L, metricValues1.get(0)); + metricValues1 = emitter.getMetricValues("sys/net/write/collisions", userDims2); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(220L, metricValues1.get(0)); + m.stop(); + } + + @Test + public void testCpuStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + CentralProcessor processor = Mockito.mock(CentralProcessor.class); + long[][] procTicks = new long[][]{ + {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L}, + {2L, 4L, 6L, 8L, 10L, 12L, 14L, 16L}, + }; + Mockito.when(processor.getProcessorCpuLoadTicks()).thenReturn(procTicks); + Mockito.when(hal.getProcessor()).thenReturn(processor); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorCpuStats(emitter); + Assert.assertEquals(0, emitter.getEvents().size()); + + long[][] procTicks2 = new long[][]{ + {4L, 5L, 6L, 8L, 9L, 7L, 10L, 12L}, // Δtick1 {3,3,3,4,4,1,3,4} _total = 25, emitted percentage + {5L, 8L, 8L, 10L, 15L, 14L, 18L, 22L}, // Δtick2 {3,4,2,2,5,2,4,6} _total = 28 + }; + Mockito.when(processor.getProcessorCpuLoadTicks()).thenReturn(procTicks2); + + m.monitorCpuStats(emitter); + m.stop(); + Assert.assertEquals(16, emitter.getEvents().size()); // 8 ticktype * 2 processors + + Map userDims = new HashMap(); + userDims.put("cpuName", "0"); + userDims.put("cpuTime", "user"); + List metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(12L, metricValues1.get(0)); + userDims.replace("cpuTime", "nice"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(12L, metricValues1.get(0)); + userDims.replace("cpuTime", "sys"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(12L, metricValues1.get(0)); + userDims.replace("cpuTime", "idle"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(16L, metricValues1.get(0)); + userDims.replace("cpuTime", "wait"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(16L, metricValues1.get(0)); + userDims.replace("cpuTime", "irq"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(4L, metricValues1.get(0)); + userDims.replace("cpuTime", "softIrq"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(12L, metricValues1.get(0)); + userDims.replace("cpuTime", "stolen"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(16L, metricValues1.get(0)); + + userDims.replace("cpuName", "1"); + userDims.replace("cpuTime", "user"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(10L, metricValues1.get(0)); + userDims.replace("cpuTime", "nice"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(14L, metricValues1.get(0)); + userDims.replace("cpuTime", "sys"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(7L, metricValues1.get(0)); + userDims.replace("cpuTime", "idle"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(7L, metricValues1.get(0)); + userDims.replace("cpuTime", "wait"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(17L, metricValues1.get(0)); + userDims.replace("cpuTime", "irq"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(7L, metricValues1.get(0)); + userDims.replace("cpuTime", "softIrq"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(14L, metricValues1.get(0)); + userDims.replace("cpuTime", "stolen"); + metricValues1 = emitter.getMetricValues("sys/cpu", userDims); + Assert.assertEquals(1, metricValues1.size()); + Assert.assertEquals(21L, metricValues1.get(0)); + + } + + @Test + public void testSysStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + + Mockito.when(os.getSystemUptime()).thenReturn(4000L); + CentralProcessor processor = Mockito.mock(CentralProcessor.class); + double[] la = new double[]{2.31, 4.31, 5.31}; + Mockito.when(processor.getSystemLoadAverage(3)).thenReturn(la); + Mockito.when(hal.getProcessor()).thenReturn(processor); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorSysStats(emitter); + Assert.assertEquals(4, emitter.getEvents().size()); + m.stop(); + emitter.verifyEmitted("sys/uptime", 1); + emitter.verifyEmitted("sys/la/1", 1); + emitter.verifyEmitted("sys/la/5", 1); + emitter.verifyEmitted("sys/la/15", 1); + emitter.verifyValue("sys/uptime", 4000L); + emitter.verifyValue("sys/la/1", 2.31); + emitter.verifyValue("sys/la/5", 4.31); + emitter.verifyValue("sys/la/15", 5.31); + + } + + @Test + public void testTcpStats() + { + StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); + InternetProtocolStats.TcpStats tcpv4 = Mockito.mock(InternetProtocolStats.TcpStats.class); + InternetProtocolStats ipstats = Mockito.mock(InternetProtocolStats.class); + Mockito.when(tcpv4.getConnectionsActive()).thenReturn(10L); + Mockito.when(tcpv4.getConnectionsPassive()).thenReturn(20L); + Mockito.when(tcpv4.getConnectionFailures()).thenReturn(5L); + Mockito.when(tcpv4.getConnectionsReset()).thenReturn(7L); + Mockito.when(tcpv4.getSegmentsReceived()).thenReturn(200L); + Mockito.when(tcpv4.getInErrors()).thenReturn(3L); + Mockito.when(tcpv4.getSegmentsSent()).thenReturn(300L); + Mockito.when(tcpv4.getOutResets()).thenReturn(4L); + Mockito.when(tcpv4.getSegmentsRetransmitted()).thenReturn(8L); + Mockito.when(ipstats.getTCPv4Stats()).thenReturn(tcpv4); + Mockito.when(os.getInternetProtocolStats()).thenReturn(ipstats); + + OshiSysMonitor m = new OshiSysMonitor(si); + m.start(); + m.monitorTcpStats(emitter); + + Assert.assertEquals(0, emitter.getEvents().size()); + Mockito.when(tcpv4.getConnectionsActive()).thenReturn(20L); + Mockito.when(tcpv4.getConnectionsPassive()).thenReturn(25L); + Mockito.when(tcpv4.getConnectionFailures()).thenReturn(8L); + Mockito.when(tcpv4.getConnectionsReset()).thenReturn(14L); + Mockito.when(tcpv4.getSegmentsReceived()).thenReturn(350L); + Mockito.when(tcpv4.getInErrors()).thenReturn(4L); + Mockito.when(tcpv4.getSegmentsSent()).thenReturn(500L); + Mockito.when(tcpv4.getOutResets()).thenReturn(7L); + Mockito.when(tcpv4.getSegmentsRetransmitted()).thenReturn(8L); + m.monitorTcpStats(emitter); + m.stop(); + Assert.assertEquals(9, emitter.getEvents().size()); + emitter.verifyValue("sys/tcpv4/activeOpens", 10L); + emitter.verifyValue("sys/tcpv4/passiveOpens", 5L); + emitter.verifyValue("sys/tcpv4/attemptFails", 3L); + emitter.verifyValue("sys/tcpv4/estabResets", 7L); + emitter.verifyValue("sys/tcpv4/in/segs", 150L); + emitter.verifyValue("sys/tcpv4/in/errs", 1L); + emitter.verifyValue("sys/tcpv4/out/segs", 200L); + emitter.verifyValue("sys/tcpv4/out/rsts", 3L); + emitter.verifyValue("sys/tcpv4/retrans/segs", 0L); + + } + + private void checkEvents(List events, String expectedFeed) + { + Assert.assertFalse("no events emitted", events.isEmpty()); + for (Event e : events) { + if (!expectedFeed.equals(e.getFeed())) { + String message = StringUtils.format("\"feed\" in event: %s", e.toMap().toString()); + Assert.assertEquals(message, expectedFeed, e.getFeed()); + } + } + } + + +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index f060ec517941..46c0fc90d89d 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -46,7 +46,9 @@ import org.apache.druid.java.util.metrics.JvmThreadsMonitor; import org.apache.druid.java.util.metrics.Monitor; import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.java.util.metrics.NoopOshiSysMonitor; import org.apache.druid.java.util.metrics.NoopSysMonitor; +import org.apache.druid.java.util.metrics.OshiSysMonitor; import org.apache.druid.java.util.metrics.SysMonitor; import org.apache.druid.query.ExecutorServiceMonitor; @@ -192,4 +194,19 @@ public SysMonitor getSysMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder, @ return new SysMonitor(dimensions); } } + + @Provides + @ManageLifecycle + public OshiSysMonitor getOshiSysMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder, @Self Set nodeRoles) + { + if (nodeRoles.contains(NodeRole.PEON)) { + return new NoopOshiSysMonitor(); + } else { + Map dimensions = MonitorsConfig.mapOfDatasourceAndTaskID( + dataSourceTaskIdHolder.getDataSource(), + dataSourceTaskIdHolder.getTaskId() + ); + return new OshiSysMonitor(dimensions); + } + } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java index 473b549b51df..74899716862a 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java @@ -44,7 +44,9 @@ import org.apache.druid.java.util.metrics.BasicMonitorScheduler; import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler; import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.java.util.metrics.NoopOshiSysMonitor; import org.apache.druid.java.util.metrics.NoopSysMonitor; +import org.apache.druid.java.util.metrics.OshiSysMonitor; import org.apache.druid.java.util.metrics.SysMonitor; import org.apache.druid.server.DruidNode; import org.hamcrest.CoreMatchers; @@ -198,6 +200,30 @@ public void testGetSysMonitorWhenNull() Assert.assertFalse(sysMonitor instanceof NoopSysMonitor); Mockito.verify(emitter, Mockito.atLeastOnce()).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); } + @Test + public void testGetOshiSysMonitorViaInjector() + { + + final Injector injector = createInjector(new Properties(), ImmutableSet.of(NodeRole.PEON)); + final OshiSysMonitor sysMonitor = injector.getInstance(OshiSysMonitor.class); + final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class); + sysMonitor.doMonitor(emitter); + + Assert.assertTrue(sysMonitor instanceof NoopOshiSysMonitor); + Mockito.verify(emitter, Mockito.never()).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } + @Test + public void testGetOshiSysMonitorWhenNull() + { + + Injector injector = createInjector(new Properties(), ImmutableSet.of()); + final OshiSysMonitor sysMonitor = injector.getInstance(OshiSysMonitor.class); + final ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class); + sysMonitor.doMonitor(emitter); + + Assert.assertFalse(sysMonitor instanceof NoopOshiSysMonitor); + Mockito.verify(emitter, Mockito.atLeastOnce()).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } private static Injector createInjector(Properties properties, ImmutableSet nodeRoles) {