Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve]Code optimization, delete some redundant code #47

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.service.DorisSinkService;
import org.apache.doris.kafka.connector.service.DorisSinkServiceFactory;
import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.utils.Version;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -38,9 +36,7 @@
/** DorisSinkTask implements SinkTask for Kafka Connect framework. */
public class DorisSinkTask extends SinkTask {
private static final Logger LOG = LoggerFactory.getLogger(DorisSinkTask.class);

private DorisSinkService sink = null;
private Map<String, String> topic2table = null;

/** default constructor, invoked by kafka connect framework */
public DorisSinkTask() {}
Expand All @@ -54,8 +50,6 @@ public DorisSinkTask() {}
@Override
public void start(final Map<String, String> parsedConfig) {
LOG.info("kafka doris sink task start");
// generate topic to table map
this.topic2table = getTopicToTableMap(parsedConfig);
this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig);
}

Expand All @@ -76,10 +70,7 @@ public void stop() {
@Override
public void open(final Collection<TopicPartition> partitions) {
LOG.info("kafka doris sink task open with {}", partitions.toString());
partitions.forEach(
tp ->
this.sink.startTask(
ConfigCheckUtils.tableName(tp.topic(), this.topic2table), tp));
partitions.forEach(tp -> this.sink.startTask(tp));
}

/**
Expand Down Expand Up @@ -146,23 +137,4 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
public String version() {
return Version.getVersion();
}

/**
* parse topic to table map
*
* @param config connector config file
* @return result map
*/
static Map<String, String> getTopicToTableMap(Map<String, String> config) {
if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) {
Map<String, String> result =
ConfigCheckUtils.parseTopicToTableMap(
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
if (result != null) {
return result;
}
LOG.error("Invalid Input, Topic2Table Map disabled");
}
return new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -88,7 +87,9 @@ public DorisOptions(Map<String, String> config) {
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS));

this.flushTime = Long.parseLong(config.get(DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));
this.topicMap = getTopicToTableMap(config);
this.topicMap =
ConfigCheckUtils.parseTopicToTableMap(
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));

this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
this.enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
Expand Down Expand Up @@ -292,22 +293,4 @@ public String getDatabaseTimeZone() {
public boolean isEnableDelete() {
return enableDelete;
}

/**
* parse topic to table map
*
* @param config connector config file
* @return result map
*/
static Map<String, String> getTopicToTableMap(Map<String, String> config) {
if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) {
Map<String, String> result =
ConfigCheckUtils.parseTopicToTableMap(
config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
if (result != null) {
return result;
}
}
return new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.metrics.MetricsJmxReporter;
import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
import org.apache.doris.kafka.connector.writer.DorisWriter;
import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
Expand All @@ -54,15 +53,13 @@ public class DorisDefaultSinkService implements DorisSinkService {

private final ConnectionProvider conn;
private final Map<String, DorisWriter> writer;
private final Map<String, String> topic2TableMap;
private final DorisOptions dorisOptions;
private final MetricsJmxReporter metricsJmxReporter;
private final DorisConnectMonitor connectMonitor;

DorisDefaultSinkService(Map<String, String> config) {
this.dorisOptions = new DorisOptions(config);
this.writer = new HashMap<>();
this.topic2TableMap = new HashMap<>();
this.conn = new JdbcConnectionProvider(dorisOptions);
MetricRegistry metricRegistry = new MetricRegistry();
this.metricsJmxReporter = new MetricsJmxReporter(metricRegistry, dorisOptions.getName());
Expand All @@ -73,6 +70,11 @@ public class DorisDefaultSinkService implements DorisSinkService {
this.metricsJmxReporter);
}

@Override
public void startTask(TopicPartition topicPartition) {
startTask(null, topicPartition);
}

/**
* Create new task
*
Expand Down Expand Up @@ -130,9 +132,7 @@ public void insert(SinkRecord record) {
String nameIndex = getNameIndex(record.topic(), record.kafkaPartition());
// init a new topic partition
if (!writer.containsKey(nameIndex)) {
startTask(
ConfigCheckUtils.tableName(record.topic(), this.topic2TableMap),
new TopicPartition(record.topic(), record.kafkaPartition()));
startTask(new TopicPartition(record.topic(), record.kafkaPartition()));
}
writer.get(nameIndex).insert(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@

/** Background service of data sink, responsible to create/drop table and insert/delete files */
public interface DorisSinkService {

/**
* Start the Task.
*
* @param topicPartition TopicPartition passed from Kafka
*/
void startTask(TopicPartition topicPartition);

/**
* Start the Task. This should handle any configuration parsing and one-time setup of the task.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public static boolean isValidDorisApplicationName(String appName) {
* @param topic2table topic to table map
* @return valid table name
*/
@Deprecated
public static String tableName(String topic, Map<String, String> topic2table) {
return generateValidName(topic, topic2table);
}
Expand All @@ -218,6 +219,7 @@ public static String tableName(String topic, Map<String, String> topic2table) {
* @param topic2table topic to table map
* @return valid table/application name
*/
@Deprecated
public static String generateValidName(String topic, Map<String, String> topic2table) {
if (topic == null || topic.isEmpty()) {
throw new DorisException("Topic name is empty String or null");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@
public class LoadStatus {
public static final String SUCCESS = "Success";
public static final String PUBLISH_TIMEOUT = "Publish Timeout";
public static final String LABEL_ALREADY_EXIST = "Label Already Exists";
public static final String FAIL = "Fail";
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

/** util for handle response. */
public class ResponseUtil {
public static final Pattern LABEL_EXIST_PATTERN =
Pattern.compile("Label \\[(.*)\\] has already been used, relate to txn \\[(\\d+)\\]");
public static final Pattern COMMITTED_PATTERN =
Pattern.compile(
"transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
Expand Down