Skip to content

Commit

Permalink
make exec_mem_limit valid for broker load (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
imay authored and chaoyli committed Aug 20, 2017
1 parent 9f0853b commit 57a11ef
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 25 deletions.
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ class RuntimeState {

#define RETURN_IF_CANCELLED(state) \
do { \
if (UNLIKELY((state)->is_cancelled())) return Status(TStatusCode::CANCELLED); \
if (UNLIKELY((state)->is_cancelled())) return Status::CANCELLED; \
} while (false)

}
Expand Down
6 changes: 3 additions & 3 deletions docs/user_guide/sql_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ load_label是当前导入批次的标签,由用户指定,需要保证在一

- file_path,broker中的文件路径,可以指定到一个文件,也可以用/*通配符指定某个目录下的所有文件。

- NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。
- NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。不支持Broker方式导入

- PARTITION:如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。如果不指定,默认导入table的所有分区。

Expand Down Expand Up @@ -1291,9 +1291,9 @@ load_label是当前导入批次的标签,由用户指定,需要保证在一

- max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。

- load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false

- load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false (不支持Broker方式导入)

- exec_mem_limit:在Broker Load方式时生效,指定导入执行时,后端可使用的最大内存。

举例:

Expand Down
2 changes: 2 additions & 0 deletions fe/src/com/baidu/palo/analysis/LoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class LoadStmt extends DdlStmt {
public static final String TIMEOUT_PROPERTY = "timeout";
public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
public static final String LOAD_DELETE_FLAG_PROPERTY = "load_delete_flag";
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String CLUSTER_PROPERTY = "cluster";

// for load data from Baidu Object Store(BOS)
Expand Down Expand Up @@ -127,6 +128,7 @@ public static void checkProperties(Map<String, String> properties) throws DdlExc
propertySet.add(LoadStmt.TIMEOUT_PROPERTY);
propertySet.add(LoadStmt.MAX_FILTER_RATIO_PROPERTY);
propertySet.add(LoadStmt.LOAD_DELETE_FLAG_PROPERTY);
propertySet.add(LoadStmt.EXEC_MEM_LIMIT);
propertySet.add(LoadStmt.CLUSTER_PROPERTY);

for (Entry<String, String> entry : properties.entrySet()) {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/common/FeConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ public class FeConstants {

// general model
// Current meta data version. Use this version to write journals and image
public static int meta_version = FeMetaVersion.VERSION_33;
public static int meta_version = FeMetaVersion.VERSION_34;
}
3 changes: 3 additions & 0 deletions fe/src/com/baidu/palo/common/FeMetaVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ public final class FeMetaVersion {

// persist decommission type
public static final int VERSION_33 = 33;

// persist LoadJob's execMemLimit
public static final int VERSION_34 = 34;
}
10 changes: 10 additions & 0 deletions fe/src/com/baidu/palo/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,11 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType,
job.setDbId(db.getId());
job.setTimestamp(timestamp);
job.setBrokerDesc(stmt.getBrokerDesc());

// resource info
if (ConnectContext.get() != null) {
job.setResourceInfo(ConnectContext.get().toResourceCtx());
job.setExecMemLimit(ConnectContext.get().getSessionVariable().getMaxExecMemByte());
}

// job properties
Expand Down Expand Up @@ -459,6 +461,14 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType,
throw new DdlException("Value of delete flag is invalid");
}
}

if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
try {
job.setExecMemLimit(Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)));
} catch (NumberFormatException e) {
throw new DdlException("Execute memory limit is not Long", e);
}
}
}

// job table load info
Expand Down
16 changes: 15 additions & 1 deletion fe/src/com/baidu/palo/load/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public enum EtlJobType {

private static final int DEFAULT_TIMEOUT_S = 0;
private static final double DEFAULT_MAX_FILTER_RATIO = 0;

private static final long DEFAULT_EXEC_MEM_LIMIT = 2147483648L; // 2GB

private long id;
private long dbId;
private String label;
Expand Down Expand Up @@ -103,6 +104,8 @@ public enum EtlJobType {

private TPriority priority;

private long execMemLimit;

public LoadJob() {
this("");
}
Expand Down Expand Up @@ -137,6 +140,7 @@ public LoadJob(String label, int timeoutSecond, double maxFilterRatio) {
this.replicaPersistInfos = Maps.newHashMap();
this.resourceInfo = null;
this.priority = TPriority.NORMAL;
this.execMemLimit = DEFAULT_EXEC_MEM_LIMIT;
}

public long getId() {
Expand Down Expand Up @@ -275,6 +279,10 @@ public PullLoadSourceInfo getPullLoadSourceInfo() {
return pullLoadSourceInfo;
}

// Execution memory limit (in bytes) the backend may use for this load job;
// defaults to DEFAULT_EXEC_MEM_LIMIT and may be overridden via the
// exec_mem_limit load property.
public void setExecMemLimit(long execMemLimit) {
    this.execMemLimit = execMemLimit;
}

public long getExecMemLimit() {
    return execMemLimit;
}

public void setEtlJobType(EtlJobType etlJobType) {
this.etlJobType = etlJobType;
switch (etlJobType) {
Expand Down Expand Up @@ -605,6 +613,8 @@ public void write(DataOutput out) throws IOException {
out.writeBoolean(true);
pullLoadSourceInfo.write(out);
}

out.writeLong(execMemLimit);
}

public void readFields(DataInput in) throws IOException {
Expand Down Expand Up @@ -713,6 +723,10 @@ public void readFields(DataInput in) throws IOException {
this.pullLoadSourceInfo = PullLoadSourceInfo.read(in);
}
}

if (version >= FeMetaVersion.VERSION_34) {
this.execMemLimit = in.readLong();
}
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions fe/src/com/baidu/palo/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ public String getTrackingUrl() {
return trackingUrl;
}

// Stores the execution memory limit in this coordinator's query options.
// Unit is bytes (the load-job default is documented as 2GB = 2147483648L).
public void setExecMemoryLimit(long execMemoryLimit) {
this.queryOptions.setMem_limit(execMemoryLimit);
}

// Stores the query timeout in this coordinator's query options.
// NOTE(review): unit is presumably seconds — confirm against TQueryOptions.
public void setTimeout(int timeout) {
this.queryOptions.setQuery_timeout(timeout);
}

// Initiate
private void prepare() {
for (PlanFragment fragment : fragments) {
Expand Down
36 changes: 20 additions & 16 deletions fe/src/com/baidu/palo/task/LoadEtlTask.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.baidu.palo.task;

import com.baidu.palo.catalog.Catalog;
Expand Down Expand Up @@ -64,6 +64,10 @@ public LoadEtlTask(LoadJob job) {
this.load = Catalog.getInstance().getLoadInstance();
}

// Message used when the ETL job reaches the CANCELLED state (thrown as a
// LoadException in updateEtlStatus). Subclasses may override to provide a
// more specific message, e.g. PullLoadEtlTask reports the broker task error.
protected String getErrorMsg() {
return "etl job fail";
}

@Override
protected void exec() {
// check job state
Expand Down Expand Up @@ -117,7 +121,7 @@ private void updateEtlStatus() throws LoadException {
processEtlFinished();
break;
case CANCELLED:
throw new LoadException("etl job fail");
throw new LoadException(getErrorMsg());
case RUNNING:
processEtlRunning();
break;
Expand Down
15 changes: 15 additions & 0 deletions fe/src/com/baidu/palo/task/PullLoadEtlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ public PullLoadEtlTask(LoadJob job) {
mgr = Catalog.getInstance().getPullLoadJobMgr();
}

/**
 * Builds the error message for a failed broker (pull) load.
 * Prefers the execute status reported by the failing PullLoadTask; falls back
 * to the generic message from LoadEtlTask when no task-level status exists.
 */
@Override
protected String getErrorMsg() {
    PullLoadJob pullLoadJob = mgr.getJob(job.getId());
    if (pullLoadJob != null) {
        PullLoadTask failureTask = pullLoadJob.getFailureTask();
        if (failureTask != null) {
            Status status = failureTask.getExecuteStatus();
            if (status != null) {
                return "Broker etl failed: " + status.getErrorMsg();
            }
        }
    }
    return super.getErrorMsg();
}

@Override
protected boolean updateJobEtlStatus() {
PullLoadJob pullLoadJob = mgr.getJob(job.getId());
Expand Down
4 changes: 4 additions & 0 deletions fe/src/com/baidu/palo/task/PullLoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public synchronized void cancel() {
}
}

// Returns the recorded failure task, or null if none has been recorded.
// NOTE(review): unlike cancel()/onTaskFinished(), this accessor is not
// synchronized — confirm callers tolerate a possibly stale read of the field.
public PullLoadTask getFailureTask() {
return failureTask;
}

public synchronized void onTaskFinished(PullLoadTask task) {
int taskId = task.taskId;
if (!state.isRunning()) {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/task/PullLoadPendingTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected void createEtlRequest() throws Exception {
// Generate pull load task, one
PullLoadTask task = new PullLoadTask(
job.getId(), nextTaskId, db, table,
job.getBrokerDesc(), entry.getValue(), jobDeadlineMs);
job.getBrokerDesc(), entry.getValue(), jobDeadlineMs, job.getExecMemLimit());
task.init();
pullLoadTaskList.add(task);
nextTaskId++;
Expand Down
8 changes: 6 additions & 2 deletions fe/src/com/baidu/palo/task/PullLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.baidu.palo.common.InternalException;
import com.baidu.palo.common.Status;
import com.baidu.palo.load.BrokerFileGroup;
import com.baidu.palo.load.LoadJob;
import com.baidu.palo.qe.Coordinator;
import com.baidu.palo.qe.QeProcessor;
import com.baidu.palo.thrift.TQueryType;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class PullLoadTask {
private Map<String, Long> fileMap;
private String trackingUrl;
private Map<String, String> counters;
private final long execMemLimit;

// Runtime variables
private enum State {
Expand All @@ -74,14 +76,15 @@ public PullLoadTask(
long jobId, int taskId,
Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs) {
long jobDeadlineMs, long execMemLimit) {
this.jobId = jobId;
this.taskId = taskId;
this.db = db;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
this.jobDeadlineMs = jobDeadlineMs;
this.execMemLimit = execMemLimit;
}

public void init() throws InternalException {
Expand Down Expand Up @@ -117,7 +120,7 @@ public synchronized boolean isFinished() {
}

public Status getExecuteStatus() {
return null;
return executeStatus;
}

public synchronized void onCancelled() {
Expand Down Expand Up @@ -201,6 +204,7 @@ public void executeOnce() throws InternalException {
curCoordinator = new Coordinator(executeId, planner.getDescTable(),
planner.getFragments(), planner.getScanNodes());
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
}

boolean needUnregister = false;
Expand Down

0 comments on commit 57a11ef

Please sign in to comment.