
[refactor](routineload) Refactored routineload to improve scalability (apache#19834)

- The data source parameters are sunk into the specific data source classes
- Simplify some code logic to reduce code complexity
- Provide a data source factory class to extract the common logic (a sketch of the implied factory shape follows below)
- Remove test-only code from production code; code that exists purely for testing does not belong in any production path
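
For context, the refactor funnels every CREATE/ALTER ROUTINE LOAD statement through a single entry point that maps a type name plus a raw property map to a concrete data source object. Below is a minimal sketch of that dispatch, inferred only from the call sites visible in this diff (createDataSource(type, props) returning an AbstractDataSourceProperties); the KAFKA branch and the KafkaDataSourceProperties subclass are assumptions, not code from this commit:

    // Hypothetical sketch of the factory dispatch; the committed
    // RoutineLoadDataSourcePropertyFactory may differ in detail.
    public class RoutineLoadDataSourcePropertyFactory {
        public static AbstractDataSourceProperties createDataSource(
                String type, Map<String, String> properties) {
            // Each concrete data source owns its parameters and validation.
            if ("KAFKA".equalsIgnoreCase(type)) {
                return new KafkaDataSourceProperties(properties); // assumed subclass
            }
            throw new IllegalArgumentException("Unknown data source type: " + type);
        }
    }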
CalvinKirs authored and pull[bot] committed Dec 13, 2023
1 parent e6c0c63 commit 4180531
Showing 20 changed files with 855 additions and 1,005 deletions.
9 changes: 4 additions & 5 deletions fe/fe-core/src/main/cup/sql_parser.cup
@@ -912,7 +912,7 @@ nonterminal ExplainOptions opt_explain_options;
 nonterminal Boolean opt_tmp;

 nonterminal OutFileClause opt_outfile;
-nonterminal RoutineLoadDataSourceProperties opt_datasource_properties;
+nonterminal Map<String, String> opt_datasource_properties;

 nonterminal Boolean opt_signed_unsigned;

@@ -1345,15 +1345,14 @@ alter_stmt ::=
 opt_datasource_properties ::=
     // empty
     {:
-        RESULT = new RoutineLoadDataSourceProperties();
+        RESULT = new HashMap<String, String>();
     :}
     | KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
     {:
-        // the 3rd parameter "true" means this is for AlterRoutineLoad operation.
-        RESULT = new RoutineLoadDataSourceProperties(type, customProperties, true);
+        Map<String, String> properties = new HashMap<String, String>(customProperties);
+        RESULT = properties;
     :}
     ;

 quantity ::=
     INTEGER_LITERAL:number
     {:
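
The net effect of this grammar change: the parser no longer interprets the data source clause at parse time; it just copies the key-value pairs into a HashMap, and type-aware validation moves to statement analysis (see AlterRoutineLoadStmt below). The clause this nonterminal matches has the shape FROM <type> ( "key" = "value", ... ); an illustrative fragment (property keys borrowed from the Kafka constants later in this diff, not taken from a real statement) would be:

    FROM kafka ("kafka_partitions" = "0, 1, 2", "kafka_offsets" = "100, 200, 300")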
fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -19,15 +19,18 @@

 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.load.routineload.AbstractDataSourceProperties;
+import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
 import org.apache.doris.load.routineload.RoutineLoadJob;

 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import lombok.Getter;
+import org.apache.commons.collections.MapUtils;

 import java.util.Map;
 import java.util.Optional;
@@ -62,17 +65,17 @@ public class AlterRoutineLoadStmt extends DdlStmt {

     private final LabelName labelName;
     private final Map<String, String> jobProperties;
-    private final RoutineLoadDataSourceProperties dataSourceProperties;
+    private final Map<String, String> dataSourceMapProperties;

     // save analyzed job properties.
     // analyzed data source properties are saved in dataSourceProperties.
     private Map<String, String> analyzedJobProperties = Maps.newHashMap();

     public AlterRoutineLoadStmt(LabelName labelName, Map<String, String> jobProperties,
-            RoutineLoadDataSourceProperties dataSourceProperties) {
+            Map<String, String> dataSourceProperties) {
         this.labelName = labelName;
         this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap();
-        this.dataSourceProperties = dataSourceProperties;
+        this.dataSourceMapProperties = dataSourceProperties != null ? dataSourceProperties : Maps.newHashMap();
     }

     public String getDbName() {
@@ -88,13 +91,16 @@ public Map<String, String> getAnalyzedJobProperties() {
     }

     public boolean hasDataSourceProperty() {
-        return dataSourceProperties.hasAnalyzedProperties();
+        return MapUtils.isNotEmpty(dataSourceMapProperties);
     }

-    public RoutineLoadDataSourceProperties getDataSourceProperties() {
-        return dataSourceProperties;
+    public Map<String, String> getDataSourceMapProperties() {
+        return dataSourceMapProperties;
     }

+    @Getter
+    public AbstractDataSourceProperties dataSourceProperties;
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
@@ -106,7 +112,7 @@ public void analyze(Analyzer analyzer) throws UserException {
         // check data source properties
         checkDataSourceProperties();

-        if (analyzedJobProperties.isEmpty() && !dataSourceProperties.hasAnalyzedProperties()) {
+        if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)) {
             throw new AnalysisException("No properties are specified");
         }
     }
@@ -200,13 +206,15 @@ private void checkJobProperties() throws UserException {
     }

     private void checkDataSourceProperties() throws UserException {
-        if (!FeConstants.runningUnitTest) {
-            RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
-                    .checkPrivAndGetJob(getDbName(), getLabel());
-            dataSourceProperties.setTimezone(job.getTimezone());
-        } else {
-            dataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        if (MapUtils.isEmpty(dataSourceMapProperties)) {
+            return;
         }
+        RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
+                .getJob(getDbName(), getLabel());
+        this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
+                .createDataSource(job.getDataSourceType().name(), dataSourceMapProperties);
+        dataSourceProperties.setAlter(true);
+        dataSourceProperties.setTimezone(job.getTimezone());
+        dataSourceProperties.analyze();
     }
 }
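
Two things are worth noting in the rewritten checkDataSourceProperties(): the FeConstants.runningUnitTest branch is gone (the commit message's point about keeping test-only code out of production paths), and all type-specific validation now happens behind AbstractDataSourceProperties. The contract the call sites rely on (setAlter(boolean), setTimezone(String), analyze()) can be sketched as follows; this is an inference from the calls above, not the class as committed:

    // Hypothetical sketch of the base class implied by the call sites;
    // field names here are assumptions.
    public abstract class AbstractDataSourceProperties {
        protected Map<String, String> properties; // raw key-value pairs from the parser
        protected boolean isAlter;                // true when driven by ALTER ROUTINE LOAD
        protected String timezone;                // inherited from the running job

        public void setAlter(boolean isAlter) { this.isAlter = isAlter; }
        public void setTimezone(String timezone) { this.timezone = timezone; }

        // Each data source validates and converts its own raw properties here.
        public abstract void analyze() throws UserException;
    }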
fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -25,12 +25,13 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.load.routineload.AbstractDataSourceProperties;
+import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.qe.ConnectContext;

@@ -104,20 +105,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final String NUM_AS_STRING = "num_as_string";
     public static final String FUZZY_PARSE = "fuzzy_parse";

-    // kafka type properties
-    public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list";
-    public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic";
-    // optional
-    public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
-    public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";
-    public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";
-    public static final String KAFKA_ORIGIN_DEFAULT_OFFSETS = "kafka_origin_default_offsets";
-
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
     public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";

+    private AbstractDataSourceProperties dataSourceProperties;
+
+
     private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
             .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
             .add(MAX_ERROR_NUMBER_PROPERTY)
@@ -142,7 +137,6 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private final List<ParseNode> loadPropertyList;
     private final Map<String, String> jobProperties;
     private final String typeName;
-    private final RoutineLoadDataSourceProperties dataSourceProperties;

     // the following variables will be initialized after analyze
     // -1 as unset, the default value will set in RoutineLoadJob
@@ -193,7 +187,8 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNo
         this.loadPropertyList = loadPropertyList;
         this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
         this.typeName = typeName.toUpperCase();
-        this.dataSourceProperties = new RoutineLoadDataSourceProperties(this.typeName, dataSourceProperties, false);
+        this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
+                .createDataSource(typeName, dataSourceProperties);
         this.mergeType = mergeType;
         if (comment != null) {
             this.comment = comment;
@@ -284,28 +279,12 @@ public String getJsonRoot() {
         return jsonRoot;
     }

-    public String getKafkaBrokerList() {
-        return this.dataSourceProperties.getKafkaBrokerList();
-    }
-
-    public String getKafkaTopic() {
-        return this.dataSourceProperties.getKafkaTopic();
-    }
-
-    public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
-        return this.dataSourceProperties.getKafkaPartitionOffsets();
-    }
-
-    public Map<String, String> getCustomKafkaProperties() {
-        return this.dataSourceProperties.getCustomKafkaProperties();
-    }
-
     public LoadTask.MergeType getMergeType() {
         return mergeType;
     }

-    public boolean isOffsetsForTimes() {
-        return this.dataSourceProperties.isOffsetsForTimes();
+    public AbstractDataSourceProperties getDataSourceProperties() {
+        return dataSourceProperties;
     }

     public String getComment() {
@@ -474,9 +453,9 @@ private void checkJobProperties() throws UserException {
             format = "json";
             jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
             jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
-            stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
-            numAsString = Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
-            fuzzyParse = Boolean.valueOf(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
+            stripOuterArray = Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
+            numAsString = Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
+            fuzzyParse = Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
         } else {
             throw new UserException("Format type is invalid. format=`" + format + "`");
         }
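
One incidental cleanup in this hunk: Boolean.valueOf is replaced by Boolean.parseBoolean. Both treat the input case-insensitively and yield false for anything other than "true", so behavior is unchanged; parseBoolean just returns the primitive directly instead of a boxed Boolean that would be auto-unboxed on assignment. A two-line illustration (generic Java, not from this diff):

    boolean a = Boolean.parseBoolean("false"); // primitive, no boxing
    boolean b = Boolean.valueOf("false");      // boxed Boolean, auto-unboxed into b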
[Diffs for the remaining 17 changed files not shown.]
