chore: improve spark parallel #450
Conversation
LOG.info("\n Start to load data using spark bulkload \n");
// gen-hfile
HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
                                                       loadDistributeMetrics);
The loadDistributeMetrics here looks strange to me: this code runs inside an operator, so my understanding is that the operator only gets a copy of this object. How does Spark get the value back into the driver?
LoadDistributeMetrics uses Spark accumulators internally, which aggregate the executors' values back to the driver:
https://github.com/apache/incubator-hugegraph-toolchain/blob/master/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadDistributeMetrics.java#L54
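The accumulator pattern described above can be sketched without Spark (a minimal analogy, assuming a `LongAdder` standing in for Spark's `AccumulatorV2`): parallel workers play the role of executors and add locally, while the caller, like the driver, reads only the merged total.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Simplified analogy for how LoadDistributeMetrics aggregates values:
// each parallel worker ("executor") increments locally, and the caller
// ("driver") reads the merged sum afterwards.
public class AccumulatorSketch {
    public static long countRows(List<List<String>> partitions) {
        LongAdder loaded = new LongAdder();           // plays the role of the accumulator
        partitions.parallelStream()                   // workers stand in for executors
                  .forEach(p -> p.forEach(row -> loaded.increment()));
        return loaded.sum();                          // the "driver" reads the merged value
    }

    public static void main(String[] args) {
        long total = countRows(List.of(
                List.of("a", "b"), List.of("c"), List.of("d", "e", "f")));
        System.out.println("rows loaded: " + total);  // prints 6
    }
}
```

In real Spark the driver registers the accumulator, serialized copies travel with the tasks, and the executors' partial sums are merged back on task completion; the `LongAdder` above only mirrors that add-locally/merge-centrally contract.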
Codecov Report
@@ Coverage Diff @@
## master #450 +/- ##
============================================
- Coverage 62.57% 62.52% -0.05%
+ Complexity 1867 894 -973
============================================
Files 260 91 -169
Lines 9418 4395 -5023
Branches 872 516 -356
============================================
- Hits 5893 2748 -3145
+ Misses 3143 1444 -1699
+ Partials 382 203 -179
... and 169 files with indirect coverage changes
LoadContext context = initPartition(this.loadOptions, struct);
p.forEachRemaining((Row row) -> {
    loadRow(struct, row, p, context);
Future<?> future = Executors.newCachedThreadPool().submit(() -> {
As I understand it, the concurrency here should no longer have thread-safety issues.
The cached thread pool here is used, as I understand it, to load multiple files in parallel: each file is submitted as its own job and generates its own DAG, and Spark then schedules the concrete tasks, so I did not bound the pool size here.
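The one-task-per-file pattern described above can be sketched as follows (a minimal sketch, assuming a hypothetical loadStruct method standing in for the real per-file bulk-load logic): a cached pool grows to match the number of files, so all jobs are submitted concurrently and the downstream scheduler, not the pool size, bounds the actual work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of submitting one load job per input file on a cached thread pool.
// loadStruct is a hypothetical placeholder for the real bulk-load of one file.
public class ParallelLoadSketch {
    static String loadStruct(String file) {
        return "loaded:" + file;                      // placeholder for per-file load work
    }

    public static List<String> loadAll(List<String> files) {
        ExecutorService pool = Executors.newCachedThreadPool();
        List<Future<String>> futures = new ArrayList<>();
        for (String f : files) {
            futures.add(pool.submit(() -> loadStruct(f)));  // one job (DAG) per file
        }
        List<String> results = new ArrayList<>();
        for (Future<String> fut : futures) {
            try {
                results.add(fut.get());               // block until every file finishes
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) {
        System.out.println(loadAll(List.of("a.csv", "b.csv")));
    }
}
```

One caveat with the pattern as written in the diff: calling `Executors.newCachedThreadPool()` inside the submit site creates a fresh pool per call, so a single shared pool (as above) is usually preferable even when the pool size is intentionally unbounded.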
No description provided.