diff --git a/src/processor/operator/persistent/reader/rdf/rdf_scan.cpp b/src/processor/operator/persistent/reader/rdf/rdf_scan.cpp index 1ceeffde2b..2f50b7edac 100644 --- a/src/processor/operator/persistent/reader/rdf/rdf_scan.cpp +++ b/src/processor/operator/persistent/reader/rdf/rdf_scan.cpp @@ -203,6 +203,39 @@ static std::unique_ptr RdfAllTripleScanInitSharedState( bindData->config.copy(), std::move(rdfConfig), bindData->store); } +static double RdfResourceInMemScanProgressFunc(TableFuncSharedState* sharedState) { + auto rdfSharedState = + ku_dynamic_cast(sharedState); + uint64_t rtSize = + ku_dynamic_cast(rdfSharedState->store.get())->rtStore.size(); + if (rtSize == 0) { + return 0.0; + } + return static_cast(rdfSharedState->rtCursor) / rtSize; +} + +static double RdfLiteralInMemScanProgressFunc(TableFuncSharedState* sharedState) { + auto rdfSharedState = + ku_dynamic_cast(sharedState); + uint64_t ltSize = + ku_dynamic_cast(rdfSharedState->store.get())->ltStore.size(); + if (ltSize == 0) { + return 0.0; + } + return static_cast(rdfSharedState->ltCursor) / ltSize; +} + +static double RdfResourceTripleInMemScanProgressFunc(TableFuncSharedState* sharedState) { + auto rdfSharedState = + ku_dynamic_cast(sharedState); + TripleStore* store = ku_dynamic_cast(rdfSharedState->store.get()); + uint64_t size = store->rtStore.size() + store->ltStore.size(); + if (size == 0) { + return 0.0; + } + return static_cast(rdfSharedState->rtCursor + rdfSharedState->ltCursor) / size; +} + function_set RdfResourceScan::getFunctionSet() { function_set functionSet; auto func = std::make_unique(name, scanTableFunc, nullptr, @@ -247,7 +280,8 @@ function::function_set RdfAllTripleScan::getFunctionSet() { function_set RdfResourceInMemScan::getFunctionSet() { function_set functionSet; auto func = std::make_unique(name, RdfResourceInMemScanTableFunc, nullptr, - inMemScanInitSharedState, initLocalState, std::vector{}); + inMemScanInitSharedState, initLocalState, RdfResourceInMemScanProgressFunc, + std::vector{}); functionSet.push_back(std::move(func)); return functionSet; } @@ -255,7 +289,8 @@ function_set RdfResourceInMemScan::getFunctionSet() { function_set RdfLiteralInMemScan::getFunctionSet() { function_set functionSet; auto func = std::make_unique(name, RdfLiteralInMemScanTableFunc, nullptr, - inMemScanInitSharedState, initLocalState, std::vector{}); + inMemScanInitSharedState, initLocalState, RdfLiteralInMemScanProgressFunc, + std::vector{}); functionSet.push_back(std::move(func)); return functionSet; } @@ -263,7 +298,8 @@ function_set RdfLiteralInMemScan::getFunctionSet() { function_set RdfResourceTripleInMemScan::getFunctionSet() { function_set functionSet; auto func = std::make_unique(name, RdfResourceTripleInMemScanTableFunc, nullptr, - inMemScanInitSharedState, initLocalState, std::vector{}); + inMemScanInitSharedState, initLocalState, RdfResourceTripleInMemScanProgressFunc, + std::vector{}); functionSet.push_back(std::move(func)); return functionSet; } @@ -271,7 +307,8 @@ function_set RdfResourceTripleInMemScan::getFunctionSet() { function_set RdfLiteralTripleInMemScan::getFunctionSet() { function_set functionSet; auto func = std::make_unique(name, RdfLiteralTripleInMemScanTableFunc, nullptr, - inMemScanInitSharedState, initLocalState, std::vector{}); + inMemScanInitSharedState, initLocalState, RdfLiteralInMemScanProgressFunc, + std::vector{}); functionSet.push_back(std::move(func)); return functionSet; }