Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
wsjz committed Dec 11, 2023
1 parent bd24fe1 commit f6a521f
Show file tree
Hide file tree
Showing 23 changed files with 610 additions and 738 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ statement
(withRemoteStorageSystem)?
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #load
| LOAD LABEL lableName=identifier
LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
resourceDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #resourceLoad
| LOAD mysqlDataDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #mysqlLoad
Expand Down Expand Up @@ -125,7 +120,7 @@ dataDesc
(PARTITION partition=identifierList)?
(COLUMNS TERMINATED BY comma=STRING_LITERAL)?
(LINES TERMINATED BY separator=STRING_LITERAL)?
(FORMAT AS format=identifier)?
(FORMAT AS format=identifierOrStringLiteral)?
(columns=identifierList)?
(columnsFromPath=colFromPath)?
(columnMapping=colMappingList)?
Expand Down Expand Up @@ -165,6 +160,11 @@ refreshMethod
: COMPLETE
;

identifierOrStringLiteral
: identifier
| STRING_LITERAL
;

identifierOrText
: errorCapturingIdentifier
| STRING_LITERAL
Expand Down Expand Up @@ -222,7 +222,8 @@ mappingExpr
;

withRemoteStorageSystem
: WITH S3 LEFT_PAREN
: resourceDesc
| WITH S3 LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN
| WITH HDFS LEFT_PAREN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,8 @@ public void analyze(Analyzer analyzer) throws UserException {
analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate jobInstance
InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
job.setCreateTimeMs(System.currentTimeMillis());
TimerDefinition timerDefinition = new TimerDefinition();

if (null != onceJobStartTimestamp) {
Expand All @@ -135,18 +133,17 @@ public void analyze(Analyzer analyzer) throws UserException {
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
}
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setJobConfig(jobExecutionConfiguration);

job.setComment(comment);
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
job.setJobStatus(JobStatus.RUNNING);
job.setJobId(Env.getCurrentEnv().getNextId());
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
job.setExecuteSql(executeSql);

InsertJob job = new InsertJob(Env.getCurrentEnv().getNextId(),
labelName.getLabelName(),
JobStatus.RUNNING,
labelName.getDbName(),
comment,
ConnectContext.get().getCurrentUserIdentity(),
jobExecutionConfiguration,
System.currentTimeMillis(),
executeSql);
//job.checkJobParams();
jobInstance = job;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,41 @@ public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Wri
@SerializedName(value = "sql")
String executeSql;

public AbstractJob() {}

public AbstractJob(Long id) {
setJobId(id);
}

public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, null, null, null);
}

public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
List<T> runningTasks) {
this.jobId = jobId;
this.jobName = jobName;
this.jobStatus = jobStatus;
this.currentDbName = currentDbName;
this.comment = comment;
this.createUser = createUser;
this.jobConfig = jobConfig;
this.createTimeMs = createTimeMs;
this.executeSql = executeSql;
this.runningTasks = runningTasks;
}

private List<T> runningTasks = new ArrayList<>();

@Override
Expand Down Expand Up @@ -109,7 +144,7 @@ public void cancelTaskById(long taskId) throws JobException {
.orElseThrow(() -> new JobException("no task id:" + taskId)).cancel();
}

public void initTasks(List<? extends AbstractTask> tasks) {
public void initTasks(Collection<? extends T> tasks) {
tasks.forEach(task -> {
task.setJobId(jobId);
task.setTaskId(getNextId());
Expand All @@ -119,7 +154,7 @@ public void initTasks(List<? extends AbstractTask> tasks) {
if (CollectionUtils.isEmpty(getRunningTasks())) {
setRunningTasks(new ArrayList<>());
}
getRunningTasks().addAll((Collection<? extends T>) tasks);
getRunningTasks().addAll(tasks);
}

public void checkJobParams() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum JobExecuteType {
*/
MANUAL,
/**
* The job will be executed immediately.
* The job will be executed only once and immediately.
*/
INSTANT,
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public enum JobStatus {
* The stop state cannot be resumed
*/
STOPPED,

/**
* When the task is finished, the finished state will be triggered.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@
public enum JobType {
INSERT,
MV,
LOAD
}
Loading

0 comments on commit f6a521f

Please sign in to comment.