<h1>6.824 2015 Lecture 14: Spark</h1>
<p><strong>Note:</strong> These lecture notes were slightly modified from the ones posted on the
6.824 <a href="http://nil.csail.mit.edu/6.824/2015/schedule.html">course website</a> from
Spring 2015.</p>
<h2>Introduction</h2>
<ul>
<li>MapReduce benefits:
<ul>
<li>Scales</li>
<li>Fault tolerance</li>
<li>Strategy for dealing with stragglers
<ul>
<li>If a map task is slow, MapReduce starts another copy of the task on a
different machine</li>
<li>Tasks don't communicate, so easy to have them run twice in parallel</li>
</ul></li>
</ul></li>
<li>MapReduce limitations:
<ul>
<li>very rigid form of computation</li>
<li>one map phase, one level of communication between maps and reduce, and the
reduce phase</li>
<li>what if you wanted to build an inverted index <strong>and</strong> then sort it by the
most popular keyword <code>=></code> you would need two MapReduce jobs</li>
<li>so it cannot properly handle multi-stage, iterative, or interactive jobs</li>
</ul></li>
</ul>
<p>Users needed more complicated computations <code>=></code> Spark</p>
<p>There were previous solutions that tackled different types of computations
individually. Spark's aim was to provide one solution for a general enough model
of computation.</p>
<h2>Spark</h2>
<p>It is hard to provide distributed shared memory (DSM) while maintaining scalability and fault tolerance.</p>
<p><strong>RDDs: Resilient Distributed Datasets</strong></p>
<ul>
<li>a Scala object essentially</li>
<li>immutable</li>
<li>partitioned across machines</li>
</ul>
<p>Example (build an RDD of all the lines in a text file that start with "ERROR"):</p>
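<pre><code>// Assumes `spark` is the Spark context, as in the paper's examples
val lines = spark.textFile("log.txt")
// Transformation: lazily describes the lines that start with "ERROR"
val errors = lines.filter(_.startsWith("ERROR"))
errors.persist()   // hint: keep the result in memory once computed
errors.count()     // action: triggers the actual computation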
</code></pre>
<p>RDDs are created in one of two ways:</p>
<ul>
<li>by referring to data in external storage</li>
<li>by <em>transforming</em> other RDDs
<ul>
<li>like the <code>filter</code> call above</li>
</ul></li>
</ul>
<p><strong>Actions</strong> kick off a computation on the RDD, like the <code>count()</code> call in the
example.</p>
<p>The <code>persist()</code> call tells Spark to hold the RDD in memory, for fast access.</p>
<p>No work is done until the <code>count()</code> action is executed (lazy evaluation):</p>
<ul>
<li>you can save work by combining all the filters applied before the action
<ul>
<li>faster to read AND filter than to read the file entirely and then do another
filter pass</li>
</ul></li>
</ul>
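<p>The effect of lazy evaluation can be sketched in plain Python. This is only a toy model, not Spark's implementation: <code>LazyRDD</code>, <code>read_log</code> and the sample log lines are hypothetical names and data made up for illustration. Transformations only record a plan; the action runs one pass that reads and filters together.</p>

```python
class LazyRDD:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, read_lines, filters=()):
        self._read_lines = read_lines      # zero-argument function yielding lines
        self._filters = tuple(filters)     # predicates recorded so far

    def filter(self, predicate):
        # Transformation: returns a new (immutable) plan and does no work.
        return LazyRDD(self._read_lines, self._filters + (predicate,))

    def count(self):
        # Action: one pass over the input, applying every filter as lines stream by.
        total = 0
        for line in self._read_lines():
            if all(f(line) for f in self._filters):
                total += 1
        return total


reads = []  # records each time the "file" is actually read


def read_log():
    # Hypothetical log contents standing in for log.txt.
    reads.append("read")
    yield from ["ERROR disk full", "INFO ok", "ERROR timeout", "WARN slow"]


lines = LazyRDD(read_log)
errors = lines.filter(lambda l: l.startswith("ERROR"))
disk_errors = errors.filter(lambda l: "disk" in l)
work_before_action = len(reads)      # nothing has been read yet
n_disk_errors = disk_errors.count()  # triggers a single combined pass
```

<p>Both filters are applied inside the same loop that reads the input, which is the "read AND filter" saving mentioned above.</p>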
<h2>Fault tolerance</h2>
<p>Lineage graphs: dependencies between RDDs</p>
<pre><code>lines (file)
|
\| filter(_.startsWith("ERROR"))
errors
</code></pre>
<p>Machines:</p>
<pre><code>file = b1 b2 b3 b4 b5
p1 p2 <-\ p3 p4 p5
----- ----- -----
| M1| | |M2 | |M3 |
----- | ----- -----
b1 b2 --/ b3 b4 b5
</code></pre>
<p>If you lose an RDD's partition like p4 (because M2 failed), you can rebuild it
by tracing back through the dependency graph and deciding (based on what is
already computed) where to start and what to recompute.</p>
<p>How do you know on what machines to recompute? In this example, <code>b3</code> and <code>b4</code>
would be replicated on other machines (maybe on <code>M1</code> and <code>M3</code>), so that's where
you would restart the computation.</p>
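<p>A minimal sketch of lineage-based recovery, assuming the placement in the diagram (p1, p2 on M1; p3, p4 on M2; p5 on M3) and assuming the input blocks survive because they are replicated. The block contents, the <code>recover</code> helper and the round-robin choice of target machine are illustrative assumptions, not how Spark's scheduler actually picks machines.</p>

```python
# Replicated input blocks (hypothetical contents): block id -> lines.
blocks = {
    "b1": ["ERROR a", "INFO b"],
    "b2": ["INFO c"],
    "b3": ["ERROR d", "ERROR e"],
    "b4": ["WARN f", "ERROR g"],
    "b5": ["INFO h"],
}

# Lineage of `errors`: partition p_i is derived from block b_i by one filter.
lineage = {f"p{i}": (f"b{i}", lambda line: line.startswith("ERROR"))
           for i in range(1, 6)}


def compute_partition(pid):
    """Rebuild one partition from its parent block by replaying the lineage."""
    block_id, predicate = lineage[pid]
    return [line for line in blocks[block_id] if predicate(line)]


# Placement assumed from the diagram above.
machines = {
    "M1": {p: compute_partition(p) for p in ("p1", "p2")},
    "M2": {p: compute_partition(p) for p in ("p3", "p4")},
    "M3": {p: compute_partition(p) for p in ("p5",)},
}


def recover(failed, survivors):
    """Recompute only the partitions that lived on the failed machine."""
    lost = sorted(machines.pop(failed))
    for i, pid in enumerate(lost):
        target = survivors[i % len(survivors)]   # spread the work round-robin
        machines[target][pid] = compute_partition(pid)
    return lost


lost = recover("M2", ["M1", "M3"])
```

<p>Only p3 and p4 are recomputed; the partitions on M1 and M3 are untouched, which is what makes recovery cheaper than restoring a full checkpoint.</p>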
<h2>Comparison</h2>
<pre><code>           | RDDs                          | DSM
-----------|-------------------------------|---------------------------
reads      | any type / granularity        | fine-grained
writes     | coarse-grained, only through  | fine-grained
           | transformations               |
faults     | recompute                     | checkpoint (a la Remus)
stragglers | restart job on diff. machine  | ? no good strategy ?
</code></pre>
<h2>Spark computation expressivity</h2>
<p>Spark is pretty general: a lot of existing parallel computation paradigms, like
MapReduce, can be implemented easily on top of it.</p>
<p>Coarse-grained writes are good enough because a lot of parallel
algorithms simply apply the same operation to all of the data.</p>
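<p>As a rough illustration of that claim, MapReduce can be written as three whole-dataset transformations. The sketch below uses plain Python lists instead of RDDs; <code>flat_map</code>, <code>group_by_key</code> and <code>map_reduce</code> are hypothetical helpers named after the corresponding Spark operations, not Spark's API.</p>

```python
from collections import defaultdict


def flat_map(records, fn):
    """Apply fn to every record and concatenate the results."""
    return [out for record in records for out in fn(record)]


def group_by_key(pairs):
    """Collect all values that share a key (the shuffle step)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())


def map_reduce(records, mapper, reducer):
    """MapReduce expressed as three coarse-grained transformations."""
    mapped = flat_map(records, mapper)                 # map phase
    grouped = group_by_key(mapped)                     # shuffle
    return flat_map(grouped, lambda kv: reducer(*kv))  # reduce phase


# Word count, the classic MapReduce job, on made-up input.
docs = ["spark is fast", "spark is general"]
counts = dict(map_reduce(
    docs,
    mapper=lambda doc: [(word, 1) for word in doc.split()],
    reducer=lambda word, ones: [(word, sum(ones))],
))
```

<p>Every step applies one function to the whole dataset, so no fine-grained writes are needed.</p>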
<h2>Partitioning</h2>
<p>An RDD can have a custom partitioning function that says, for example, "this RDD
has 10 partitions; the first one holds all elements starting with <code>a</code>, etc."</p>
<p>If you use your data set multiple times, grouping it properly so that the data
you need sits on the same machine is important.</p>
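<p>A small sketch of what a partitioning function looks like, in plain Python. The three-partition setup, the letter ranges and the sample URLs are assumptions chosen for the example. The point is that applying the same function to two datasets puts matching keys at the same partition index.</p>

```python
import zlib

NUM_PARTITIONS = 3  # assumed cluster size for the example


def hash_partition(key):
    """Deterministic partition index for a key (crc32 is stable across runs)."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def first_letter_partition(key):
    """Custom rule: keys starting with a-h, i-q, r-z go to partitions 0, 1, 2."""
    letter = key[0].lower()
    if letter <= "h":
        return 0
    if letter <= "q":
        return 1
    return 2


def partition_by(pairs, partition_fn):
    """Split (key, value) pairs into NUM_PARTITIONS lists."""
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for key, value in pairs:
        parts[partition_fn(key)].append((key, value))
    return parts


links = [("apple.com", ["mit.edu"]), ("mit.edu", ["zebra.org"]),
         ("zebra.org", ["apple.com"])]
ranks = [("zebra.org", 1.0), ("apple.com", 1.0), ("mit.edu", 1.0)]

links_parts = partition_by(links, first_letter_partition)
ranks_parts = partition_by(ranks, first_letter_partition)

# Same function on both datasets, so matching keys share a partition index.
co_located = all(
    {k for k, _ in lp} == {k for k, _ in rp}
    for lp, rp in zip(links_parts, ranks_parts)
)
```

<p><code>hash_partition</code> is the generic alternative when no natural grouping of keys is known; either function works with <code>partition_by</code>.</p>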
<h2>PageRank example</h2>
<p>Example:</p>
<pre><code> the "o"'s are webpages
the arrows are links (sometimes directed, if not just pick a direction)
1 2
o<---o <-\
|\ | |
| \ | |
| \ | |
* ** |
o--->o --/
3 4
</code></pre>
<p>Algorithm:</p>
<ul>
<li>Start every page with rank 1</li>
<li>Every page splits its rank evenly across the pages it links to
<ul>
<li>page 1 will give 0.5 to page 3 and 0.5 to page 4, and receive 0.5 from page 2</li>
</ul></li>
<li>Iterate until the ranks converge (or for a fixed number of iterations)</li>
</ul>
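<p>The steps above can be worked through on the four-page graph. The sketch below is plain Python, with the out-links read off the diagram (1 to 3 and 4, 2 to 1 and 4, 3 to 4, 4 to 2). It follows the simplified algorithm in this list, so it has no damping factor; the tolerance and iteration cap are arbitrary choices.</p>

```python
# Out-links read off the diagram: page -> pages it links to.
links = {1: [3, 4], 2: [1, 4], 3: [4], 4: [2]}


def step(ranks):
    """One round: every page splits its rank evenly across its out-links."""
    new_ranks = {page: 0.0 for page in links}
    for page, outs in links.items():
        share = ranks[page] / len(outs)
        for dest in outs:
            new_ranks[dest] += share
    return new_ranks


def pagerank(max_iters=200, tol=1e-6):
    """Repeat step() until no rank changes by more than tol."""
    ranks = {page: 1.0 for page in links}   # start every page with rank 1
    for _ in range(max_iters):
        new_ranks = step(ranks)
        if max(abs(new_ranks[p] - ranks[p]) for p in links) < tol:
            return new_ranks
        ranks = new_ranks
    return ranks


first = step({page: 1.0 for page in links})
# first == {1: 0.5, 2: 1.0, 3: 0.5, 4: 2.0}
final = pagerank()
```

<p>After one round, page 4 already holds half of the total rank because three pages link to it. The total rank (4.0) is conserved in every round because each page hands out exactly what it had.</p>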
<p>Data:</p>
<pre><code>RDD1 'links': (url, links)
  - can compute with a map operation
RDD2 'ranks': (url, rank)
  - links.join(ranks) -> (url, (links, rank))
  - .flatMap { case (url, (links, rank)) =>
        links.map(dest => (dest, rank / n))
    }
    - n is the number of outgoing links of url (links.size): each page
      splits its rank evenly, so every destination receives rank/n
  - store result in RDD3 'contribs'
  - update ranks with contribs
    - ranks = contribs.reduceByKey( _ + _ )
</code></pre>
<p>Example of bad allocation, because we'll transfer a lot of data:</p>
<pre><code> the squares are machines (partitions)
links ranks
------------------------- ------------------------
|(a,...)|(d,...)|(c,...)| |(d,1) |(e,5) |(c,3) |
|(b,...)| |(e,...)| | |(a,1) |(b,1) |
------------------------- -----------------------
\ /
\------------\ /-------------------/
\/
-------------------------
|(a,...)|(d,...)| |
| | | |
-------------------------
</code></pre>
<p>Example with partitioning:</p>
<pre><code> links ranks
------------------------- ------------------------
|(a,...)|(c,...)|(e,...)| |(a,1) |(c,3) |(e,3) |
|(b,...)|(d,...)| | |(b,1) |(d,1) | |
------------------------- ------------------------
contribs are easy to compute locally now
</code></pre>
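<p>Does PageRank need communication at all then? Yes: the <code>contribs</code> RDD is keyed
by destination URL, and contributions are produced on whichever machine holds
the source page. <code>reduceByKey</code> therefore has to bring all contributions for the
same URL together before summing them into that page's new rank.</p>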
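<p>To make the communication concrete, here is a small plain-Python sketch that counts how many contributions stay on their machine and how many must be shuffled. The placement follows the partitioned example above (a, b on one machine; c, d on the next; e on the last). The link structure is made up, since the diagrams elide it.</p>

```python
from collections import defaultdict

# Placement from the partitioned example: url -> machine that owns it
# in BOTH the links and ranks datasets.
owner = {"a": 0, "b": 0, "c": 1, "d": 1, "e": 2}

# Hypothetical link structure (the diagrams only show "...").
links = {"a": ["b", "c"], "b": ["a"], "c": ["e"], "d": ["a", "e"], "e": ["d"]}
ranks = {url: 1.0 for url in links}

# Step 1 (local): join + flatMap. Each machine only reads urls it owns.
contribs_by_machine = defaultdict(list)
for url, outs in links.items():
    for dest in outs:
        contribs_by_machine[owner[url]].append((dest, ranks[url] / len(outs)))

# Step 2 (shuffle): reduceByKey sends each contribution to the owner of dest.
new_ranks = defaultdict(float)
local, remote = 0, 0
for machine, contribs in contribs_by_machine.items():
    for dest, amount in contribs:
        if owner[dest] == machine:
            local += 1      # destination lives on the same machine
        else:
            remote += 1     # must cross the network
        new_ranks[dest] += amount
```

<p>In this example 5 of the 7 contributions cross machines. Partitioning removes the shuffle for the join, not for the final sum.</p>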
<h2>Internal representation</h2>
<p>RDD methods:</p>
<ul>
<li><code>partitions</code> -- returns a list of partitions</li>
<li><code>preferredLocations(p)</code> -- returns the preferred locations of a partition
<ul>
<li>tells you about machines where computation would be faster</li>
</ul></li>
<li><code>dependencies</code>
<ul>
<li>how you depend on other RDDs</li>
</ul></li>
<li><code>iterator(p, parentIters)</code>
<ul>
<li>ask an RDD to compute one of its partitions</li>
</ul></li>
<li><code>partitioner</code>
<ul>
<li>describes the partitioning function of the RDD (e.g. hash- or range-partitioned)</li>
</ul></li>
</ul>
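<p>One way to picture this interface is as an abstract class with the five methods, plus two tiny implementations. This is an illustrative Python sketch, not Spark's code: the snake_case method names, <code>BlockRDD</code>, <code>FilteredRDD</code> and the <code>compute</code> helper are assumptions, and <code>compute</code> only handles narrow dependencies where partition <code>p</code> depends on partition <code>p</code> of each parent.</p>

```python
from abc import ABC, abstractmethod


class RDD(ABC):
    """Illustrative interface mirroring the five methods listed above."""

    @abstractmethod
    def partitions(self): ...                  # list of partition ids

    @abstractmethod
    def dependencies(self): ...                # list of parent RDDs

    @abstractmethod
    def iterator(self, p, parent_iters): ...   # compute partition p

    def preferred_locations(self, p):
        return []                              # no placement hint by default

    def partitioner(self):
        return None                            # partitioning unknown by default


class BlockRDD(RDD):
    """Source RDD backed by in-memory blocks (stands in for a file)."""

    def __init__(self, blocks, locations):
        self._blocks, self._locations = blocks, locations

    def partitions(self):
        return list(range(len(self._blocks)))

    def dependencies(self):
        return []

    def preferred_locations(self, p):
        return self._locations[p]              # machines that hold the block

    def iterator(self, p, parent_iters):
        return iter(self._blocks[p])


class FilteredRDD(RDD):
    """Narrow dependency: partition p depends only on the parent's partition p."""

    def __init__(self, parent, predicate):
        self._parent, self._predicate = parent, predicate

    def partitions(self):
        return self._parent.partitions()

    def dependencies(self):
        return [self._parent]

    def preferred_locations(self, p):
        return self._parent.preferred_locations(p)

    def iterator(self, p, parent_iters):
        return (x for x in parent_iters[0] if self._predicate(x))


def compute(rdd, p):
    """Materialize partition p by walking the (narrow) dependencies recursively."""
    parent_iters = [compute(parent, p) for parent in rdd.dependencies()]
    return rdd.iterator(p, parent_iters)


lines = BlockRDD([["ERROR a", "INFO b"], ["ERROR c"]], [["M1"], ["M2", "M3"]])
errors = FilteredRDD(lines, lambda s: s.startswith("ERROR"))
result = [list(compute(errors, p)) for p in errors.partitions()]
```

<p>A scheduler needs nothing beyond these methods: <code>dependencies</code> gives it the lineage, <code>preferred_locations</code> tells it where to run, and <code>iterator</code> does the work.</p>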
<h1>6.824 notes</h1>
<pre><code>Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
Zaharia, Chowdhury, Das, Dave, Ma, McCauley, Franklin, Shenker, Stoica
NSDI 2012
Had TreadMarks since 1996, and Distributed Shared Memory is a very general abstraction. Why use MapReduce? Or why even use TreadMarks?
Say looking through a log, why not implement it using the regular abstractions (sockets, files etc?)
Saves a lot of work:
communication between nodes
distribute code
schedule work
handle failures
The MapReduce paper had a lot of impact on big data analytics: simple and powerful.
But a bit too rigid. Other systems proposed fixes:
Dryad (Microsoft 2007): any directed acyclic graph, edges are communication channels, can be through disk or via TCP.
+ can implement multiple iterations
+ can pipeline through RAM, don't have to go to disk
- very low level:
doesn't deal with partitioning of data, want 100,000 mappers? add 100,000 nodes
what happens if you run out of RAM? (brief mention of "downgrading" a TCP channel to a disk file)
- doesn't checkpoint/replicate, in the middle of the run (so failures can be expensive)
Pig Latin (Yahoo 2008): a programming language that compiles to MapReduce. Adds "database style" operators, mainly Join
Join: dataset 1 (k1, v1), dataset 2 (k1, v2) ==> (k1, v1, v2); takes the cartesian product (all combinations of v1 and v2 with the same k1)
Example: dataset 1: all clicks on products on website, dataset 2: demographics (age of users), want average age of customer per product.
+ allows multiple iterations
+ can express more
- still has rigidness from MR (writes to disk after map, to replicated storage after reduce, RAM)
Spark
A framework for large scale distributed computation.
An expressive programming model (can express iteration and joins)
Gives the user control over the trade-off between fault tolerance and performance
if the user frequently persists w/REPLICATE: fast recovery, but slower execution
if infrequently: fast execution, but slow recovery
Relatively recent release, but used by (partial list) IBM, Groupon, Yahoo, Baidu..
Can get substantial performance gains when dataset (or a major part of it) can fit in memory, so anticipated to get more traction.
MapReduce is simple
Abstraction of Resilient Distributed Datasets: an RDD is a collection of partitions of records.
Two operations on RDDs:
Transformations: compute a new RDD from existing RDDs (flatMap, reduceByKey)
this just specifies a plan. runtime is lazy - doesn't have to materialize (compute), so it doesn't
Actions: where some effect is requested: result to be stored, get specific value, etc.
causes RDDs to materialize.
Logistic regression (from paper):
val points = spark.textFile(...)
                  .map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
  val gradient = points.map{ p =>
    p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
  }.reduce((a,b) => a+b)
  w -= gradient
}
* w is sent with the closure to the nodes
* materializes a new RDD in every loop iteration
PageRank (from paper):
val links = spark.textFile(...).map(...).persist() // (URL, outlinks)
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
  // Build an RDD of (targetURL, float) pairs
  // with the contributions sent by each page
  val contribs = links.join(ranks).flatMap {
    (url, (links, rank)) =>
      links.map(dest => (dest, rank/links.size))
  }
  // Sum contributions by URL and get new ranks
  ranks = contribs.reduceByKey((x,y) => x+y)
                  .mapValues(sum => a/N + (1-a)*sum)
}
What is an RDD (table 3, S4)
list of partitions
list of (parent RDD, wide/narrow dependency)
function to compute
partitioning scheme
computation placement hint
Each transformation takes (one or more) RDDs, and outputs the transformed RDD.
Q: Why does an RDD carry metadata on its partitioning?
A: so transformations that depend on multiple RDDs know whether they need to shuffle data (wide dependency) or not (narrow)
Allows users control over locality and reduces shuffles.
Q: Why the distinction between narrow and wide dependencies?
A: In case of failure.
narrow dependency only depends on a few partitions that need to be recomputed.
wide dependency might require an entire RDD
Handling faults.
When Spark computes, by default it generates only one copy of the result and doesn't replicate it. Without replication, whether the data is in RAM or on disk, a permanent node failure means the data is gone.
When some partition is lost and needs to be recomputed, the scheduler needs to find a way to recompute it. (a fault can be detected by using a heartbeat)
will need to recompute all partitions it depends on, going back until it reaches a partition that is still in RAM/disk or in replicated storage.
if wide dependency, will need all partitions of the parent RDD to recompute; if narrow, just the corresponding partition(s) of the parent RDD
So two mechanisms enable recovery from faults: lineage, and policy of what partitions to persist (either to one node or replicated)
We talked about lineage before (Transformations)
The user can call persist on an RDD.
With RELIABLE flag, will keep multiple copies (in RAM if possible, disk if RAM is full)
With REPLICATE flag, will write to stable storage (HDFS)
Without flags, will try to keep in RAM (will spill to disk when RAM is full)
Q: Is persist a transformation or an action?
A: neither. It doesn't create a new RDD, and doesn't cause materialization. It's an instruction to the scheduler.
Q: By calling persist without flags, is it guaranteed that in case of fault that RDD wouldn't have to be recomputed?
A: No. There is no replication, so a node holding a partition could fail.
Replication (either in RAM or in stable storage) is necessary
Currently only manual checkpointing via calls to persist.
Q: Why implement checkpointing? (it's expensive)
A: Long lineage could cause large recovery time. Or when there are wide dependencies a single failure might require many partition re-computations.
Checkpointing is like buying insurance: pay the cost of writing to stable storage up front in order to recover faster in case of a fault.
Whether it pays off depends on the frequency of failures and on the cost of slower recovery
An automatic checkpointing policy would take these into account, together with the size of the data (how much time it takes to write) and the computation time.
So can handle a node failure by recomputing lineage up to partitions that can be read from RAM/Disk/replicated storage.
Q: Can Spark handle network partitions?
A: Nodes that cannot communicate with scheduler will appear dead. The part of the network that can be reached from scheduler can continue
computation, as long as it has enough data to start the lineage from (if all replicas of a required partition cannot be reached, cluster
cannot make progress)
What happens when there isn't enough memory?
- LRU (Least Recently Used) on partitions
- first on non-persisted
- then persisted (but they will be available on disk. makes sure user cannot overbook RAM)
- user can have control on order of eviction via "persistence priority"
- no reason not to discard non-persisted partitions (if they've already been used)
Shouldn't throw away partitions in RAM that are required but haven't been used yet.
degrades to "almost" MapReduce behavior
In figure 7, k-means on 100 Hadoop nodes takes 76-80 seconds
In figure 12, k-means on 25 Spark nodes (with no partitions allowed in memory) takes 68.8 seconds
Difference could be because MapReduce would use replicated storage after reduce, but Spark by default would only spill to local disk, no network latency and I/O load on replicas.
no architectural reason why Spark would be slower than MR
Spark assumes it runs on an isolated memory space (multiple schedulers don't share the memory pool well).
Can be solved using a "unified memory manager"
Note that when there is reuse of RDDs between jobs, they need to run on the same scheduler to benefit anyway.
(from [P09])
Why not just use parallel databases? Commercially available: "Teradata, Aster Data, Netezza, DATAllegro
(and therefore soon Microsoft SQL Server via Project Madison), Dataupia, Vertica, ParAccel, Neoview,
Greenplum, DB2 (via the Database Partitioning Feature), and Oracle (via Exadata)"
At the time, parallel DBMSs had drawbacks:
* Some are expensive and hard to set up right
* SQL is declarative (vs. procedural)
* Required schema, indices etc. (an advantage in some cases)
* "Not made here"
Piccolo [P10] uses snapshots of a distributed key-value store to handle fault tolerance.
- Computation consists of control functions and kernel functions.
- Control functions are responsible for setting up tables (also locality), launching kernels, synchronization (barriers that wait for all kernels to complete), and starting checkpoints
- Kernels use the key value store. There is a function to merge conflicting writes.
- Checkpoints using Chandy-Lamport
* all data has to fit in RAM
* to recover, all nodes need to revert (expensive)
* no way to mitigate stragglers, cannot just re-run a kernel without reverting to a snapshot
[P09] "A Comparison of Approaches to Large-Scale Data Analysis", Pavlo et al. SIGMOD'09
[P10] "Piccolo: Building Fast, Distributed Programs with Partitioned Tables", Power and Li, OSDI'10
</code></pre>
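<p>The eviction order described in the notes above (LRU, non-persisted partitions first, then persisted ones, which remain available on disk) can be sketched as a small cache. This is a toy model in plain Python under stated assumptions: <code>PartitionCache</code>, its capacity counted in partitions, and the partition names are hypothetical, and the "persistence priority" control is not modeled.</p>

```python
from collections import OrderedDict


class PartitionCache:
    """Toy memory manager: LRU eviction, non-persisted partitions go first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()   # partition id -> persisted flag, oldest first
        self.spilled = []               # persisted partitions written to local disk
        self.dropped = []               # non-persisted partitions simply discarded

    def touch(self, pid, persisted=False):
        """Insert or re-use a partition, evicting if memory is full."""
        if pid in self._entries:
            self._entries.move_to_end(pid)   # mark as most recently used
            return
        while len(self._entries) >= self.capacity:
            self._evict_one()
        self._entries[pid] = persisted

    def _evict_one(self):
        # Prefer the least recently used NON-persisted partition.
        victim = next((pid for pid, persisted in self._entries.items()
                       if not persisted), None)
        if victim is not None:
            del self._entries[victim]
            self.dropped.append(victim)
            return
        # Everything left is persisted: evict the LRU one, keeping a disk copy.
        victim, _ = self._entries.popitem(last=False)
        self.spilled.append(victim)

    def in_memory(self):
        return list(self._entries)


cache = PartitionCache(capacity=3)
cache.touch("points.p0", persisted=True)
cache.touch("tmp.p0")
cache.touch("tmp.p1")
cache.touch("points.p0")                   # reuse: now most recently used
cache.touch("points.p1", persisted=True)   # evicts tmp.p0
cache.touch("points.p2", persisted=True)   # evicts tmp.p1
cache.touch("points.p3", persisted=True)   # all persisted: spills points.p0
```

<p>Dropped partitions can be recomputed from lineage if needed again, while spilled ones are read back from local disk, which is why the system degrades toward MapReduce-like behavior rather than failing.</p>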