[RFC] harde rabit framework #4250

Closed
chenqin opened this issue Mar 12, 2019 · 34 comments

Comments

@chenqin
Contributor

chenqin commented Mar 12, 2019

Over the last half year, I have been working on some of the largest XGBoost jobs.
We often hit odd issues while running thousands of workers over billions of rows of training data. So far I have submitted a small set of PRs: dmlc/dmlc-core#512 dmlc/rabit#81 dmlc/rabit#73

As distributed training moves into production, with hundreds of production models running on top of XGBoost, there is a need to harden the rabit framework with better code layout and documentation as well as improved functional test coverage. Here is the list of work I am planning to do; I would like feedback from the community.

  1. Increase test coverage by splitting up the _robust implementations, and add test coverage for and harden ring-based restore (I haven't been able to find where that code path is used). There should be a waterfall flow in which filesystem-based checkpoint and job restart are executed only when ring-based recovery can't restore.

  2. Switch distributed XGBoost checkpoint restore from external checkpoints (HDFS/S3) to peer restore. Redistributing the training dataset can take 30 - 40 minutes of pulling from HDFS, so restoring from a single worker failure (an increasing concern as the cluster gets larger) without restarting the entire training stage can save a significant amount of time. This is particularly interesting when workers run on preemptible instances: if a small percentage of tasks get rescheduled because their executor host was preempted, can the job restore quickly? AFAIK, none of the other MPI frameworks has built such a recovery mechanism for running on preemptible instances.

  3. More MPI features, primitive metrics and profiling capability. Investigating job failures can be burdensome; aggregated reporting of primitive metrics could improve the user experience a lot.

Thanks,
Chen

@hcho3 hcho3 changed the title RFC - harden/improve rabit framework [RFC] harden/improve rabit framework Mar 12, 2019
@hcho3
Collaborator

hcho3 commented Mar 12, 2019

@chenqin Hi, first of all, I'd like to say thank you. Your work in distributed training has been instrumental in bringing XGBoost to an industrial-grade standard.

I'd like to receive your suggestions about improving the CI infrastructure, i.e. how we should test distributed training. See the discussion at #4234.

For example, we should maybe test dmlc/rabit together in dmlc/xgboost, so that no future change of dmlc/rabit would break distributed XGBoost.

@thvasilo
Contributor

Thanks for all the work @chenqin! Regarding Rabit's future, I was also wondering if it would be possible to introduce a scatter-reduce algorithm like the one LightGBM uses.

The benefits when using 2^x workers seem to be significant.
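
To make the 2^x remark concrete, here is a toy, single-process simulation of a reduce-scatter using recursive halving, one common scheme that needs a power-of-two worker count. It is a sketch for illustration only and is not taken from LightGBM or rabit; the function name and data layout are assumptions.

```python
def reduce_scatter_recursive_halving(data):
    """Simulate a sum reduce-scatter across len(data) workers.

    data[r] is worker r's local vector. The worker count must be a power of
    two and must divide the vector length. Returns, for each worker r, the
    fully summed block r of the vector.
    """
    p = len(data)
    n = len(data[0])
    assert p > 0 and (p & (p - 1)) == 0, "worker count must be a power of two"
    assert n % p == 0, "vector length must be divisible by worker count"

    bufs = [list(v) for v in data]
    lo = [0] * p   # start of the segment each worker is still responsible for
    hi = [n] * p   # end (exclusive) of that segment
    dist = p // 2
    while dist >= 1:
        new_bufs = [list(b) for b in bufs]
        for r in range(p):
            partner = r ^ dist
            mid = (lo[r] + hi[r]) // 2
            # The side with the 'dist' bit cleared keeps the lower half,
            # the other side keeps the upper half.
            if (r & dist) == 0:
                keep_lo, keep_hi = lo[r], mid
            else:
                keep_lo, keep_hi = mid, hi[r]
            for i in range(keep_lo, keep_hi):
                new_bufs[r][i] = bufs[r][i] + bufs[partner][i]
            lo[r], hi[r] = keep_lo, keep_hi
        bufs = new_bufs
        dist //= 2
    return [bufs[r][lo[r]:hi[r]] for r in range(p)]


blocks = reduce_scatter_recursive_halving([[r + 1] * 8 for r in range(4)])
print(blocks)
```

Each worker ends up owning one fully reduced block after log2(workers) exchange rounds, and the segment it handles halves in every round.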

For job restores, what is the state that we need to be restoring? I'm not familiar with the term peer restore, does that mean communicating data and models from other nodes instead of a single node reading from HDFS?
My assumption is that if a worker fails and we launch a new one, the new one would need to first read its data partition into memory from a distributed FS, then read the current model version as well.
Am I missing something here?

Do you think there are ideas we can use from Flink/Spark job recovery?

@chenqin
Contributor Author

chenqin commented Mar 12, 2019

> I'd like to receive your suggestions about improving the CI infrastructure, i.e. how we should test distributed training. See the discussion at #4234.

Sounds good. Splitting build/test and adding OSX, Linux and Windows should be straightforward.

> For example, we should maybe test dmlc/rabit together in dmlc/xgboost, so that no future change of dmlc/rabit would break distributed XGBoost.

In terms of distributed XGBoost, we can start by including the XGBoost4j-Spark tests. A possible next step is adding tests that simulate rabit failures in XGBoost4j-Spark, using the mock engine, to verify item 2).

@chenqin
Contributor Author

chenqin commented Mar 12, 2019

> Thanks for all the work @chenqin! Regarding Rabit's future, I was also wondering if it would be possible to introduce a scatter-reduce algorithm like the one LightGBM uses.

Yes, that should be item 3), once we can verify that framework failure recovery works as expected. We can then add more functions with good failure-resilient recovery support.

> The benefits when using 2^x workers seem to be significant.

> For job restores, what is the state that we need to be restoring? I'm not familiar with the term peer restore, does that mean communicating data and models from other nodes instead of a single node reading from HDFS?
> My assumption is that if a worker fails and we launch a new one, the new one would need to first read its data partition into memory from a distributed FS, then read the current model version as well.
> Am I missing something here?

That's mostly correct for starting a job. Item 2) is mostly about the case of a single worker failure. How does distributed XGBoost deal with the entire worker fleet today? What we have is to drop all work since the last checkpoint and restart the job (load the entire dataset -> restore the external checkpoint -> continue training). What item 2) tries to solve is restoring only the worker(s) that were lost (load only the partial data assigned to the lost worker(s) -> restore the last checkpoint from other peers -> catch up on the next allreduce). The catch is that there will be situations where XGBoost loses too many workers in the (most likely) tree-reduce topology, in which case we basically need to fall back to what we have right now.

> Do you think there are ideas we can use from Flink/Spark job recovery?

AFAIK the paradigms underlying Flink/Spark are continuous processing and map-reduce. In map-reduce, we can retry mappers in case they are lost, and the sync (barrier) happens after a map stage completes. In continuous processing, the sync might never happen, hence Flink introduced control messages to coordinate workers with different roles.

In distributed XGBoost, global sync (ALLREDUCE/BROADCAST) happens frequently to update histograms on a fixed topology (the network topology is fixed after all workers have registered with the tracker): AllReduce -> Broadcast -> AllReduce ... Unless we can prove mathematically that the error ratio based on a partial histogram is upper bounded, rabit still has to block until each MPI function completes its operation.
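
To illustrate why the collective has to block, here is a minimal in-process stand-in for an allreduce over per-worker histograms. It is a toy sketch, not rabit's API: `allreduce_sum` and the histogram values are made up for the example.

```python
from typing import List


def allreduce_sum(local_histograms: List[List[float]]) -> List[List[float]]:
    """In-process stand-in for allreduce(sum): every worker contributes its
    local histogram and every worker receives the same element-wise total."""
    total = [sum(bins) for bins in zip(*local_histograms)]
    return [list(total) for _ in local_histograms]


# Three hypothetical workers, each holding a 4-bin gradient histogram.
workers = [
    [1.0, 2.0, 0.0, 1.0],
    [0.0, 1.0, 3.0, 1.0],
    [2.0, 0.0, 1.0, 1.0],
]

full = allreduce_sum(workers)[0]
partial = allreduce_sum(workers[:2])[0]  # worker 2 has not reported yet

print("full   :", full)
print("partial:", partial)
```

The partial total differs from the full one, so without an error bound on partial histograms the split decision cannot safely proceed until every worker has contributed.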

@thvasilo
Contributor

> What item 2) tries to solve is restoring only the worker(s) that were lost (load only the partial data assigned to the lost worker(s) -> restore the last checkpoint from other peers -> catch up on the next allreduce)

OK if I understand correctly then when a worker fails in this scenario, the others continue with the iteration while we launch a new one and it reads its data partition. We complete the current iteration using the limited information from the live workers, and in the next iteration, the new worker gets the model state and we continue as before.

> Unless we can prove mathematically that the error ratio based on a partial histogram is upper bounded, rabit still has to block until each MPI function completes its operation.

OK I see what you are saying. We have discussed early termination of histogram creation/communication with @hcho3 in the past, where concentration bounds like the Hoeffding bound can be used to prove convergence, similar to the VFDT tree or this recent work by @arapat and Freund (code here).
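
For reference, the Hoeffding bound says that after n independent observations of a variable with range R, the true mean lies within epsilon = sqrt(R^2 ln(1/delta) / (2n)) of the sample mean with probability at least 1 - delta. A minimal sketch of that formula follows; the function name and the example numbers are illustrative and do not come from XGBoost or rabit.

```python
import math


def hoeffding_epsilon(value_range: float, delta: float, n: int) -> float:
    """Half-width of the Hoeffding confidence interval for a sample mean.

    value_range: R, the range of the observed variable
    delta:       allowed failure probability (0 < delta < 1)
    n:           number of independent observations
    """
    if not (0.0 < delta < 1.0) or n <= 0:
        raise ValueError("need 0 < delta < 1 and n > 0")
    return math.sqrt(value_range ** 2 * math.log(1.0 / delta) / (2.0 * n))


# The bound tightens as more rows are observed.
for n in (1_000, 10_000, 100_000):
    print(n, hoeffding_epsilon(1.0, 0.05, n))
```

In a VFDT-style decision, a split would be accepted early once the gap between the two best candidates exceeds this epsilon.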

@thvasilo
Contributor

Linking Rabit's paper here that focuses on the fault tolerance mechanism.

I'll try to go through this and find correspondences in the code, and hopefully annotate the code with comments to make it more readable.

@chenqin
Contributor Author

chenqin commented Mar 15, 2019

> What item 2) tries to solve is restoring only the worker(s) that were lost (load only the partial data assigned to the lost worker(s) -> restore the last checkpoint from other peers -> catch up on the next allreduce)

> OK if I understand correctly then when a worker fails in this scenario, the others continue with the iteration while we launch a new one and it reads its data partition. We complete the current iteration using the limited information from the live workers, and in the next iteration, the new worker gets the model state and we continue as before.

I agree we should trace back to the rabit paper and see how failure recovery was designed initially. At the same time, a binary tree-reduce might not be the most performant network topology considering the time cost of {allreduce -> broadcast}. For example, when we grow tree depth on a large worker cluster, the total number of {allreduce -> broadcast} rounds per iteration goes up exponentially (2^tree_depth), while each round takes on the order of log(number_of_workers) network hops.
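
As a back-of-the-envelope illustration of that estimate (roughly 2^depth sync rounds per boosting iteration, each crossing about log2(workers) hops in a binary tree), the sketch below just evaluates the two terms. The function name and the sample numbers are assumptions for illustration, not measurements.

```python
import math


def sync_cost_estimate(tree_depth: int, num_workers: int):
    """Rough estimate following the comment above.

    Returns (rounds, hops_per_round):
      rounds         ~ 2 ** tree_depth {allreduce -> broadcast} rounds
      hops_per_round ~ ceil(log2(num_workers)) hops in a binary tree
    """
    rounds = 2 ** tree_depth
    hops_per_round = math.ceil(math.log2(num_workers)) if num_workers > 1 else 0
    return rounds, hops_per_round


for depth, workers in [(6, 100), (6, 1000), (10, 1000)]:
    rounds, hops = sync_cost_estimate(depth, workers)
    print(f"depth={depth} workers={workers}: {rounds} rounds x {hops} hops")
```

Going from depth 6 to depth 10 multiplies the number of rounds by 16, whereas a tenfold increase in workers only adds a few hops per round.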

> Unless we can prove mathematically that the error ratio based on a partial histogram is upper bounded, rabit still has to block until each MPI function completes its operation.

> OK I see what you are saying. We have discussed early termination of histogram creation/communication with @hcho3 in the past, where concentration bounds like the Hoeffding bound can be used to prove convergence, similar to the VFDT tree or this recent work by @arapat and Freund (code here).

Thanks! Let me dig in a bit and I will come back to you later.

@chenqin
Contributor Author

chenqin commented Apr 3, 2019

Rabit2 design proposal

Keywords: FSM, EPOLL, C10K, data lineage, map-reduce, allreduce, fast recovery, large scale, MPI

dmlc/rabit#89

@hcho3 hcho3 mentioned this issue Apr 21, 2019
18 tasks
@hcho3
Collaborator

hcho3 commented Apr 29, 2019

@chenqin Is this task complete?

@chenqin
Contributor Author

chenqin commented Apr 29, 2019

I think we can say the hardening part is complete; the improvement work is still ongoing.

@chenqin
Contributor Author

chenqin commented Jun 11, 2019

Improvement

I have been working on enabling native rabit checkpointing in YARN/Spark deployments.
Here are the tests I run to inject failures and test recovery:

https://github.com/chenqin/xgboost/blob/master/tests/cli/runsingle.sh
https://github.com/chenqin/xgboost/blob/master/tests/cli/runtests.sh

So far I have identified two issues with the current rabit-based recovery in XGBoost.

  1. Saved checkpoints miss some configuration (e.g. max_depth, dsplit). This can be solved relatively straightforwardly by adding those values to the payload.

  2. DMatrix::Load calls allreduce before loadcheckpoint, which fails the recovery payload size check. This turns out to be a bit tricky to fix.

In order to construct a partitioned DMatrix and build the learner (which loadcheckpoint then injects state into), we need to sync the number of columns across all workers before loadcheckpoint.
https://github.com/chenqin/xgboost/blob/master/src/cli_main.cc#L160

AFAIK, a simple way is to keep this configuration in the tracker (each worker calls it just once when it runs or reruns) and expose it via rabit.h with the following changes: https://github.com/chenqin/rabit/commit/11adddc83fa8ec0041b547fb2fe77721cc072eae
https://github.com/chenqin/dmlc-core/blob/master/tracker/dmlc_tracker/tracker.py#L271-L279
Here is the code change in XGBoost:
https://github.com/chenqin/xgboost/blob/master/src/data/data.cc#L227-L248

With these changes, it passes the failure recovery tests with rabit.

cc @hcho3 @trivialfis @CodingCat

@trivialfis
Member

@chenqin Thanks. Honestly I don't quite understand what's happening ... So I will stay out of this.

@wstian

wstian commented Jun 13, 2019

> AFAIK, a simple way is to keep this configuration in the tracker (each worker calls it just once when it runs or reruns) and expose it via rabit.h with the following changes: chenqin/rabit@11adddc
> https://github.com/chenqin/dmlc-core/blob/master/tracker/dmlc_tracker/tracker.py#L271-L279
> Here is the code change in XGBoost:
> https://github.com/chenqin/xgboost/blob/master/src/data/data.cc#L227-L248

> With these changes, it passes the failure recovery tests with rabit.

> cc @hcho3 @trivialfis @CodingCat

Following your GitHub repo @chenqin, dmlc/rabit#63 is solved, but there is another issue when one worker is stopped and restarted. I added some logging; the error occurred in gmat_.Init():

[09:56:15] INFO: /opt/xgboost/src/learner.cc:215: Tree method is selected to be 'hist', which uses a single updater grow_quantile_histmaker.
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:267: gbm:BoostNewTrees() start
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:255: GBM::InitUpdater(), pstr: 'grow_quantile_histmaker'
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:290: gbm:BoostNewTrees() call up->Update()
[09:56:15] INFO: /opt/xgboost/src/tree/updater_quantile_hist.cc:55: QuantileHistMaker::Update() start
[09:56:15] INFO: /opt/xgboost/src/tree/updater_quantile_hist.cc:58: QuantileHistMaker::Update() call gmat_.Init()
[09:56:16] INFO: /opt/xgboost/src/common/hist_util.cc:177: HIstCutMatrix::Init() call sreducer.Allreduce() start
[debug] allreduce_robust::TryGetResult(), data_size: 6678336, size: 342115888
Allreduce Recovered data size do not match the specification of function call.
Please check if calling sequence of recovered program is the same the original one in current VersionNumber

@chenqin
Contributor Author

chenqin commented Jun 13, 2019

Can you share the test case you were running?

@wstian

wstian commented Jun 14, 2019

My test is as follows:

After training had run for a few rounds, one worker was killed and then restarted.

The config:

{
  "silent": false,
  "n_estimators": 500,
  "max_depth": 5,
  "nthread": 10,
  "learning_rate": 0.07,
  "early_stopping_rounds": 50,
  "subsample": 0.8,
  "tree_method": "hist",
  "verbosity": 3,
  "eval_metric": "auc"
}

The above error occurred with tree_method=hist.

But with tree_method=approx, it worked without any error.

So it seems there is still some problem with tree_method=hist.

@chenqin
Contributor Author

chenqin commented Jun 14, 2019

Thanks for sharing, will take a look!

Update:
There is a much simpler way to backfill the allreduce/broadcast resbuf from other workers, which should address this issue.

Update, 4th July:
Both hist and approx work with the latest rabit recovery cache (orthogonal to "in same iteration checkpointing" based recovery).

@trams
Contributor

trams commented Jun 25, 2019

Thank you, @chenqin for all the work. We are experimenting with XGBoost on a large scale and I am very interested in working on making xgboost-spark training reliable.

I read this thread and I am a bit confused. As far as I understand, xgboost-spark is not reliable. At least, it has a TaskFailedListener which kills the job if one of the tasks fails. On the other hand, you have a test showing that distributed xgboost without Spark can recover from a node failure (great achievement!)

Do I understand correctly that distributed training in xgboost-spark is still not reliable (in 0.9)?
What should we do to make it reliable? It seems that the underlying tech supports fault tolerance.
I would gladly hear your opinion.

@chenqin
Contributor Author

chenqin commented Jun 25, 2019

@trams xgb-spark is reliable and able to do checkpoint restore; we run a large number of jobs daily, with deployments of hundreds of containers and thousands of cores. This issue is tracking work on rabit (the worker synchronization layer). As of right now, XGBSpark works fine with remote checkpoints and job restart/resume.

The goal of the improvement is to allow distributed xgb to tolerate a few failures without restarting/resuming the entire training job. It can also help move larger xgb workers onto cheaper preemptible instances, which may only last a few iterations but cost much less.

@CodingCat
Member

@trams fail-all-and-make-a-checkpoint on any error is a very typical strategy for machine learning systems.

The original reason for failing everything on any error in the Spark layer is that rabit's fault tolerance is a bit off here and there, leading to a stuck training process. Fixing those "offs" is the topic of this thread. In the future, we should provide two types of fault tolerance strategy: the automatic task recovery functionality from this thread, and checkpointing in Spark (to handle situations where the whole job is down for some reason).

@trams
Contributor

trams commented Jun 26, 2019

I am glad to hear about your plans for making distributed xgboost more reliable. And thank you for the quick response.

As for checkpointing, it works, but #3946 prevents us from using it to the full extent.

@chenqin
Contributor Author

chenqin commented Jun 29, 2019

@wstian can you try my latest master next week and see if it passes your previous tests?
I can see it pass https://github.com/chenqin/xgboost/blob/master/tests/cli/runhist.sh

@chenqin
Contributor Author

chenqin commented Jul 2, 2019

rabit one off recovery cache
dmlc/rabit#98

@wstian

wstian commented Jul 6, 2019

> @wstian can you try my latest master next week and see if it passes your previous tests?
> I can see it pass https://github.com/chenqin/xgboost/blob/master/tests/cli/runhist.sh

Sorry for the late response.

I tried your latest master and repeated the previous tests, which are:

  • start a distributed training job with worker-0 and worker-1

  • after the data is loaded and training has started, shut down worker-0

  • after the tracker receives the recover signal from worker-1, restart worker-0

After worker-0 was restarted, it hit an error and exited.

[16:22:39] XGBoost distributed mode detected, will split data among workers
[16:22:39] Load part of data 0 of 2 parts
[16:22:44] 100017x10438 matrix with 330975794 entries loaded from train.200k.libsvm
Too many nodes went down and we cannot recover.., shutting down process

worker-1 also came across this error and exited.

Did I miss a dependency update, or am I using the wrong commits?

@chenqin
Contributor Author

chenqin commented Jul 8, 2019

@wstian I added more logging: https://github.com/chenqin/rabit/commit/2bd993a8d0483ddb8a4661561cf2bfd16bcd8a91
Please paste the log from your side.

The error you saw might be a long-standing rabit bug. In my tests, it means that checkpoint/loadcheckpoint collective calls sometimes unexpectedly interact with allreduce/broadcast calls from healthy nodes. This causes trygetresult to fail due to a lack of the roles it needs to operate (KRequestData, KHaveData).

@wstian

wstian commented Jul 9, 2019

> @wstian I added more logging: chenqin/rabit@2bd993a
> Please paste the log from your side.

> The error you saw might be a long-standing rabit bug. In my tests, it means that checkpoint/loadcheckpoint collective calls sometimes unexpectedly interact with allreduce/broadcast calls from healthy nodes. This causes trygetresult to fail due to a lack of the roles it needs to operate (KRequestData, KHaveData).

Tried with chenqin/rabit@2bd993a.

worker-0 shut down on restart; log as below:

2019-07-09 09:54:45,298 - start-worker.py[line:92] - INFO: load train: train.200k.libsvm
[09:54:45] XGBoost distributed mode detected, will split data among workers
[09:54:45] Load part of data 0 of 2 parts
[09:54:50] 100017x10438 matrix with 330975794 entries loaded from train.200k.libsvm
[0] RecoverExec caller GetCache seq 0
[0] caller GetCache requester 1
[0] checkpoint req - |0|0|0|1|0| - |0|1|0|
[0] checkpoint ack - |0|1|0|1|1| - |33554432|0|1|
[0] role is 1 in trygetresult seqno 0 seq_counter 0
Allreduce Recovered data size do not match the specification of function call.
Please check if calling sequence of recovered program is the same the original one in current VersionNumber, shutting down process

worker-1 is still running (not shut down this time), log as below:

[1] RecoverExec caller CheckPoint_ seq 9
[1] caller CheckPoint_ requester 0
[1] checkpoint req - |33554432|1|0|0|0| - |33554432|0|0|
[1] checkpoint ack - |0|1|0|1|1| - |33554432|0|1|
[1] role is 0 in trygetresult seqno 0 seq_counter 9

@chenqin
Contributor Author

chenqin commented Jul 9, 2019

@wstian from a quick look at the log you posted, I think it is the same as what I saw before.

What happens is that worker-1 calls into the checkpoint procedure while worker-0 is still in bootstrap/allreduce.
https://github.com/chenqin/rabit/blob/test/src/allreduce_robust.cc#L362

This forces worker-1 and worker-0 into the collective checkpoint state.
https://github.com/chenqin/rabit/blob/test/src/allreduce_robust.cc#L1018

I guess the initial intention of this code is to check whether any nodes are missing the latest checkpoint payload before collective checkpointing. The problem you saw is that the worker-0 caller passes a buffer pointer and size that are meant for allreduce/broadcast. When the actual checkpoint payload recovery starts, the payload size mismatches.
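
The kind of check that trips here can be pictured with the toy sketch below: the recovering caller asks for a result of one size, while the peer hands back a payload of another. The names (`try_get_result`, `RecoveredSizeMismatch`) and the sizes are hypothetical and only mimic the idea; this is not rabit's actual code.

```python
class RecoveredSizeMismatch(Exception):
    """Raised when a recovered payload does not fit the caller's buffer."""


def try_get_result(recovered_payload: bytes, caller_buffer_size: int) -> bytes:
    """Return the recovered payload only if it matches the size the caller
    declared for its allreduce/broadcast buffer."""
    if len(recovered_payload) != caller_buffer_size:
        raise RecoveredSizeMismatch(
            f"recovered {len(recovered_payload)} bytes, "
            f"caller expected {caller_buffer_size} bytes"
        )
    return recovered_payload


# The caller is in an allreduce over a 16-byte buffer, but the peer replies
# with a (larger) checkpoint payload.
checkpoint_payload = b"\x00" * 64
try:
    try_get_result(checkpoint_payload, caller_buffer_size=16)
except RecoveredSizeMismatch as err:
    print("recovery failed:", err)
```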

Working on a fix.

Update:
Here is a fix candidate:

[0]@@@hit Mock Error:CheckPoint
2019-07-09 21:23:53,771 INFO [21:23:53] [3] test-rmse:0.007253
[21:23:54] start instance-xgboost:0
[21:23:54] Load part of data 0 of 10 parts
[21:23:54] 650x127 matrix with 14300 entries loaded from ../../demo/data/agaricus.txt.train
[21:23:54] Load part of data 0 of 10 parts
[21:23:54] 161x127 matrix with 3542 entries loaded from ../../demo/data/agaricus.txt.test
[21:23:54] Tree method is specified to be 'hist' for distributed training.
2019-07-09 21:23:54,947 INFO [21:23:54] [4] test-rmse:0.006189
2019-07-09 21:23:55,044 INFO [21:23:55] [5] test-rmse:0.005330
2019-07-09 21:23:55,143 INFO [21:23:55] [6] test-rmse:0.004574
2019-07-09 21:23:55,239 INFO [21:23:55] [7] test-rmse:0.003929
2019-07-09 21:23:55,335 INFO [21:23:55] [8] test-rmse:0.003416
2019-07-09 21:23:55,431 INFO [21:23:55] [9] test-rmse:0.002966
2019-07-09 21:23:55,527 INFO [21:23:55] [10] test-rmse:0.002581
2019-07-09 21:23:55,531 INFO @TracKer All nodes finishes job
2019-07-09 21:23:55,531 INFO @TracKer 2.28832602501 secs between node start and job finish

https://github.com/chenqin/rabit/commit/1e82c3369edd83c41289c0e497caefd49f831cb6

@wstian

wstian commented Jul 10, 2019

Tried chenqin/rabit@1e82c33.

worker-0 shut down with the following log:

[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller LoadCheckPoint seq 0
[0] load checkpoint global 29235 version 23
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller Allreduce seq 0
[0] RecoverExec caller Allreduce seq 1
[0] RecoverExec caller CheckPoint_ seq 2
[0] RecoverExec caller CheckPoint_ seq 0
[0] RecoverExec caller GetCache seq 0
[0] RecoverExec caller GetCache seq 0
[0] RecoverExec caller Allreduce seq 0
[0] RecoverExec caller Allreduce seq 1
[0] send req - |1|0|0|0|0| - |4|0|0|
[0] recv ack - |1|0|0|0|1| - |33699152|0|1|
[0] pre trygetresult seqno 1 requester 1
[0] role is 1 in trygetresult seqno 1 seq_counter 1
Allreduce Recovered data size do not match the specification of function call.
Please check if calling sequence of recovered program is the same the original one in current VersionNumber, shutting down process

worker-1 log:

[1] RecoverExec caller CheckPoint_ seq 9
[1] caller CheckPoint_ requester 0
[1] RecoverExec caller CheckPoint_ seq 9
[1] caller CheckPoint_ requester 0
[1] RecoverExec caller CheckPoint_ seq 9
[1] RecoverExec caller CheckPoint_ seq 0
[1] RecoverExec caller CheckPoint_ seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 1
[1] RecoverExec caller CheckPoint_ seq 2
[1] RecoverExec caller CheckPoint_ seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 0

@chenqin
Contributor Author

chenqin commented Jul 11, 2019

> try chenqin/rabit@1e82c33

> worker-0 shutdown with following log:

> [0] RecoverExec caller LoadCheckPoint seq 0
> [0] RecoverExec caller LoadCheckPoint seq 0
> [0] RecoverExec caller LoadCheckPoint seq 0
> [0] load checkpoint global 17351 version 13
> [0] RecoverExec caller LoadCheckPoint seq 0
> [0] RecoverExec caller Allreduce seq 0
> [0] RecoverExec caller Allreduce seq 1
> [0] RecoverExec caller CheckPoint_ seq 2
> [0] RecoverExec caller CheckPoint_ seq 0
> [0] RecoverExec caller GetCache seq 0
> [0] RecoverExec caller GetCache seq 0
> [0] RecoverExec caller Allreduce seq 0
> [0] RecoverExec caller Allreduce seq 1
> [0] send req - |1|0|0|0|0| - |4|0|0|
> [0] recv ack - |-55050240|0|0|0|1| - |33716452|0|1|
> [0] pre trygetresult seqno -55050240 requester 0
> AssertError:likely minimal seqno overflow, shutting down process

Thanks for the quick response! This is also a known issue, where I put an assert to avoid unexpected state:
https://github.com/chenqin/rabit/blob/test/src/allreduce_robust.cc#L932
Could you also post the log from worker-1? -55050240 is a really weird seqno.

Also, I tried to avoid the seq number overflow by reducing the specialOp value. Could you give this another try as well?
https://github.com/chenqin/rabit/commit/aa2153fcc36f943e1429537d2b0c1caf4a38d80c

@wstian

wstian commented Jul 12, 2019

try chenqin/rabit@aa2153f

worker-0 shutdown, log as below:

[0] load checkpoint global 17351 version 13
[0] send req - |1|0|0|0|0| - |4|0|0|
[0] recv ack - |-55050240|0|0|0|1| - |33716452|0|1|
AssertError:likely minimal seqno overflow, shutting down process

worker-1 alive, waiting, but no other log.

@chenqin
Contributor Author

chenqin commented Jul 12, 2019

Hmm, last try: I added an assertion on negative seq numbers such as "recv ack - |-55050240|", as well as flag logging.

It passes multiple runs of https://github.com/chenqin/xgboost/blob/master/tests/cli/runxgbtests.sh

@wstian would you be able to help run one last time against https://github.com/chenqin/rabit/commit/0a522a77829162f2dbea8c99487819b8b50b47c2

@chenqin
Contributor Author

chenqin commented Jul 12, 2019

Let's document the "expected" fault recovery behavior in the current rabit design so that we are all on the same page with reasonable expectations.

  1. allreduce/broadcast has to happen between loadcheckpoint and checkpoint calls.

  2. In order to let a failed node catch up, no "bootstrap" allreduce/broadcast (before loadcheckpoint) is allowed. Otherwise, other nodes will try to recover the allreduce/broadcast ops issued before loadcheckpoint with allreduce/broadcast results from after the last checkpoint.

  3. In case too many nodes are down, it will recover from the very beginning.

Example of a supported use case:

  • loadcheckpoint-0, allreduce-0, checkpoint-0 -> allreduce-1, ..., broadcast-1, checkpoint-1
    Notice that we always have the latest checkpoint from the last finished iteration available. If any node fails, recovery involves loadcheckpoint and recovering allreduce/broadcast results from healthy nodes.

Example of an unsupported use case:

  • allreduce, loadcheckpoint-0 (if not init then allreduce-x), ..., checkpoint-0

If a node fails and the recovered node runs allreduce before loadcheckpoint, other nodes will not be able to recover the results. A successful checkpoint also instructs all nodes to clear the allreduce/broadcast results from the corresponding iteration, so the recovering node will not be able to pick up the right results from other nodes. This also applies to histogram init within the first iteration after loadcheckpoint-0: once nodes get past the first iteration, those results are lost.
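
The ordering rule above can be expressed as a small checker over a trace of collective calls. This is a sketch under the stated rule only (no allreduce/broadcast before loadcheckpoint); the function name and the string labels for calls are made up for the example and are not rabit identifiers.

```python
from typing import List


def find_bootstrap_collectives(calls: List[str]) -> List[str]:
    """Return a description of every allreduce/broadcast that appears before
    the first loadcheckpoint in a worker's call trace."""
    problems = []
    loaded = False
    for index, call in enumerate(calls):
        if call == "loadcheckpoint":
            loaded = True
        elif call in ("allreduce", "broadcast") and not loaded:
            problems.append(f"call #{index}: {call} before loadcheckpoint")
    return problems


supported = ["loadcheckpoint", "allreduce", "checkpoint",
             "allreduce", "broadcast", "checkpoint"]
unsupported = ["allreduce", "loadcheckpoint", "allreduce", "checkpoint"]

print(find_bootstrap_collectives(supported))
print(find_bootstrap_collectives(unsupported))
```

A trace check like this could run in a test harness to flag code paths, such as data loading, that issue a collective before the checkpoint is loaded.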

@chenqin
Contributor Author

chenqin commented Jul 23, 2019

> try chenqin/rabit@aa2153f

> worker-0 shutdown, log as below:

> [0] load checkpoint global 17351 version 13
> [0] send req - |1|0|0|0|0| - |4|0|0|
> [0] recv ack - |-55050240|0|0|0|1| - |33716452|0|1|
> AssertError:likely minimal seqno overflow, shutting down process

> worker-1 alive, waiting, but no other log.

So I found the root cause of the negative overflow: it was caused by introducing an additional flag bit and changing the left shift to 5 bits. This makes kSpecialOp << 5 + flags exceed the signed int32 range. I submitted a fix which uses an unsigned int32 instead of a signed one: https://github.com/chenqin/rabit/pull/4/files#diff-5404c32553915533fabb6d48e626e98fR274
The issue should be fixed.
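
The effect can be reproduced with plain integer arithmetic. The sketch below assumes, purely for illustration, a special-op marker of 1 << 26 and a single flag bit; the real constants live in rabit's source and may differ. It emulates 32-bit wrap-around in Python, which is what typically happens on two's-complement hardware when the packed value is stored in a signed 32-bit int.

```python
def as_int32(value: int) -> int:
    """Interpret the low 32 bits of value as a signed two's-complement int."""
    value &= 0xFFFFFFFF
    return value - (1 << 32) if value >= (1 << 31) else value


def as_uint32(value: int) -> int:
    """Interpret the low 32 bits of value as an unsigned int."""
    return value & 0xFFFFFFFF


K_SPECIAL_OP = 1 << 26   # assumed marker value, for illustration only
FLAGS = 0b00001          # one hypothetical flag bit set

packed_4bit = (K_SPECIAL_OP << 4) + FLAGS   # shift by 4 flag bits
packed_5bit = (K_SPECIAL_OP << 5) + FLAGS   # shift by 5 flag bits

print("4-bit shift, signed  :", as_int32(packed_4bit))
print("5-bit shift, signed  :", as_int32(packed_5bit))
print("5-bit shift, unsigned:", as_uint32(packed_5bit))
```

With these assumed values, the extra shift pushes the marker into the sign bit, so the signed view turns negative while the unsigned view keeps the intended value.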

@chenqin chenqin changed the title [RFC] harden/improve rabit framework [RFC] harde rabit framework Jul 26, 2019
@trivialfis
Member

@chenqin Can we close this one?

@chenqin
Contributor Author

chenqin commented Sep 17, 2019

Yes please

@chenqin chenqin closed this as completed Sep 17, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Dec 16, 2019