Skip to content

Commit

Permalink
[FLINK-34537] Autoscaler JDBC Support HikariPool (#785)
Browse files Browse the repository at this point in the history
* [FLINK-34537] Autoscaler JDBC Support HikariPool

* fix pom with dependency

* Remove unnecessary HikariCP dependency for flink-autoscaler-standalone module

* Polish the error message

* Address Yuepeng's comments

---------

Co-authored-by: Rui Fan <1996fanrui@gmail.com>
  • Loading branch information
czy006 and 1996fanrui committed May 16, 2024
1 parent 84d9b74 commit eb19133
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,15 @@
import org.apache.flink.autoscaler.event.LoggingEventHandler;
import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler;
import org.apache.flink.autoscaler.jdbc.event.JdbcEventInteractor;
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

import java.sql.DriverManager;

import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;

/** The factory of {@link AutoScalerEventHandler}. */
public class AutoscalerEventHandlerFactory {
Expand Down Expand Up @@ -77,12 +72,7 @@ AutoScalerEventHandler<KEY, Context> create(Configuration conf) throws Exception
private static <KEY, Context extends JobAutoScalerContext<KEY>>
AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf)
throws Exception {
final var jdbcUrl = conf.get(JDBC_URL);
checkArgument(jdbcUrl != null, "%s is required for jdbc event handler.", JDBC_URL.key());
var user = conf.get(JDBC_USERNAME);
var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));

var conn = DriverManager.getConnection(jdbcUrl, user, password);
var conn = HikariJDBCUtil.getConnection(conf);
return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,17 @@
import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore;
import org.apache.flink.autoscaler.jdbc.state.JdbcStateInteractor;
import org.apache.flink.autoscaler.jdbc.state.JdbcStateStore;
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

import java.sql.DriverManager;

import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;

/** The factory of {@link AutoScalerStateStore}. */
public class AutoscalerStateStoreFactory {
Expand Down Expand Up @@ -79,12 +74,7 @@ AutoScalerStateStore<KEY, Context> create(Configuration conf) throws Exception {
private static <KEY, Context extends JobAutoScalerContext<KEY>>
AutoScalerStateStore<KEY, Context> createJdbcStateStore(Configuration conf)
throws Exception {
final var jdbcUrl = conf.get(JDBC_URL);
checkArgument(jdbcUrl != null, "%s is required for jdbc state store.", JDBC_URL.key());
var user = conf.get(JDBC_USERNAME);
var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));

var conn = DriverManager.getConnection(jdbcUrl, user, password);
var conn = HikariJDBCUtil.getConnection(conf);
return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new JdbcStateInteractor(conn)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.standalone.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.StringUtils;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
import static org.apache.flink.util.Preconditions.checkArgument;

/** Hikari JDBC common util for creating pooled connections from standalone autoscaler options. */
public class HikariJDBCUtil {

    public static final String JDBC_URL_REQUIRED_HINT =
            String.format(
                    "%s is required when jdbc state store or jdbc event handler is used.",
                    JDBC_URL.key());

    /** Non-instantiable utility class. */
    private HikariJDBCUtil() {}

    /**
     * Opens a JDBC connection backed by a dedicated Hikari connection pool.
     *
     * <p>Each call creates its own {@link HikariDataSource}. The returned connection is wrapped so
     * that closing it also closes the backing pool; otherwise every invocation would leak a pool
     * (idle connections plus Hikari housekeeping threads).
     *
     * @param conf configuration providing {@code JDBC_URL}, the optional {@code JDBC_USERNAME} and
     *     the name of the environment variable holding the password
     * @return an open JDBC connection; closing it also shuts down the backing pool
     * @throws SQLException if the connection cannot be established
     * @throws IllegalArgumentException if the jdbc url is missing or blank
     */
    public static Connection getConnection(Configuration conf) throws SQLException {
        final var jdbcUrl = conf.get(JDBC_URL);
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(jdbcUrl), JDBC_URL_REQUIRED_HINT);
        var user = conf.get(JDBC_USERNAME);
        // The password is resolved from the environment variable named by
        // JDBC_PASSWORD_ENV_VARIABLE so that secrets never live in plain-text configuration.
        var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));

        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(jdbcUrl);
        hikariConfig.setUsername(user);
        hikariConfig.setPassword(password);

        final HikariDataSource dataSource = new HikariDataSource(hikariConfig);
        final Connection connection = dataSource.getConnection();
        // Tie the pool's lifecycle to the returned connection: when the caller closes the
        // connection, dispose the data source created solely for it.
        return (Connection)
                Proxy.newProxyInstance(
                        HikariJDBCUtil.class.getClassLoader(),
                        new Class<?>[] {Connection.class},
                        (proxy, method, args) -> {
                            try {
                                Object result = method.invoke(connection, args);
                                if ("close".equals(method.getName())) {
                                    dataSource.close();
                                }
                                return result;
                            } catch (InvocationTargetException e) {
                                // Re-throw the connection's own exception, not the
                                // reflection wrapper.
                                throw e.getCause();
                            }
                        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ This project bundles the following dependencies under the Apache Software Licens
- org.apache.logging.log4j:log4j-api:2.17.1
- org.apache.logging.log4j:log4j-core:2.17.1
- org.apache.logging.log4j:log4j-1.2-api:2.17.1
- com.zaxxer:HikariCP:5.1.0

This project bundles the following dependencies under the BSD License.
See bundled license files for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@

import org.apache.flink.autoscaler.event.LoggingEventHandler;
import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler;
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
import org.apache.flink.configuration.Configuration;

import org.junit.jupiter.api.Test;

import java.sql.DriverManager;
import java.sql.SQLException;

import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
import static org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil.JDBC_URL_REQUIRED_HINT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -59,25 +58,26 @@ void testCreateJdbcEventHandlerWithoutURL() {
conf.set(EVENT_HANDLER_TYPE, JDBC);
assertThatThrownBy(() -> AutoscalerEventHandlerFactory.create(conf))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("%s is required for jdbc event handler.", JDBC_URL.key());
.hasMessage(JDBC_URL_REQUIRED_HINT);
}

@Test
void testCreateJdbcEventHandler() throws Exception {
final var jdbcUrl = "jdbc:derby:memory:test";
DriverManager.getConnection(String.format("%s;create=true", jdbcUrl)).close();

// Test for create JDBC Event Handler.
final var conf = new Configuration();
conf.set(EVENT_HANDLER_TYPE, JDBC);
conf.set(JDBC_URL, jdbcUrl);
conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
HikariJDBCUtil.getConnection(conf).close();

var eventHandler = AutoscalerEventHandlerFactory.create(conf);
assertThat(eventHandler).isInstanceOf(JdbcAutoScalerEventHandler.class);

try {
DriverManager.getConnection(String.format("%s;shutdown=true", jdbcUrl)).close();
} catch (SQLException ignored) {
conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
HikariJDBCUtil.getConnection(conf).close();
} catch (RuntimeException ignored) {
// database shutdown ignored exception
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
package org.apache.flink.autoscaler.standalone;

import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore;
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.configuration.Configuration;

import org.junit.jupiter.api.Test;

import java.sql.DriverManager;
import java.sql.SQLException;

import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
import static org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil.JDBC_URL_REQUIRED_HINT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -58,25 +57,26 @@ void testCreateJdbcStateStoreWithoutURL() {
conf.set(STATE_STORE_TYPE, JDBC);
assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("%s is required for jdbc state store.", JDBC_URL.key());
.hasMessage(JDBC_URL_REQUIRED_HINT);
}

@Test
void testCreateJdbcStateStore() throws Exception {
final var jdbcUrl = "jdbc:derby:memory:test";
DriverManager.getConnection(String.format("%s;create=true", jdbcUrl)).close();

// Test for create JDBC State store.
final var conf = new Configuration();
conf.set(STATE_STORE_TYPE, JDBC);
conf.set(JDBC_URL, jdbcUrl);
conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
HikariJDBCUtil.getConnection(conf).close();

var stateStore = AutoscalerStateStoreFactory.create(conf);
assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);

try {
DriverManager.getConnection(String.format("%s;shutdown=true", jdbcUrl)).close();
} catch (SQLException ignored) {
conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
HikariJDBCUtil.getConnection(conf).close();
} catch (RuntimeException ignored) {
// database shutdown ignored exception
}
}
}
12 changes: 12 additions & 0 deletions flink-autoscaler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ under the License.
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
<exclusions>
<exclusion>
<artifactId>HikariCP-java7</artifactId>
<groupId>com.zaxxer</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikari.version}</version>
</dependency>

<dependency>
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ under the License.
<assertj.version>3.21.0</assertj.version>

<quartz.version>2.3.2</quartz.version>
<hikari.version>5.1.0</hikari.version>

<flink-kubernetes-operator.surefire.baseArgLine>-XX:+IgnoreUnrecognizedVMOptions ${surefire.module.config}</flink-kubernetes-operator.surefire.baseArgLine>

Expand Down

0 comments on commit eb19133

Please sign in to comment.