Skip to content

Commit

Permalink
[fix](schema-change) Fix potential data race when a schema change job…
Browse files Browse the repository at this point in the history
…s is set to cancelled but the table state is still SCHEMA_CHANGE (apache#39164)

Set the job state to CANCELLED only after the table state has been changed back to NORMAL.
  • Loading branch information
TangSiyang2001 committed Aug 14, 2024
1 parent 54772dc commit b402384
Showing 1 changed file with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ private void pruneMeta() {
/**
* runPendingJob():
* 1. Create all replicas of all shadow indexes and wait them finished.
* 2. After creating done, add the shadow indexes to catalog, user can not see this
* shadow index, but internal load process will generate data for these indexes.
* 2. After creation is done, add the shadow indexes to the catalog. Users cannot
* see these shadow indexes, but the internal load process will generate data
* for them.
* 3. Get a new transaction id, then set job's state to WAITING_TXN
*/
@Override
Expand Down Expand Up @@ -482,15 +483,17 @@ protected void runWaitingTxnJob() throws AlterCancelException {

this.jobState = JobState.RUNNING;

// DO NOT write edit log here, tasks will be send again if FE restart or master changed.
// DO NOT write edit log here, tasks will be sent again if FE restarts or the
// master changes.
LOG.info("transfer schema change job {} state to {}", jobId, this.jobState);
}

/**
* runRunningJob()
* 1. Wait all schema change tasks to be finished.
* 2. Check the integrity of the newly created shadow indexes.
* 3. Replace the origin index with shadow index, and set shadow index's state as NORMAL to be visible to user.
* 3. Replace the origin index with shadow index, and set shadow index's state
* as NORMAL to be visible to user.
* 4. Set job's state as FINISHED.
*/
@Override
Expand Down Expand Up @@ -584,7 +587,7 @@ protected void runRunningJob() throws AlterCancelException {
} // end for tablets
}
} // end for partitions
// all partitions are good
// all partitions are good
onFinished(tbl);
} finally {
tbl.writeUnlock();
Expand All @@ -609,8 +612,10 @@ private void onFinished(OlapTable tbl) {
long shadowIdxId = entry.getKey();
long originIdxId = entry.getValue();
// get index from catalog, not from 'partitionIdToRollupIndex'.
// because if this alter job is recovered from edit log, index in 'partitionIndexMap'
// is not the same object in catalog. So modification on that index can not reflect to the index
// because if this alter job is recovered from edit log, the index in
// 'partitionIndexMap' is not the same object as the one in the catalog, so a
// modification on that index would not be reflected in the index
// in catalog.
MaterializedIndex shadowIdx = partition.getIndex(shadowIdxId);
Preconditions.checkNotNull(shadowIdx, shadowIdxId);
Expand Down Expand Up @@ -657,7 +662,7 @@ private void onFinished(OlapTable tbl) {
tbl.getIndexMetaByIndexId(shadowIdxId).setMaxColUniqueId(maxColUniqueId);
if (LOG.isDebugEnabled()) {
LOG.debug("originIdxId:{}, shadowIdxId:{}, maxColUniqueId:{}, indexSchema:{}",
originIdxId, shadowIdxId, maxColUniqueId, indexSchemaMap.get(shadowIdxId));
originIdxId, shadowIdxId, maxColUniqueId, indexSchemaMap.get(shadowIdxId));
}

tbl.deleteIndexInfo(originIdxName);
Expand Down Expand Up @@ -704,12 +709,12 @@ protected synchronized boolean cancelImpl(String errMsg) {
pruneMeta();
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
Env.getCurrentEnv().getEditLog().logAlterJob(this);

changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when cancel, table id: {}, job id: {}", tableId, jobId);

jobState = JobState.CANCELLED;
Env.getCurrentEnv().getEditLog().logAlterJob(this);
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
postProcessShadowIndex();
return true;
}

Expand Down Expand Up @@ -745,11 +750,10 @@ private void cancelInternal() {
}
}
}

jobState = JobState.CANCELLED;
}

// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
// Check whether transactions of the given database which txnId is less than
// 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() throws AnalysisException {
return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
watershedTxnId, dbId, Lists.newArrayList(tableId));
Expand Down Expand Up @@ -808,7 +812,8 @@ private void replayPendingJob(SchemaChangeJobV2 replayedJob) throws MetaNotFound
olapTable.writeUnlock();
}

// should still be in WAITING_TXN state, so that the alter tasks will be resend again
// should still be in WAITING_TXN state, so that the alter tasks will be
// resent
this.jobState = JobState.WAITING_TXN;
this.watershedTxnId = replayedJob.watershedTxnId;
LOG.info("replay waiting txn schema change job: {} table id: {}", jobId, tableId);
Expand Down

0 comments on commit b402384

Please sign in to comment.