Skip to content

Commit

Permalink
Support for Cassandra dataStax driver 4 (#690)
Browse files Browse the repository at this point in the history
* Support for Cassandra dataStax driver 4

#202

* Exclude instrumentation for pre 4.0.0 release versions

#202

* Exclude unit tests for Java 9+

Cassandra Unit issue -
jsevellec/cassandra-unit#249
#202
  • Loading branch information
GDownes committed Feb 2, 2022
1 parent 483dc20 commit ccbd8e3
Show file tree
Hide file tree
Showing 10 changed files with 665 additions and 0 deletions.
1 change: 1 addition & 0 deletions instrumentation/cassandra-datastax-4.0.0/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
29 changes: 29 additions & 0 deletions instrumentation/cassandra-datastax-4.0.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.cassandra-datastax-4.0.0' }
}

dependencies {
implementation(project(":agent-bridge"))
implementation(project(":agent-bridge-datastore"))
implementation(project(":newrelic-api"))
implementation(project(":newrelic-weaver-api"))
implementation("com.datastax.oss:java-driver-core:4.3.1") { transitive(true) }

testImplementation 'org.cassandraunit:cassandra-unit:4.3.1.0'
testImplementation 'org.apache.cassandra:cassandra-all:3.11.5'
testImplementation 'io.netty:netty-all:4.1.35.Final'
}

verifyInstrumentation {
passesOnly 'com.datastax.oss:java-driver-core:[4.0.0,)'
excludeRegex ".*(rc|beta|alpha).*"
}

test {
forkEvery(1)
}

site {
title 'Cassandra'
type 'Datastore'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.datastax.oss.driver.internal.core.session;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.nr.agent.instrumentation.cassandra.CassandraUtils;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

@Weave(type = MatchType.ExactClass, originalName = "com.datastax.oss.driver.internal.core.session.DefaultSession")
public class DefaultSession_Instrumentation {
public <RequestT extends Request, ResultT> ResultT execute(RequestT request, GenericType<ResultT> resultType) {
Segment segment = null;

if (request instanceof Statement && (resultType.equals(Statement.SYNC) || resultType.equals(Statement.ASYNC)) ) {
segment = NewRelic.getAgent().getTransaction().startSegment("execute");
}

try {
Object result = Weaver.callOriginal();
if (request instanceof Statement && (resultType.equals(Statement.SYNC))) {
return (ResultT) CassandraUtils.wrapSyncRequest((Statement) request, (ResultSet) result, getKeyspace().orElse(null), segment);
} else if (request instanceof Statement && (resultType.equals(Statement.ASYNC))) {
return (ResultT) CassandraUtils.wrapAsyncRequest((Statement) request, (CompletionStage<AsyncResultSet>) result, getKeyspace().orElse(null), segment);
} else {
return (ResultT) result;
}
} catch (Exception e) {
AgentBridge.privateApi.reportException(e);
throw e;
} finally {
if(request instanceof Statement && (resultType.equals(Statement.SYNC) && segment != null)) {
segment.end();
}
}
}

public Optional<CqlIdentifier> getKeyspace() {
return Weaver.callOriginal();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.cassandra;

import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A very simplistic CQL "Parser" that attempts to extract the information we care about for Datastore requests:
*
* - Operation (SELECT, INSERT, UPDATE, etc)
* - Table Name (Column Family)
*/
public class CQLParser {

private static final String IDENTIFIER_REGEX = "[a-zA-Z][a-zA-Z0-9_\\.]*";
private static final int FLAGS = Pattern.DOTALL | Pattern.CASE_INSENSITIVE;

private static final Pattern SELECT_PATTERN = Pattern.compile("^(SELECT(?:\\s+JSON)?)\\s+.+?FROM\\s+(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern INSERT_PATTERN = Pattern.compile("^(INSERT)\\s+INTO\\s+(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern UPDATE_PATTERN = Pattern.compile("^(UPDATE)\\s+(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern DELETE_PATTERN = Pattern.compile("^(DELETE).+?FROM\\s+(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern BATCH_PATTERN = Pattern.compile("^BEGIN\\s+(?:(?:UNLOGGED|COUNTER)\\s+)?(BATCH)", FLAGS);
private static final Pattern USE_PATTERN = Pattern.compile("^(USE)\\s+(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern KEYSPACE_PATTERN = Pattern.compile("^([A-Za-z]+\\s+KEYSPACE)\\s+(?:IF\\s+(?:NOT\\s+)?EXISTS\\s+)?(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern TABLE_PATTERN = Pattern.compile("^([A-Za-z]+\\s+(?:TABLE|COLUMNFAMILY))\\s+(?:IF\\s+(?:NOT\\s+)?EXISTS\\s+)?(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern TRUNCATE_PATTERN = Pattern.compile("^(TRUNCATE)\\s+(?:(?:TABLE|COLUMNFAMILY)\\s+)?(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern CREATE_INDEX_PATTERN = Pattern.compile("^(CREATE\\s+(?:CUSTOM\\s+)?INDEX).+?ON\\s+(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern DROP_INDEX_PATTERN = Pattern.compile("^(DROP\\s+INDEX)\\s+(?:IF EXISTS\\s+)?(?:'|\")?(.*)", FLAGS);
private static final Pattern TYPE_PATTERN = Pattern.compile("^([A-Za-z]+\\s+TYPE)\\s+(?:IF\\s+(?:NOT\\s+)?EXISTS\\s+)?(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern TRIGGER_PATTERN = Pattern.compile("^([A-Za-z]+\\s+TRIGGER)\\s+.*?ON\\s+(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern CREATE_FUNCTION_PATTERN = Pattern.compile("^(CREATE\\s+(?:OR\\s+REPLACE\\s+)?FUNCTION)\\s+(?:IF\\s+NOT\\s+EXISTS\\s+)?(.+?)\\s", FLAGS);
private static final Pattern DROP_FUNCTION_PATTERN = Pattern.compile("^(DROP\\s+FUNCTION)\\s+(?:IF\\s+EXISTS\\s+)?(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern CREATE_AGGREGATE_PATTERN = Pattern.compile("^(CREATE\\s+(?:OR\\s+REPLACE\\s+)?AGGREGATE)\\s+(?:IF\\s+NOT\\s+EXISTS\\s+)?(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final Pattern DROP_AGGREGATE_PATTERN = Pattern.compile("^(DROP\\s+AGGREGATE)\\s+(?:IF\\s+EXISTS\\s+)?(?:'|\")?(" + IDENTIFIER_REGEX + ")", FLAGS);
private static final String COMMENT_PATTERN = "/\\*(?:.|[\\r\\n])*?\\*/";

private static final List<Pattern> PATTERNS = new LinkedList<>();

static {
// The order here is a performance optimization to favor more common queries first
PATTERNS.add(SELECT_PATTERN);
PATTERNS.add(UPDATE_PATTERN);
PATTERNS.add(INSERT_PATTERN);
PATTERNS.add(DELETE_PATTERN);
PATTERNS.add(BATCH_PATTERN);
PATTERNS.add(TRUNCATE_PATTERN); // This needs to be before TABLE_PATTERN
PATTERNS.add(TABLE_PATTERN);
PATTERNS.add(KEYSPACE_PATTERN);
PATTERNS.add(USE_PATTERN);
PATTERNS.add(TYPE_PATTERN);
PATTERNS.add(CREATE_INDEX_PATTERN);
PATTERNS.add(DROP_INDEX_PATTERN);
PATTERNS.add(CREATE_FUNCTION_PATTERN);
PATTERNS.add(DROP_FUNCTION_PATTERN);
PATTERNS.add(CREATE_AGGREGATE_PATTERN);
PATTERNS.add(DROP_AGGREGATE_PATTERN);
PATTERNS.add(TRIGGER_PATTERN);
}

public OperationAndTableName getOperationAndTableName(String rawQuery) {
rawQuery = rawQuery.replaceAll(COMMENT_PATTERN, "").trim();

String operation = null;
String tableName = null;
for (Pattern pattern : PATTERNS) {
Matcher matcher = pattern.matcher(rawQuery);
if (matcher.find()) {
if (matcher.groupCount() >= 1) {
operation = matcher.group(1);
}
if (matcher.groupCount() == 2) {
tableName = matcher.group(2);
}
return new OperationAndTableName(operation, tableName);
}
}
return null;
}

public class OperationAndTableName {
public final String operation;
public final String tableName;

public OperationAndTableName(String operation, String tableName) {
this.operation = operation.toUpperCase().replaceAll("\\s", "_");
if (tableName != null) {
tableName = tableName.replaceAll(";|'|\"", "");
}
this.tableName = tableName;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.cassandra;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.Transaction;
import com.newrelic.agent.bridge.datastore.DatastoreVendor;
import com.newrelic.api.agent.DatastoreParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.QueryConverter;
import com.newrelic.api.agent.Segment;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.logging.Level;
import java.util.regex.Pattern;

public class CassandraUtils {

private static final String SINGLE_QUOTE = "'(?:[^']|'')*?(?:\\\\'.*|'(?!'))";
private static final String COMMENT = "(?:#|--).*?(?=\\r|\\n|$)";
private static final String MULTILINE_COMMENT = "/\\*(?:[^/]|/[^*])*?(?:\\*/|/\\*.*)";
private static final String UUID = "\\{?(?:[0-9a-f]\\-*){32}\\}?";
private static final String HEX = "0x[0-9a-f]+";
private static final String BOOLEAN = "\\b(?:true|false|null)\\b";
private static final String NUMBER = "-?\\b(?:[0-9]+\\.)?[0-9]+([eE][+-]?[0-9]+)?";

private static final Pattern CASSANDRA_DIALECT_PATTERN;
private static final Pattern CASSANDRA_UNMATCHED_PATTERN;
private static final CQLParser CASSANDRA_QUERY_PARSER = new CQLParser();

static {
String cassandraDialectPattern = String.join("|", Arrays.asList(SINGLE_QUOTE, COMMENT, MULTILINE_COMMENT, UUID, HEX, BOOLEAN, NUMBER));

CASSANDRA_DIALECT_PATTERN = Pattern.compile(cassandraDialectPattern, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
CASSANDRA_UNMATCHED_PATTERN = Pattern.compile("'|/\\*|\\*/", Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
}

public static void metrics(String queryString, String host, Integer port, String keyspace, Transaction tx,
Segment segment) {
try {
CQLParser.OperationAndTableName result = CASSANDRA_QUERY_PARSER.getOperationAndTableName(queryString);
if (result == null) {
NewRelic.getAgent().getLogger().log(Level.FINE, "Unable to parse cql statement");
return;
}

CassandraUtils.metrics(queryString, result.tableName, result.operation, host, port, keyspace, tx, segment);
} catch (Exception e) {
NewRelic.getAgent().getLogger().log(Level.FINEST, "ERROR: Problem parsing cql statement. {0}", e);
}
}

public static void metrics(String queryString, String collection, String operation, String host, Integer port,
String keyspace, Transaction tx, Segment segment) {

segment.reportAsExternal(DatastoreParameters
.product(DatastoreVendor.Cassandra.name())
.collection(collection)
.operation(operation)
.instance(host, port)
.databaseName(keyspace) // may be null, indicating no keyspace for the command
.slowQuery(queryString, CASSANDRA_QUERY_CONVERTER)
.build());
}

public static QueryConverter<String> CASSANDRA_QUERY_CONVERTER = new QueryConverter<String>() {

@Override
public String toRawQueryString(String statement) {
return statement;
}

@Override
public String toObfuscatedQueryString(String statement) {
return obfuscateQuery(statement);
}

private String obfuscateQuery(String rawQuery) {
String obfuscatedSql = CASSANDRA_DIALECT_PATTERN.matcher(rawQuery).replaceAll("?");
return checkForUnmatchedPairs(CASSANDRA_UNMATCHED_PATTERN, obfuscatedSql);
}

/**
* This method will check to see if there are any open single quotes or comment open/closes still left in the
* obfuscated string. If so, it means something didn't obfuscate properly so we will return "?" instead to
* prevent any data from leaking.
*/
private String checkForUnmatchedPairs(Pattern pattern, String obfuscatedSql) {
return pattern.matcher(obfuscatedSql).find() ? "?" : obfuscatedSql;
}
};

public static ResultSet wrapSyncRequest(Statement request, ResultSet result, CqlIdentifier keyspace, Segment segment) {
if(result != null) {
reportMetric(request, keyspace, result.getExecutionInfo().getCoordinator(), segment);
}
return result;
}

public static CompletionStage<AsyncResultSet> wrapAsyncRequest(Statement request, CompletionStage<AsyncResultSet> completionStage, CqlIdentifier keyspace, Segment segment) {
return Objects.requireNonNull(completionStage).whenComplete(
(result, throwable) -> {
if (throwable instanceof CompletionException) {
throwable = throwable.getCause();
}
if (throwable != null) {
System.out.println(throwable);
AgentBridge.privateApi.reportException(throwable);
}
if(result != null) {
reportMetric(request, keyspace, result.getExecutionInfo().getCoordinator(), segment);
}
segment.end();
});
}

private static void reportMetric(Statement request, CqlIdentifier keyspace, Node coordinator, Segment segment) {
if(request instanceof BatchStatement) {
CassandraUtils.metrics(null, null, "BATCH",
Optional.ofNullable(coordinator).flatMap(x -> x.getBroadcastAddress().map(InetSocketAddress::getHostName)).orElse(null),
Optional.ofNullable(coordinator).flatMap(x -> x.getBroadcastAddress().map(InetSocketAddress::getPort)).orElse(null),
Optional.ofNullable(keyspace).map(CqlIdentifier::asInternal).orElse(null),
AgentBridge.getAgent().getTransaction(),
segment);
} else {
CassandraUtils.metrics(
getQuery(request),
Optional.ofNullable(coordinator).flatMap(x -> x.getBroadcastAddress().map(InetSocketAddress::getHostName)).orElse(null),
Optional.ofNullable(coordinator).flatMap(x -> x.getBroadcastAddress().map(InetSocketAddress::getPort)).orElse(null),
Optional.ofNullable(keyspace).map(CqlIdentifier::asInternal).orElse(null),
AgentBridge.getAgent().getTransaction(),
segment
);
}
}

public static <RequestT extends Request> String getQuery(final RequestT statement) {
String query = null;
if (statement instanceof BoundStatement) {
query = ((BoundStatement) statement).getPreparedStatement().getQuery();
} else if (statement instanceof SimpleStatement) {
query = ((SimpleStatement) statement).getQuery();
}

return query == null ? "" : query;
}

}
Loading

0 comments on commit ccbd8e3

Please sign in to comment.