Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Oct 22, 2024
1 parent 59fec6a commit a7015a8
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTrac
0,
GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
this.fetchStats = PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs)
&& PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO");
this.fetchStats =
PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs)
&& PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO");
if (this.fetchStats) {
logger.info("start to schedule statistics fetch task");
this.scheduler.scheduleAtFixedRate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,11 @@ public void execute(
jobName,
summary.getLogicalPlan(),
summary.getPhysicalPlan());
client.submit(request, listener, timeoutConfig, statusCallback.getQueryLogger());
client.submit(
request,
listener,
timeoutConfig,
statusCallback.getQueryLogger());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public KafkaProcessor(
replayQueue = new ArrayBlockingQueue<>(queueSize);
latestSnapshotId = new AtomicLong(-1);
replayInProgress = new AtomicBoolean(false);

}

public void start() {
Expand Down Expand Up @@ -278,10 +277,10 @@ public void replayWAL() throws IOException {

int replayCount = 0;
try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) {
// replayInProgress.set(true);
// replayInProgress.set(true);
ConsumerRecord<LogEntry, LogEntry> record;
while ((record = logReader.readNextRecord()) != null) {
// writeQueue.put(new ReadLogEntry(record.offset(), record.value()));
// writeQueue.put(new ReadLogEntry(record.offset(), record.value()));
writeQueue.put(record);
replayCount++;
if (replayCount % 10000 == 0) {
Expand All @@ -292,9 +291,9 @@ public void replayWAL() throws IOException {
throw new RuntimeException(e);
}

// } finally {
// replayInProgress.set(false);
// }
// } finally {
// replayInProgress.set(false);
// }
logger.info("replayWAL finished. total replayed [{}] records", replayCount);
}

Expand All @@ -312,14 +311,16 @@ private void processRecords() {
offset = readLogEntry.getOffset();
logEntry = readLogEntry.getLogEntry();
logEntry.setSnapshotId(latestSnapshotId.get());
// logger.info("polled from replay queue, offset {}, id {}", offset, logEntry.getSnapshotId());
// logger.info("polled from replay queue, offset {}, id {}",
// offset, logEntry.getSnapshotId());

} else {
ConsumerRecord<LogEntry, LogEntry> record = writeQueue.take();
offset = record.offset();
logEntry = record.value();
latestSnapshotId.set(logEntry.getSnapshotId());
// logger.info("polled from write queue, offset {}, id {}", offset, latestSnapshotId.get());
// logger.info("polled from write queue, offset {}, id {}",
// offset, latestSnapshotId.get());

}
processRecord(offset, logEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void processBatches() {
continue;
}
long batchSI = batch.getSnapshotId();
// logger.debug("polled one batch [" + batchSI + "]");
// logger.debug("polled one batch [" + batchSI + "]");
boolean hasDdl = writeEngineWithRetry(batch);
if (this.consumeSI < batchSI) {
SnapshotInfo availSInfo = this.availSnapshotInfoRef.get();
Expand All @@ -173,8 +173,8 @@ private void processBatches() {
this.commitExecutor.execute(this::asyncCommit);
}
// else { // a flurry of batches with same snapshot ID
// logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
//}
// logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
// }
if (hasDdl) {
this.consumeDdlSnapshotId = batchSI;
}
Expand Down Expand Up @@ -205,13 +205,13 @@ private void asyncCommit() {
}

private boolean writeEngineWithRetry(StoreDataBatch storeDataBatch) {
// while (!shouldStop) {
try {
return this.storeService.batchWrite(storeDataBatch);
} catch (Exception e) {
logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e);
}
// }
// while (!shouldStop) {
try {
return this.storeService.batchWrite(storeDataBatch);
} catch (Exception e) {
logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e);
}
// }
return false;
}

Expand Down

0 comments on commit a7015a8

Please sign in to comment.