Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] cancel load support state #9537

Merged
merged 2 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 91 additions & 74 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,105 @@
import org.apache.doris.common.UserException;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import lombok.Getter;

// CANCEL LOAD statement used to cancel load job.
//
// syntax:
// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
import java.util.List;


/**
* CANCEL LOAD statement used to cancel load job.
* syntax:
* CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
**/
public class CancelLoadStmt extends DdlStmt {

private static final List<String> SUPPORT_COLUMNS = Lists.newArrayList("label", "state");

@Getter
private String dbName;

@Getter
private CompoundPredicate.Operator operator;

@Getter
private String label;

@Getter
private String state;

private Expr whereClause;
private boolean isAccurateMatch;

public String getDbName() {
return dbName;
public CancelLoadStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
}

public String getLabel() {
return label;
private void checkColumn(Expr expr, boolean like) throws AnalysisException {
String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
if (!SUPPORT_COLUMNS.contains(inputCol)) {
throw new AnalysisException("Current not support " + inputCol);
}
if (!(expr.getChild(1) instanceof StringLiteral)) {
throw new AnalysisException("Value must is string");
}

String inputValue = expr.getChild(1).getStringValue();
if (Strings.isNullOrEmpty(inputValue)) {
throw new AnalysisException("Value can't is null");
}
if (like && !inputValue.contains("%")) {
inputValue = "%" + inputValue + "%";
}
if (inputCol.equalsIgnoreCase("label")) {
label = inputValue;
}
if (inputCol.equalsIgnoreCase("state")) {
if (like) {
throw new AnalysisException("Only label can use like");
}
state = inputValue;
}
}

public CancelLoadStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
this.isAccurateMatch = false;
private void likeCheck(Expr expr) throws AnalysisException {
if (expr instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) expr;
boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp());
if (!like) {
throw new AnalysisException("Not support REGEXP");
}
checkColumn(expr, true);
}
}

public boolean isAccurateMatch() {
return isAccurateMatch;
private void binaryCheck(Expr expr) throws AnalysisException {
if (expr instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
if (!Operator.EQ.equals(binaryPredicate.getOp())) {
throw new AnalysisException("Only support equal or like");
}
checkColumn(expr, false);
}
}

private void compoundCheck(Expr expr) throws AnalysisException {
if (expr == null) {
throw new AnalysisException("Where clause can't is null");
}
if (expr instanceof CompoundPredicate) {
// current only support label and state
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
for (int i = 0; i < 2; i++) {
Expr child = compoundPredicate.getChild(i);
if (child instanceof CompoundPredicate) {
throw new AnalysisException("Current only support label and state");
}
likeCheck(child);
binaryCheck(child);
}
operator = compoundPredicate.getOp();
}
}

@Override
Expand All @@ -67,75 +137,22 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
}

// check auth after we get real load job

// analyze expr if not null
boolean valid = true;
do {
if (whereClause == null) {
valid = false;
break;
}

if (whereClause instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause;
isAccurateMatch = true;
if (binaryPredicate.getOp() != Operator.EQ) {
valid = false;
break;
}
} else if (whereClause instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) whereClause;
if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
valid = false;
break;
}
} else {
valid = false;
break;
}

// left child
if (!(whereClause.getChild(0) instanceof SlotRef)) {
valid = false;
break;
}
if (!((SlotRef) whereClause.getChild(0)).getColumnName().equalsIgnoreCase("label")) {
valid = false;
break;
}

// right child
if (!(whereClause.getChild(1) instanceof StringLiteral)) {
valid = false;
break;
}

label = ((StringLiteral) whereClause.getChild(1)).getStringValue();
if (Strings.isNullOrEmpty(label)) {
valid = false;
break;
}
if (!isAccurateMatch && !label.contains("%")) {
label = "%" + label + "%";
}
} while (false);

if (!valid) {
throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," +
" or LABEL LIKE \"matcher\"");
}
// analyze expr
likeCheck(whereClause);
binaryCheck(whereClause);
compoundCheck(whereClause);
}

@Override
public String toSql() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("CANCEL LOAD ");
if (!Strings.isNullOrEmpty(dbName)) {
stringBuilder.append("FROM " + dbName);
stringBuilder.append("FROM ").append(dbName);
}

if (whereClause != null) {
stringBuilder.append(" WHERE " + whereClause.toSql());
stringBuilder.append(" WHERE ").append(whereClause.toSql());
}
return stringBuilder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.doris.common;

/**
* CaseSensibility Enum.
**/
public enum CaseSensibility {
CLUSTER(true),
DATABASE(true),
Expand Down
178 changes: 0 additions & 178 deletions fe/fe-core/src/main/java/org/apache/doris/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -1616,184 +1616,6 @@ private boolean checkMultiLabelUsed(long dbId, String label, long timestamp) thr
return false;
}

public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException, AnalysisException {
// get load job and check state
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
return false;
}
List<LoadJob> loadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(labelValue)) {
loadJobs.addAll(labelToLoadJobs.get(labelValue));
}
} else {
PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
loadJobs.addAll(entry.getValue());
}
}
}
if (loadJobs.isEmpty()) {
return false;
}
if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) {
return false;
}
return true;
} finally {
readUnlock();
}
}

public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
// get params
String dbName = stmt.getDbName();
String label = stmt.getLabel();

// get load job and check state
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<LoadJob> loadJobs = Lists.newArrayList();
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}

// get jobs by label
List<LoadJob> matchLoadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(label)) {
matchLoadJobs.addAll(labelToLoadJobs.get(label));
}
} else {
PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
loadJobs.addAll(entry.getValue());
}
}
}

if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}

// check state here
List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(job -> {
JobState state = job.getState();
return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED;
}).collect(Collectors.toList());
if (uncompletedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job which label " +
(isAccurateMatch ? "is " : "like ") + stmt.getLabel());
}
loadJobs.addAll(uncompletedLoadJob);
} finally {
readUnlock();
}

// check auth here, cause we need table info
Set<String> tableNames = Sets.newHashSet();
for (LoadJob loadJob : loadJobs) {
tableNames.addAll(loadJob.getTableNames());
}

if (tableNames.isEmpty()) {
if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
}
} else {
for (String tblName : tableNames) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
}
}
}

// cancel job
for (LoadJob loadJob : loadJobs) {
List<String> failedMsg = Lists.newArrayList();
boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg);
if (!ok) {
throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
"label=[" + loadJob.getLabel() + "] failed msg=" +
(failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
}
}

return true;
}

public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
// get params
String dbName = stmt.getDbName();
String label = stmt.getLabel();

// get load job and check state
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
LoadJob job;
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}

List<LoadJob> loadJobs = labelToLoadJobs.get(label);
if (loadJobs == null) {
throw new DdlException("Load job does not exist");
}
// only the last one should be running
job = loadJobs.get(loadJobs.size() - 1);
JobState state = job.getState();
if (state == JobState.CANCELLED) {
throw new DdlException("Load job has been cancelled");
} else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) {
throw new DdlException("Load job has been finished");
}
} finally {
readUnlock();
}

// check auth here, cause we need table info
Set<String> tableNames = job.getTableNames();
if (tableNames.isEmpty()) {
// forward compatibility
if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
}
} else {
for (String tblName : tableNames) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
}
}
}

// cancel job
List<String> failedMsg = Lists.newArrayList();
if (!cancelLoadJob(job, CancelType.USER_CANCEL, "user cancel", failedMsg)) {
throw new DdlException("Cancel load job fail: " + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
}

return true;
}

public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) {
return cancelLoadJob(job, cancelType, msg, null);
}
Expand Down
Loading