-
Notifications
You must be signed in to change notification settings - Fork 4
/
init.cpp
216 lines (178 loc) · 5.35 KB
/
init.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
/**
* Init.cpp
*
* The init function that should be called directly on cli.
*
* @author Toon Schoenmakers <[email protected]>
* @copyright 2015 - 2016 Copernica BV
*/
/**
* Dependencies
*/
#include "init.h"
#include "job.h"
#include "wrapper.h"
#include "base.h"
#include "stdin.h"
#include "tempdir.h"
#include "cache.h"
#include <yothalot.h>
/**
* Run the mapper
* @param input All the input
* @return int
*/
static int map(const Stdin &input)
{
// prevent exceptions
try
{
// wrap the php object
Wrapper mapreduce(input.object());
// get our argv
auto argv = Php::GLOBALS["argv"];
auto argc = Php::GLOBALS["argc"];
// modulo is the last argument
auto modulo = argc > 1 ? (int)argv.get(argc - 1) : 1;
// get the temp directory
static TempDir tempdir;
// create the task
Yothalot::MapTask task(base(), &mapreduce, modulo, input.target(), tempdir);
// add the data to process
task.process(input.data(), input.size());
// show output of mapper process
std::cout << task.output();
// done
return 0;
}
catch (const std::runtime_error &error)
{
// report error
Php::error << "Mapper error: " << error.what() << std::flush;
// failure
return -1;
}
}
/**
* Run the reducer
* @param input All the input
* @return int
*/
static int reduce(const Stdin &input)
{
// prevent exceptions
try
{
// wrap php object
Wrapper mapreduce(input.object());
// create the task
Yothalot::ReduceTask task(base(), &mapreduce, input.target(), false);
// add the data to process
task.process(input.data(), input.size());
// show output of mapper process
std::cout << task.output();
// done
return 0;
}
catch (const std::runtime_error &error)
{
// report error
Php::error << "Reducer error: " << error.what() << std::flush;
// failure
return -1;
}
}
/**
* Run the writer/finalizer
* @param input All the input
* @return int
*/
static int write(const Stdin &input)
{
// prevent exceptions
try
{
// wrap php object
Wrapper mapreduce(input.object());
// create the task
Yothalot::WriteTask task(base(), &mapreduce, input.target(), false);
// add the data to process
task.process(input.data(), input.size());
// show output of mapper process
std::cout << task.output();
// done
return 0;
}
catch (const std::runtime_error &error)
{
// report error
Php::error << "Writer error: " << error.what() << std::flush;
// failure
return -1;
}
}
/**
* Run a regular job (or a job that is part of a race, which is basically identical
* to a race job)
* @param input
* @return int
*/
static int run(const Stdin &input)
{
// prevent exceptions
try
{
// call the process method
auto result = input.object().call("process", Php::call("unserialize", Php::call("base64_decode", Php::Value(input.data(), input.size()))));
// capture the output
std::string output = Php::call("ob_get_clean");
// did we have output?
if (output.size() > 0) throw std::runtime_error(output);
// if there's no output, the job generated no output
if (result.isNull()) return 0;
// serialize the output, so that it can be unserialized at the caller side
std::cout << Php::call("base64_encode", Php::call("serialize", result));
// done
return 0;
}
catch (const std::runtime_error &error)
{
// report the error
Php::error << "Unexpected output: " << error.what() << std::flush;
// failure
return -1;
}
}
/**
 *  Our global init method, mostly used to call directly from cli using something
 *  like `php -r "YothalotInit('mapper');"`
 *
 *  The first parameter selects the task to run (compared case-insensitively):
 *  "run", "mapper", "kvmapper", "reducer" or "finalizer". When the parameter
 *  is missing, has no raw string value, or names an unknown task, nothing is
 *  run and -1 is returned.
 *
 *  @param  params      The parameters, params[0] holds the name of the task
 *  @return Php::Value  Return values are like normal programs really, 0 if success
 *                      something else otherwise.
 */
Php::Value yothalotInit(Php::Parameters &params)
{
    // turn on all errors
    Php::error_reporting(Php::Error::All);
    Php::call("ini_set", "error_log", nullptr); // disable the error_log
    Php::call("ini_set", "display_errors", "stderr");

    // prevent PHP output during race algorithm
    Php::call("ob_start");

    // read all input
    Stdin input;

    // result variable, stays at failure unless a task was recognized and executed
    int result = -1;

    // name of the task to run, fetched only once; this is a nullptr if no
    // parameter was passed or if the parameter has no raw string value, so
    // that strcasecmp() is never called with an invalid pointer
    const char *task = params.empty() ? nullptr : params[0].rawValue();

    // we can only select a task if we have a name
    if (task != nullptr)
    {
        // the run is the very first simple task
        if (strcasecmp(task, "run") == 0) result = run(input);

        // check the type of task to run that is part of the mapreduce algorithm
        else if (strcasecmp(task, "mapper") == 0 ||
                 strcasecmp(task, "kvmapper") == 0) result = map(input);
        else if (strcasecmp(task, "reducer") == 0) result = reduce(input);
        else if (strcasecmp(task, "finalizer") == 0) result = write(input);
    }

    // capture the output
    auto output = Php::call("ob_get_clean");

    // we expect the output to be empty
    if (output.size() > 0) Php::error << "Unexpected output (" << output << ")" << std::flush;

    // done, report the result of the task (or -1 if no task was run)
    return result;
}