Skip to content

Commit

Permalink
[Feature] cancel load support state (apache#9537)
Browse files Browse the repository at this point in the history
  • Loading branch information
stalary authored and minghong.zhou committed May 23, 2022
1 parent fdf73a6 commit 943d529
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 427 deletions.
165 changes: 91 additions & 74 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,105 @@
import org.apache.doris.common.UserException;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import lombok.Getter;

// CANCEL LOAD statement used to cancel load job.
//
// syntax:
// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
import java.util.List;


/**
* CANCEL LOAD statement used to cancel load job.
* syntax:
* CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
**/
public class CancelLoadStmt extends DdlStmt {

private static final List<String> SUPPORT_COLUMNS = Lists.newArrayList("label", "state");

@Getter
private String dbName;

@Getter
private CompoundPredicate.Operator operator;

@Getter
private String label;

@Getter
private String state;

private Expr whereClause;
private boolean isAccurateMatch;

public String getDbName() {
return dbName;
public CancelLoadStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
}

public String getLabel() {
return label;
private void checkColumn(Expr expr, boolean like) throws AnalysisException {
String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
if (!SUPPORT_COLUMNS.contains(inputCol)) {
throw new AnalysisException("Current not support " + inputCol);
}
if (!(expr.getChild(1) instanceof StringLiteral)) {
throw new AnalysisException("Value must is string");
}

String inputValue = expr.getChild(1).getStringValue();
if (Strings.isNullOrEmpty(inputValue)) {
throw new AnalysisException("Value can't is null");
}
if (like && !inputValue.contains("%")) {
inputValue = "%" + inputValue + "%";
}
if (inputCol.equalsIgnoreCase("label")) {
label = inputValue;
}
if (inputCol.equalsIgnoreCase("state")) {
if (like) {
throw new AnalysisException("Only label can use like");
}
state = inputValue;
}
}

public CancelLoadStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
this.isAccurateMatch = false;
private void likeCheck(Expr expr) throws AnalysisException {
if (expr instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) expr;
boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp());
if (!like) {
throw new AnalysisException("Not support REGEXP");
}
checkColumn(expr, true);
}
}

public boolean isAccurateMatch() {
return isAccurateMatch;
private void binaryCheck(Expr expr) throws AnalysisException {
if (expr instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
if (!Operator.EQ.equals(binaryPredicate.getOp())) {
throw new AnalysisException("Only support equal or like");
}
checkColumn(expr, false);
}
}

private void compoundCheck(Expr expr) throws AnalysisException {
if (expr == null) {
throw new AnalysisException("Where clause can't is null");
}
if (expr instanceof CompoundPredicate) {
// current only support label and state
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
for (int i = 0; i < 2; i++) {
Expr child = compoundPredicate.getChild(i);
if (child instanceof CompoundPredicate) {
throw new AnalysisException("Current only support label and state");
}
likeCheck(child);
binaryCheck(child);
}
operator = compoundPredicate.getOp();
}
}

@Override
Expand All @@ -67,75 +137,22 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
}

// check auth after we get real load job

// analyze expr if not null
boolean valid = true;
do {
if (whereClause == null) {
valid = false;
break;
}

if (whereClause instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause;
isAccurateMatch = true;
if (binaryPredicate.getOp() != Operator.EQ) {
valid = false;
break;
}
} else if (whereClause instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) whereClause;
if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
valid = false;
break;
}
} else {
valid = false;
break;
}

// left child
if (!(whereClause.getChild(0) instanceof SlotRef)) {
valid = false;
break;
}
if (!((SlotRef) whereClause.getChild(0)).getColumnName().equalsIgnoreCase("label")) {
valid = false;
break;
}

// right child
if (!(whereClause.getChild(1) instanceof StringLiteral)) {
valid = false;
break;
}

label = ((StringLiteral) whereClause.getChild(1)).getStringValue();
if (Strings.isNullOrEmpty(label)) {
valid = false;
break;
}
if (!isAccurateMatch && !label.contains("%")) {
label = "%" + label + "%";
}
} while (false);

if (!valid) {
throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," +
" or LABEL LIKE \"matcher\"");
}
// analyze expr
likeCheck(whereClause);
binaryCheck(whereClause);
compoundCheck(whereClause);
}

@Override
public String toSql() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("CANCEL LOAD ");
if (!Strings.isNullOrEmpty(dbName)) {
stringBuilder.append("FROM " + dbName);
stringBuilder.append("FROM ").append(dbName);
}

if (whereClause != null) {
stringBuilder.append(" WHERE " + whereClause.toSql());
stringBuilder.append(" WHERE ").append(whereClause.toSql());
}
return stringBuilder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.doris.common;

/**
* CaseSensibility Enum.
**/
public enum CaseSensibility {
CLUSTER(true),
DATABASE(true),
Expand Down
178 changes: 0 additions & 178 deletions fe/fe-core/src/main/java/org/apache/doris/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -1728,184 +1728,6 @@ private boolean checkMultiLabelUsed(long dbId, String label, long timestamp) thr
return false;
}

public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException, AnalysisException {
// get load job and check state
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
return false;
}
List<LoadJob> loadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(labelValue)) {
loadJobs.addAll(labelToLoadJobs.get(labelValue));
}
} else {
PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
loadJobs.addAll(entry.getValue());
}
}
}
if (loadJobs.isEmpty()) {
return false;
}
if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) {
return false;
}
return true;
} finally {
readUnlock();
}
}

public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
// get params
String dbName = stmt.getDbName();
String label = stmt.getLabel();

// get load job and check state
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<LoadJob> loadJobs = Lists.newArrayList();
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}

// get jobs by label
List<LoadJob> matchLoadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(label)) {
matchLoadJobs.addAll(labelToLoadJobs.get(label));
}
} else {
PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
loadJobs.addAll(entry.getValue());
}
}
}

if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}

// check state here
List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(job -> {
JobState state = job.getState();
return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED;
}).collect(Collectors.toList());
if (uncompletedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job which label " +
(isAccurateMatch ? "is " : "like ") + stmt.getLabel());
}
loadJobs.addAll(uncompletedLoadJob);
} finally {
readUnlock();
}

// check auth here, cause we need table info
Set<String> tableNames = Sets.newHashSet();
for (LoadJob loadJob : loadJobs) {
tableNames.addAll(loadJob.getTableNames());
}

if (tableNames.isEmpty()) {
if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
}
} else {
for (String tblName : tableNames) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
}
}
}

// cancel job
for (LoadJob loadJob : loadJobs) {
List<String> failedMsg = Lists.newArrayList();
boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg);
if (!ok) {
throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
"label=[" + loadJob.getLabel() + "] failed msg=" +
(failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
}
}

return true;
}

public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
// get params
String dbName = stmt.getDbName();
String label = stmt.getLabel();

// get load job and check state
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
LoadJob job;
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}

List<LoadJob> loadJobs = labelToLoadJobs.get(label);
if (loadJobs == null) {
throw new DdlException("Load job does not exist");
}
// only the last one should be running
job = loadJobs.get(loadJobs.size() - 1);
JobState state = job.getState();
if (state == JobState.CANCELLED) {
throw new DdlException("Load job has been cancelled");
} else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) {
throw new DdlException("Load job has been finished");
}
} finally {
readUnlock();
}

// check auth here, cause we need table info
Set<String> tableNames = job.getTableNames();
if (tableNames.isEmpty()) {
// forward compatibility
if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
}
} else {
for (String tblName : tableNames) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
}
}
}

// cancel job
List<String> failedMsg = Lists.newArrayList();
if (!cancelLoadJob(job, CancelType.USER_CANCEL, "user cancel", failedMsg)) {
throw new DdlException("Cancel load job fail: " + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
}

return true;
}

public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) {
return cancelLoadJob(job, cancelType, msg, null);
}
Expand Down
Loading

0 comments on commit 943d529

Please sign in to comment.