-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.html
768 lines (706 loc) · 24.2 KB
/
scheduler.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<title>scheduler</title>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<style type="text/css">
/* GitHub stylesheet for MarkdownPad (http://markdownpad.com) */
/* Author: Nicolas Hery - http://nicolashery.com */
/* Version: b13fe65ca28d2e568c6ed5d7f06581183df8f2ff */
/* Source: https://github.com/nicolahery/markdownpad-github */
/* RESET
=============================================================================*/
html, body, div, span, applet, object, iframe, h1, h2, h3, h4, h5, h6, p, blockquote, pre, a, abbr, acronym, address, big, cite, code, del, dfn, em, img, ins, kbd, q, s, samp, small, strike, strong, sub, sup, tt, var, b, u, i, center, dl, dt, dd, ol, ul, li, fieldset, form, label, legend, table, caption, tbody, tfoot, thead, tr, th, td, article, aside, canvas, details, embed, figure, figcaption, footer, header, hgroup, menu, nav, output, ruby, section, summary, time, mark, audio, video {
margin: 0;
padding: 0;
border: 0;
}
/* BODY
=============================================================================*/
body {
font-family: Helvetica, arial, freesans, clean, sans-serif;
font-size: 14px;
line-height: 1.6;
color: #333;
background-color: #fff;
padding: 20px;
max-width: 960px;
margin: 0 auto;
}
body>*:first-child {
margin-top: 0 !important;
}
body>*:last-child {
margin-bottom: 0 !important;
}
/* BLOCKS
=============================================================================*/
p, blockquote, ul, ol, dl, table, pre {
margin: 15px 0;
}
/* HEADERS
=============================================================================*/
h1, h2, h3, h4, h5, h6 {
margin: 20px 0 10px;
padding: 0;
font-weight: bold;
-webkit-font-smoothing: antialiased;
}
h1 tt, h1 code, h2 tt, h2 code, h3 tt, h3 code, h4 tt, h4 code, h5 tt, h5 code, h6 tt, h6 code {
font-size: inherit;
}
h1 {
font-size: 28px;
color: #000;
}
h2 {
font-size: 24px;
border-bottom: 1px solid #ccc;
color: #000;
}
h3 {
font-size: 18px;
}
h4 {
font-size: 16px;
}
h5 {
font-size: 14px;
}
h6 {
color: #777;
font-size: 14px;
}
body>h2:first-child, body>h1:first-child, body>h1:first-child+h2, body>h3:first-child, body>h4:first-child, body>h5:first-child, body>h6:first-child {
margin-top: 0;
padding-top: 0;
}
a:first-child h1, a:first-child h2, a:first-child h3, a:first-child h4, a:first-child h5, a:first-child h6 {
margin-top: 0;
padding-top: 0;
}
h1+p, h2+p, h3+p, h4+p, h5+p, h6+p {
margin-top: 10px;
}
/* LINKS
=============================================================================*/
a {
color: #4183C4;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
/* LISTS
=============================================================================*/
ul, ol {
padding-left: 30px;
}
ul li > :first-child,
ol li > :first-child,
ul li ul:first-of-type,
ol li ol:first-of-type,
ul li ol:first-of-type,
ol li ul:first-of-type {
margin-top: 0px;
}
ul ul, ul ol, ol ol, ol ul {
margin-bottom: 0;
}
dl {
padding: 0;
}
dl dt {
font-size: 14px;
font-weight: bold;
font-style: italic;
padding: 0;
margin: 15px 0 5px;
}
dl dt:first-child {
padding: 0;
}
dl dt>:first-child {
margin-top: 0px;
}
dl dt>:last-child {
margin-bottom: 0px;
}
dl dd {
margin: 0 0 15px;
padding: 0 15px;
}
dl dd>:first-child {
margin-top: 0px;
}
dl dd>:last-child {
margin-bottom: 0px;
}
/* CODE
=============================================================================*/
pre, code, tt {
font-size: 12px;
font-family: Consolas, "Liberation Mono", Courier, monospace;
}
code, tt {
margin: 0 0px;
padding: 0px 0px;
white-space: nowrap;
border: 1px solid #eaeaea;
background-color: #f8f8f8;
border-radius: 3px;
}
pre>code {
margin: 0;
padding: 0;
white-space: pre;
border: none;
background: transparent;
}
pre {
background-color: #f8f8f8;
border: 1px solid #ccc;
font-size: 13px;
line-height: 19px;
overflow: auto;
padding: 6px 10px;
border-radius: 3px;
}
pre code, pre tt {
background-color: transparent;
border: none;
}
kbd {
-moz-border-bottom-colors: none;
-moz-border-left-colors: none;
-moz-border-right-colors: none;
-moz-border-top-colors: none;
background-color: #DDDDDD;
background-image: linear-gradient(#F1F1F1, #DDDDDD);
background-repeat: repeat-x;
border-color: #DDDDDD #CCCCCC #CCCCCC #DDDDDD;
border-image: none;
border-radius: 2px 2px 2px 2px;
border-style: solid;
border-width: 1px;
font-family: "Helvetica Neue",Helvetica,Arial,sans-serif;
line-height: 10px;
padding: 1px 4px;
}
/* QUOTES
=============================================================================*/
blockquote {
border-left: 4px solid #DDD;
padding: 0 15px;
color: #777;
}
blockquote>:first-child {
margin-top: 0px;
}
blockquote>:last-child {
margin-bottom: 0px;
}
/* HORIZONTAL RULES
=============================================================================*/
hr {
clear: both;
margin: 15px 0;
height: 0px;
overflow: hidden;
border: none;
background: transparent;
border-bottom: 4px solid #ddd;
padding: 0;
}
/* TABLES
=============================================================================*/
table th {
font-weight: bold;
}
table th, table td {
border: 1px solid #ccc;
padding: 6px 13px;
}
table tr {
border-top: 1px solid #ccc;
background-color: #fff;
}
table tr:nth-child(2n) {
background-color: #f8f8f8;
}
/* IMAGES
=============================================================================*/
img {
max-width: 100%
}
</style>
</head>
<body>
<h1>Scheduler</h1>
<h2>DAGScheduler</h2>
<p>DAGScheduler实现了面向stage的调度层. 它将每个job的stage划分成DAG, 追踪RDD和stage输出, 找出一个可以运行作业的最小化调度方法. 接下来就将每个stage以TaskSet提交给TaskScheduler执行.</p>
<p>为了构造出stage的DAG图, 该类还需要根据当前的cache状态, 计算出每个task优先跑在那个节点上, 并将该location传递给低层的TaskScheduler. 此外, 它需要处理shuffule输出文件丢失的异常, 此时前一个stage可能会被重新提交. 不是由shuffle文件丢失引起的错误将会由TaskScheduler处理, 这些task在取消整个stage之前会被重试多次.</p>
<p><strong><em>THREADING: 该类的所有逻辑操作都是在执行run()方法的单线程完成的, 事件都被提交到一个同步队列(eventQueue)中. public的API方法, 如runJob, taskEnded和executorLost, 都会异步的把事件放入到这个队列中. 所有其他的方法都应该是private的.</em></strong></p>
<p>先看start方法. start方法启动了eventProcessorActor, 该actor有两个职责:
- 等待事件, 如作业提交, 作业结束, 作业失败等. 调用[[org.apache.spark.scheduler.DAGScheduler.processEvent()]]来处理事件.
- 调度周期task来重新提交失败的stage.</p>
<p><strong><em>注意: 该actor不能在构造函数里启动, 因为周期任务参考了一些内部状态的封闭[[org.apache.spark.scheduler.DAGScheduler]]对象, 因此在[[org.apache.spark.scheduler.DAGScheduler]完全构造好之前不能调度.</em></strong></p>
<pre><code>def start() {
eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: DAGSchedulerEvent =>
logTrace("Get event of type " + event.getClass.getName)
/**
* All events are fowarded to `processEvent()`, so that the event processing logic can
* easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
* for details.
* /
if (!processEvent(event)) {
submitWaitingStages()
} else {
context.stop(self)
}
}
}))
}
</code></pre>
<p>可见processEvent是所有事件的入口, 需要注意一下这个函数的返回值, 如果是true表示事件循环可以结束了. processEvent主要针对不同的事件做不同的处理:</p>
<ul>
<li>
<p>JobSubmitted</p>
<ul>
<li>
<p>newStage 创建一个Stage, 要么直接用于result stage, 或者用于newOrUsedStage中的shuffle map stage的创建的一部分. 注意, 如果要生成shuffle map stage, 那么一定要用newOrUsedStage方法, 而不是直接使用newStage. 该方法的实现其实就是new一个Stage和一个StageInfo, 并且更新一堆map.</p>
</li>
<li>
<p>newActiveJob</p>
</li>
<li>
<p>如果允许本地启动, 那么向listenerBus post一个SparkListenerJobStart消息, 再调用runLocally方法</p>
<ul>
<li>listenerBus是什么? 其实是一个异步地向SparkListeners传递SparkListenerEvents事件的工具. 后面单独介绍.</li>
<li>runLocally方法主要是用于在本地的一个RDD上运行一个作业, 假设它只有一个partition并且没有依赖. 我们会另起一个线程来完成这个操作, 这样就不会阻塞住DAGScheduler的事件循环.</li>
<li>runLocally的实际工作线程主要就是调用job的func方法,调用成功后向job的listener发送taskSucceeded消息, 执行taskContext的executeOnCompleteCallbacks. 如果在过程中出现了异常, 就会向job的listener发送jobFailed消息, 最终不论成功与否, 都会向listenerBus发送SparkListenerJobEnd消息.</li>
</ul>
</li>
<li>
<p>如果非本地, 那么还是向listenerBus post一个SparkListenerJobStart消息, 再调用submitStage方法</p>
<ul>
<li>submitStage方法首先调用activeJobForStage方法, 如果返回的jobID已定义且stage不在waiting, running, failed的集合里, 则首先获取所有的missingParentStage, 有missing的话先递归提交missing的, 没有的话调用submitMissingTasks</li>
<li>submitMissingTasks(父stage已经全都执行完了). 首先取出所有的pendingTask, 接下来根据stage类型进行操作, 如果stage是ShuffleMap, 则new出一系列ShuffleMapTask; 否则这就是final stage, 先从resultStageToJob找到所有的job, 接下来new出一系列ResultTask. 这时候会先给listenerBus发送一个SparkListenerStageSubmitted消息, 然后会调用SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head), 先把task序列化好, 最后就调用taskSched.submitTasks(new TaskSet(...))</li>
</ul>
</li>
</ul>
</li>
<li>
<p>JobCancelled</p>
<ul>
<li>
handleJobCancellation(jobId)方法.
<ul>
<li>removeJobAndIndependentStage. 删除所有和其他作业或者stage无关的job. 返回所有要删除的stage id. 所有跟这些stage相关的task都要被删除. 该方法主要就是找到独立Stage的task.</li>
<li>对前面获得的所有独立的task, 调用taskSched.cancelTasks</li>
<li>接下来通知job.listener该job已被cancel, 并向listenerBus发送SparkListenerJobEnd消息</li>
</ul>
</li>
</ul>
</li>
<li>
<p>JobGroupCancelled</p>
<ul>
<li>找到所有与该group id相关的active job, 依次调用handleJobCancellation</li>
</ul>
</li>
<li>
<p>AllJobCancelled</p>
<ul>
<li>对running map中的所有job调用handleJobCancellation</li>
</ul>
</li>
<li>
<p>ExecutorGained</p>
<ul>
<li>handleExecutorGained(execId, host)方法. 其实就是前面已经lost的executor又找到了, 从failedEpoch中移除</li>
</ul>
</li>
<li>
<p>ExecutorLost(TODO)</p>
<ul>
<li>
handleExecutorLost方法. 主要针对exector丢失的情况. 由于该方法在事件循环中调用, 所以它可以改变scheduler内部的状态. 外部通过调用executorLost方法来push一个lost事件. 这里有一个可选择的maybeEpoch参数, 如果已经捕捉到错误可以直接传入, 避免触发抓取错误的逻辑.
<ul>
<li>首先从blockManagerMaster中移除该executor, 接下来对shuffleToMaoStage中所有的stage调用removeOutputsOnExecutor, 接下来从stage的outputLocs中取出locs注册到mapOutputTracker.</li>
</ul>
</li>
</ul>
</li>
<li>
<p>BeginEvent</p>
<ul>
<li>该事件是由TaskScheduler调用用来汇报task启动信息的. 对task的stage做检测, 如果taskInfo的serializedSize过大或者stageInfo.emittedTaskSizeWarning预警, 会打印warning日志, 接下来就向listenerBus发送SparkListenerTaskStart消息.</li>
</ul>
</li>
<li>
<p>GettingResultEvent</p>
<ul>
<li>向listenerBus发送SparkListenerTaskGettingResult消息.</li>
</ul>
</li>
<li>
<p>CompletionEvent</p>
<ul>
<li>向listenerBus发送SparkListenerTaskEnd消息. 然后调用handleTaskCompletion方法.</li>
<li>handleTaskCompletion方法(TODO):</li>
</ul>
</li>
<li>
<p>TastSetFailed</p>
<ul>
<li>对taskSet里所有的stageId, 调用abortStage方法.</li>
<li>abortStage(TODO)</li>
</ul>
</li>
<li>
<p>ResubmitFailedStages</p>
<ul>
<li>首先判断failed.size是否大于0, 这里做判断是因为可能failed stage已经被job cancellation移除了, 因此即使这里触发了ResubmitFailedStages, failed也有可能为空. 接下来就直接调用resubmitFailedStages</li>
<li>resubmitFailedStages(TODO)</li>
</ul>
</li>
<li>
<p>StopDAGScheduler</p>
<ul>
<li>取消所有的active job. 对所有的active job, 调用job listener的jobFailed接口, 并向listenerBus发送SparkListenerJobEnd消息.</li>
</ul>
</li>
</ul>
<h2>SparkListenrBus</h2>
<p>主要成员变量有</p>
<pre><code>// 一个SparkListener的数组
private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListner]
// 一个事件队列
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
</code></pre>
<p>主线程(守护)一直尝试从事件队列里取出事件, 然后对每个SparkListener执行该事件.</p>
<p>主要方法:
post方法就是往事件队列里面push事件</p>
<pre><code>def post(event: SparkListenerEvents) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
queueFullErrorMessageLoged = true
}
}
</code></pre>
<h2>SparkListener</h2>
<p>首先定义了一系列SparkListenerEvents:</p>
<ul>
<li>
<p>SparkListenerStageSubmitted</p>
</li>
<li>
<p>SparkListenerStageCompleted</p>
</li>
<li>
<p>SparkListenerTaskStart</p>
</li>
<li>
<p>SparkListenerTaskGettingResult</p>
</li>
<li>
<p>SparkListenerTaskEnd</p>
</li>
<li>
<p>SparkListenerJobStart</p>
</li>
<li>
<p>SparkListenerJobEnd</p>
</li>
</ul>
<p>特质SparkListener主要定义了一堆onCompletion接口.</p>
<ul>
<li>
<p>onStageCompleted</p>
</li>
<li>
<p>onStageSubmitted</p>
</li>
<li>
<p>onTaskStart</p>
</li>
<li>
<p>onTaskGettingResult</p>
</li>
<li>
<p>onTaskEnd</p>
</li>
<li>
<p>onJobStart</p>
</li>
<li>
<p>onJobEnd</p>
</li>
</ul>
<p>StatsReportListener类主要记录了当每个stage完成时的一些统计信息, 因此只实现了一个onStageCompleted接口.伴生对象StatReportListener实现了一堆metrics获取的方法.</p>
<h2>TaskScheduler && TaskSchedulerImpl</h2>
<p>TaskScheduler提供了更底层的task调度接口, 目前是由ClusterScheduler实现的. 这些接口定义允许用户自定义可插拔的task调度器. 每一个TaskScheduler为一个独立的SparkContext调度任务. 这些调度器会从DAGScheduler的每个stage获取一个task集合, 然后将这些task发送给集群, 运行, 在发生错误时进行重试, 并且<strong><em>减轻落后者</em></strong>. 最终以事件的形式将结果反馈给DAGScheduler.
主要成员变量与接口:</p>
<ul>
<li>
<p>rootPool</p>
</li>
<li>
<p>schedulingMode 调度模式</p>
</li>
<li>
<p>start</p>
</li>
<li>
<p>postStartHook 在系统成功初始化后会被调用(特别是spark context初始化后). Yarn可以用这个接口来bootstrap基于preferred location的资源分配, 等待slave注册等.</p>
</li>
<li>
<p>stop 和集群断开连接</p>
</li>
<li>
<p>submitTasks 提交一系列task来运行</p>
</li>
<li>
<p>cancelTasks 取消某个stage</p>
</li>
<li>
<p>setDAGScheduler</p>
</li>
<li>
<p>defaultParallelism 获取该集群的默认并发数</p>
</li>
</ul>
<p>TaskSchedulerImpl是TaskScheduler接口的一个实现. 通过SchedulerBackend来为多类cluster调度任务. 也可以通过配置使用LocalBackend. 它主要负责普通逻辑, 如决定作业之间的调度顺序, 唤醒speculative任务等. Clients必须先调用initialize和start, 然后通过runTasks方法提交task集合.</p>
<p><strong><em>THREADING: SchedulerBackends和提交任务的客户端可以从不同的线程中调用该类, 所以在public API方法中需要加锁. 此外, 一些SchedulerBackends在发送事件时可以在这里进行同步, 它们在这里会获取锁, 所谓我们必须确保在持有锁时不会尝试锁住backend</em></strong></p>
<p>成员变量和接口:</p>
<ul>
<li>
<p>SPECULATION_INTERVAL 检测speculative任务的频率</p>
</li>
<li>
<p>STARVATION_TIMEOUT 警告用户饥饿时间的上限</p>
</li>
<li>
<p>activeTaskSets: HashMap[String, TaskSetManager] TaskSetManager不是线程安全的, 所以在这里需要同步.</p>
</li>
<li>
<p>taskIdToTaskSetId: HashMap[Long, String]</p>
</li>
<li>
<p>taskIdToExecutorId: HashMap[long, String]</p>
</li>
<li>
<p>nextTaskId: AtomicLong 每次自增的task IDs</p>
</li>
<li>
<p>activeExecutorIds: HashSet[String]</p>
</li>
<li>
<p>executorsByHost: HashMap[String, HashSet[String]] 每个host上拥有的executor集合; 这个变量用于计算hostsAlive, 其返回值用户决定是否在给定host上可以获得数据locality.</p>
</li>
<li>
<p>executorIdToHost: HashMap[String, String]</p>
</li>
<li>
<p>dagScheduler: DAGScheduler</p>
</li>
<li>
<p>backend: SchedulerBackend</p>
</li>
<li>
<p>mapOutputTracker: SparkEnv.get.mapOutputTracker</p>
</li>
<li>
<p>schedulableBuilder: SchedulableBuilder</p>
</li>
<li>
<p>rootPool: Pool</p>
</li>
<li>
<p>schedulingMode: SchedulingMode</p>
</li>
<li>
<p>taskResultGetter: TaskResultGetter</p>
</li>
<li>
<p>initialize: 初始化, 设置backend, new出rootPool, 创建schedulableBuilder并调用schedulableBuilder的buildPools方法</p>
</li>
<li>
<p>start: 启动backend, 如果有speculation需求, 启动speculative executor thread.</p>
</li>
<li>
<p>submitTasks: 加锁.</p>
<ul>
<li>
<p>首先new出一个TaskSetManager用于管理这一组task, 在该记录的数据结构里记下这组映射, 并且在schedulableBuilder里addTaskSetManager</p>
</li>
<li>
<p>接下来等待hasLaunchedTask为true, 如果在饥饿时间内还没有为true(就是没有分配到资源)就报警.</p>
</li>
<li>
<p>调用backend的reviveOffers</p>
</li>
</ul>
</li>
<li>
<p>cancelTasks: 加锁(传入的是stageId)</p>
<ul>
<li>
<p>首先从activeTaskSets中查找taskSetManager, 这里有两种情况, 一种是taskSetManager已经创建并且已经调度了一些任务, 这种情况下需要给executor发送signal信号来kill task并且终止该stage; 另一种是还没有调度任务, 那只要直接abort这个stage就好了</p>
</li>
<li>
<p>如果有runningTasksSet, 调用backend的killTask</p>
</li>
<li>
<p>调用taskSetManager的abort终止stage.</p>
</li>
</ul>
</li>
<li>
<p>taskSetFinished: 加锁. 当taskSetManager管理的所有task attempt完成时调用该方法. 从activeTaskSets里面移除task set, 从该manager的父manager中移除该manager.</p>
</li>
<li>
<p>resourceOffers: 加锁. 由cluster manager调用, 提供slave的资源. 我们按照优先级顺序从active task里取出task, 然后以round-robin的形式把这些task分配到cluster上以保证均衡.</p>
<ul>
<li>
<p>记录汇报上来的资源, 放到各个map里去</p>
</li>
<li>
<p>创建一个待分配的task列表. </p>
</li>
<li>
<p>从排好序的task列表中取出task, 然后按照locality层增序提供给每个计算节点, 这样可以让大家都有机会运行本地task.<strong><em>重要</em></strong></p>
</li>
</ul>
</li>
<li>
<p>statusUpdate: 加锁. 更改某个tid的状态.</p>
<ul>
<li>
<p>如果要改成LOST状态, 首先从activeExecotrIds从取出, 并且加入到failedExecutor中.</p>
</li>
<li>
<p>找到对应的TaskSet, 如果是正常结束, 调用taskResultGetter.enqueueSuccessfulTask, 如果是异常结束, 则调用taskResultGetter.enqueueFailedTask</p>
</li>
<li>
<p>如果有failedExecotr, 需要通知DAGScheduler, 并且调用backend的reviveOffers. <strong><em>注意这里在锁外, 如果在锁内会有死锁</em></strong></p>
</li>
</ul>
</li>
<li>
<p>handleTaskGettingResult</p>
<ul>
<li>裸调taskSetManager的对应方法</li>
</ul>
</li>
<li>
<p>handleSuccessfulTask</p>
<ul>
<li>裸调taskSetManager的对应方法</li>
</ul>
</li>
<li>
<p>handleFailedTask</p>
<ul>
<li>
<p>裸调taskSetManager的对应方法</p>
</li>
<li>
<p>如果taskManager已经进入了Zombie状态, 需要通知backend的reviveOffers.</p>
</li>
</ul>
</li>
</ul>
<p>TaskSchedulerImlp的伴生对象, 仅提供了一个prioritizeContainers方法. 这个方法是用于跨机器均衡containers的. 该方法传入一个host和host拥有资源的map, 返回一个资源分配的顺序列表. 注意传入的host资源已经是有序的, 因此我们在同一台机器上我们优先分配前面的container.</p>
<p>示例: 给定 <h1, [o1, o2, o3]>, <h2, [o4]>, <h1, [o5, o6]>, 返回[o1, o5, o4, o2, o6, o3].</p>
<h2>SchedulerBackend</h2>
<p>调度系统的后台接口, 允许在ClusterScheduler下挂载不同的调度器. 我们假定Mesos-like的系统, 可以从机器上获取资源并在上面运行任务.
接口:
- start</p>
<ul>
<li>
<p>stop</p>
</li>
<li>
<p>reviveOffers</p>
</li>
<li>
<p>defaultParallelism</p>
</li>
<li>
<p>killTask</p>
</li>
</ul>
<h2>Schedulable</h2>
<p>特质, 定义了可调度实体的接口, 实际有两类可调度实体, 分别是Pools和TaskSetManagers.</p>
<h2>Pool</h2>
<ul>
<li>
<p>schedulableQueue: ArrayBuffer[Scheduable] 记录所有的Schedulable</p>
</li>
<li>
<p>schedulableNameToSchedulable: HashMap[String, Schedulable] 记录所有Schedulable和名字的映射</p>
</li>
</ul>
<p>后面提供的一些接口就是对这两个数据集的增删改查.</p>
<h2>TaskSetManagers(TODO)</h2>
<h2>SchedulableBuilder</h2>
<p>特质, 构建Schedulable tree的接口, 总共就两个接口:
- buildPools 用来构建所有树的节点</p>
<ul>
<li>addTaskSetManager 构建树的叶子节点(TaskSetManager)</li>
</ul>
<p>针对该特质提供了两个实现:
- FIFOSchedulableBuilder
- buildPools 空</p>
<pre><code>- addTaskSetManager 直接调用rootPool的addSchedulable接口
</code></pre>
<ul>
<li>
<p>FairSchedulableBuilder</p>
<ul>
<li>
<p>buildPools 从配置文件中getResourceAsStream, 挨个调用buildFairSchedulerPool, 最后调用buildDefaultPool</p>
<ul>
<li>
<p>buildDefaultPool new出一个Pool来加入rootPool</p>
</li>
<li>
<p>buildFairSchedulerPool 根据配置文件new出Pool加入rootPool</p>
</li>
</ul>
</li>
<li>
<p>addTaskSetManager 从rootPool中取出所有符合要求的parentPool, 如果没有则new出一个新的, 接下来把manager加到parentPool中去.</p>
</li>
</ul>
</li>
</ul>
<h2>TaskResultGetter</h2>
<h2>TaskSet</h2>
<h2>Task</h2>
<h2>TaskInfo</h2>
<h2>Stage</h2>
<p>Stage是一个Spark job中运行相同计算逻辑的一组独立task的集合, 这些task的shuffle依赖都相同. 每个task的DAG都被切分成不同的stage, 不同的stage是以shuffle发生为边界的. DAGScheduler会以逻辑顺序运行这些stage.</p>
<p>Stage就分两种, 一种是shuffle map stage, 这类task的结果是另一阶段task的输入; 另一种是result stage, 这类task可以直接完成计算行为如初始化job(e.g. count(), save(), etc). 对于shuffle map stage, 我们还要追踪每个输出partition在哪些节点上.</p>
<p>每个Stage都有一个jobId, 可以和提交该stage的job做区分. 在使用FIFO调度时, 这使得先来job的stage可以先被计算或者快速回复.</p>
<ul>
<li>
<p>isAvailable 判断是否可用. 如果是shuffle map stage则直接返回true, 而如果是result stage则需要所有可用输出等于paritition数目</p>
</li>
<li>
<p>addOutputLoc && removeOutputLoc outputLocs是一个Array.fill[List[MapStatus]], 记录每个partition的MapStatus列表</p>
</li>
<li>
<p>removeOutputsOnExecutor(TODO)</p>
</li>
<li>newAttemptId(TODO)</li>
</ul>
<h2>StageInfo</h2>
<p>保存所有从scheduler发送给SparkListener的stage信息. 其中taskInfos保存了所有已完成的task的metrics信息, 包括冗余的, 特殊的task. 两个元素, stage和taskInfos. 其中taskInfos是mutable.Buffer[(TaskInfo, TaskMetrics)]类型.</p>
</body>
</html>
<!-- This document was created with MarkdownPad, the Markdown editor for Windows (http://markdownpad.com) -->