-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSampleClient.cpp
More file actions
112 lines (97 loc) · 2.66 KB
/
SampleClient.cpp
File metadata and controls
112 lines (97 loc) · 2.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include "MapReduceFramework.h"
#include <cstdio>
#include <string>
#include <array>
#include <unistd.h>
class VString : public V1 {
public:
VString(std::string content) : content(content) { }
std::string content;
};
class KChar : public K2, public K3{
public:
KChar(char c) : c(c) { }
virtual bool operator<(const K2 &other) const {
return c < static_cast<const KChar&>(other).c;
}
virtual bool operator<(const K3 &other) const {
return c < static_cast<const KChar&>(other).c;
}
char c;
};
class VCount : public V2, public V3{
public:
VCount(int count) : count(count) { }
int count;
};
class CounterClient : public MapReduceClient {
public:
void map(const K1* key, const V1* value, void* context) const {
std::array<int, 256> counts;
counts.fill(0);
for(const char& c : static_cast<const VString*>(value)->content) {
counts[(unsigned char) c]++;
}
for (int i = 0; i < 256; ++i) {
if (counts[i] == 0)
continue;
KChar* k2 = new KChar(i);
VCount* v2 = new VCount(counts[i]);
usleep(150000);
emit2(k2, v2, context);
}
}
virtual void reduce(const IntermediateVec* pairs,
void* context) const {
const char c = static_cast<const KChar*>(pairs->at(0).first)->c;
int count = 0;
for(const IntermediatePair& pair: *pairs) {
count += static_cast<const VCount*>(pair.second)->count;
delete pair.first;
delete pair.second;
}
KChar* k3 = new KChar(c);
VCount* v3 = new VCount(count);
usleep(150000);
emit3(k3, v3, context);
}
};
int main(int argc, char** argv)
{
CounterClient client;
InputVec inputVec;
OutputVec outputVec;
VString s1("This string is full of characters");
VString s2("Multithreading is awesome");
VString s3("race conditions are bad");
inputVec.push_back({nullptr, &s1});
inputVec.push_back({nullptr, &s2});
inputVec.push_back({nullptr, &s3});
JobState state;
JobState last_state={UNDEFINED_STAGE,0};
JobHandle job = startMapReduceJob(client, inputVec, outputVec, 4);
getJobState(job, &state);
while (state.stage != REDUCE_STAGE || state.percentage != 100.0)
{
if (last_state.stage != state.stage || last_state.percentage != state.percentage){
printf("stage %d, %f%% \n",
state.stage, state.percentage);
}
usleep(100000);
last_state = state;
getJobState(job, &state);
}
printf("stage %d, %f%% \n",
state.stage, state.percentage);
printf("Done!\n");
closeJobHandle(job);
for (OutputPair& pair: outputVec) {
char c = ((const KChar*)pair.first)->c;
int count = ((const VCount*)pair.second)->count;
printf("The character %c appeared %d time%s\n",
c, count, count > 1 ? "s" : "");
delete pair.first;
delete pair.second;
}
return 0;
}