-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add file streams for teeing Job stdout/err to disc #1456
base: master
Are you sure you want to change the base?
Changes from 4 commits
dfeb6e5
71d5c89
70436c7
b7ddc4d
99bbf86
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ | |
|
||
#include <algorithm> | ||
#include <cstring> | ||
#include <fstream> | ||
#include <iostream> | ||
#include <limits> | ||
#include <list> | ||
|
@@ -106,6 +107,7 @@ struct Job final : public GCObject<Job, Value> { | |
std::string echo; | ||
std::string stream_out; | ||
std::string stream_err; | ||
std::string stdout_teefiles, stderr_teefiles; | ||
HeapPointer<Value> bad_launch; | ||
HeapPointer<Value> bad_finish; | ||
double pathtime; | ||
|
@@ -125,7 +127,7 @@ struct Job final : public GCObject<Job, Value> { | |
|
||
Job(Database *db_, String *label_, String *dir_, String *stdin_file_, String *environ, | ||
String *cmdline_, bool keep, const char *echo, const char *stream_out, | ||
const char *stream_err); | ||
const char *stream_err, std::string stdout_teefiles, std::string stderr_teefiles); | ||
|
||
template <typename T, T (HeapPointerBase::*memberfn)(T x)> | ||
T recurse(T arg); | ||
|
@@ -276,16 +278,20 @@ struct JobEntry { | |
std::list<Status>::iterator status; | ||
std::unique_ptr<std::streambuf> stdout_linebuf; | ||
std::unique_ptr<std::streambuf> stderr_linebuf; | ||
std::vector<std::string> stdout_teefiles; | ||
std::vector<std::string> stderr_teefiles; | ||
|
||
JobEntry(JobTable::detail *imp_, RootPointer<Job> &&job_, std::unique_ptr<std::streambuf> stdout, | ||
std::unique_ptr<std::streambuf> stderr) | ||
std::unique_ptr<std::streambuf> stderr, std::vector<std::string> stdout_teefiles_, std::vector<std::string> stderr_teefiles_) | ||
: imp(imp_), | ||
job(std::move(job_)), | ||
pid(0), | ||
pipe_stdout(-1), | ||
pipe_stderr(-1), | ||
stdout_linebuf(std::move(stdout)), | ||
stderr_linebuf(std::move(stderr)) {} | ||
stderr_linebuf(std::move(stderr)), | ||
stdout_teefiles(std::move(stdout_teefiles_)), | ||
stderr_teefiles(std::move(stderr_teefiles_)) {} | ||
~JobEntry(); | ||
|
||
double runtime(struct timespec now); | ||
|
@@ -323,6 +329,8 @@ struct JobTable::detail { | |
std::unordered_map<int, std::unique_ptr<std::streambuf>> fd_bufs; | ||
std::unordered_map<int, std::unique_ptr<TermInfoBuf>> term_bufs; | ||
|
||
std::unordered_map<std::string, std::unique_ptr<std::ofstream>> teefiles; | ||
|
||
detail() {} | ||
|
||
~detail() { | ||
|
@@ -339,6 +347,9 @@ struct JobTable::detail { | |
for (auto &entry : term_bufs) { | ||
entry.second.release(); | ||
} | ||
for (auto &entry : teefiles) { | ||
entry.second->close(); | ||
} | ||
} | ||
|
||
CriticalJob critJob(double nexttime) const; | ||
|
@@ -783,7 +794,6 @@ static void launch(JobTable *jobtable) { | |
// Make the raw output streams and the TermInfoBufs | ||
// that jobs will use. We make one TermInfoBuf per | ||
// file descriptor that we're outputting to. | ||
// TODO: We could add file tee-ing here as well | ||
if (!jobtable->imp->fd_bufs.count(fd_out)) { | ||
std::unique_ptr<std::streambuf> fd_buf; | ||
if (fd_out != -1) { | ||
|
@@ -824,8 +834,28 @@ static void launch(JobTable *jobtable) { | |
} else { | ||
err = std::make_unique<NullBuf>(); | ||
} | ||
|
||
auto stdout_teefiles = split_null(task.job->stdout_teefiles); | ||
auto stdout_tee_names = std::vector<std::string>(); | ||
for (int i = 0; stdout_teefiles[i]; ++i) { | ||
stdout_tee_names.push_back(stdout_teefiles[i]); | ||
if (jobtable->imp->teefiles.find(stdout_teefiles[i]) == jobtable->imp->teefiles.end()) { | ||
jobtable->imp->teefiles[stdout_teefiles[i]] = std::make_unique<std::ofstream>(stdout_teefiles[i], std::ios::out | std::ios::trunc); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should do an unlink after this so that we get a new inode instead of modifying the existing file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not seeing that function in the C++ reference I'm using, though I do recognize what it means in context. Mind giving me a bit more guidance what that would look like in code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unlink is a posix function: https://man7.org/linux/man-pages/man2/unlink.2.html There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Apparently it's a bit less complete a reference than I expected, thanks! (They could at least have included it and marked it as system dependent somehow...) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Is all you would need but its probably best to only ignore the error if ENOENT is returned. Everything else should be logged as a warning
|
||
} | ||
} | ||
auto stderr_teefiles = split_null(task.job->stderr_teefiles); | ||
auto stderr_tee_names = std::vector<std::string>(); | ||
for (int i = 0; stderr_teefiles[i]; ++i) { | ||
stderr_tee_names.push_back(stderr_teefiles[i]); | ||
if (jobtable->imp->teefiles.find(stderr_teefiles[i]) == jobtable->imp->teefiles.end()) { | ||
jobtable->imp->teefiles[stderr_teefiles[i]] = std::make_unique<std::ofstream>(stderr_teefiles[i], std::ios::out | std::ios::trunc); | ||
} | ||
} | ||
delete[] stdout_teefiles; | ||
delete[] stderr_teefiles; | ||
|
||
std::shared_ptr<JobEntry> entry = std::make_shared<JobEntry>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to store an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a nice little tweak that makes me glad I don't write C++ on the regular... |
||
jobtable->imp.get(), std::move(task.job), std::move(out), std::move(err)); | ||
jobtable->imp.get(), std::move(task.job), std::move(out), std::move(err), std::move(stdout_tee_names), std::move(stderr_tee_names)); | ||
|
||
int stdout_stream[2]; | ||
int stderr_stream[2]; | ||
|
@@ -983,6 +1013,10 @@ bool JobTable::wait(Runtime &runtime) { | |
entry->job->db->save_output(entry->job->job, 1, buffer, got, entry->runtime(now)); | ||
if (!imp->batch) { | ||
entry->stdout_linebuf->sputn(buffer, got); | ||
for (auto &teefile : entry->stdout_teefiles) { | ||
auto &tee_fd = imp->teefiles.at(teefile); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: We generally use the |
||
tee_fd->write(buffer, got); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -1002,6 +1036,10 @@ bool JobTable::wait(Runtime &runtime) { | |
entry->job->db->save_output(entry->job->job, 2, buffer, got, entry->runtime(now)); | ||
if (!imp->batch) { | ||
entry->stderr_linebuf->sputn(buffer, got); | ||
for (auto &teefile : entry->stderr_teefiles) { | ||
auto &tee_fd = imp->teefiles.at(teefile); | ||
tee_fd->write(buffer, got); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -1083,7 +1121,7 @@ bool JobTable::wait(Runtime &runtime) { | |
|
||
Job::Job(Database *db_, String *label_, String *dir_, String *stdin_file_, String *environ, | ||
String *cmdline_, bool keep_, const char *echo_, const char *stream_out_, | ||
const char *stream_err_) | ||
const char *stream_err_, std::string stdout_teefiles_, std::string stderr_teefiles_) | ||
: db(db_), | ||
label(label_), | ||
cmdline(cmdline_), | ||
|
@@ -1096,7 +1134,9 @@ Job::Job(Database *db_, String *label_, String *dir_, String *stdin_file_, Strin | |
keep(keep_), | ||
echo(echo_), | ||
stream_out(stream_out_), | ||
stream_err(stream_err_) { | ||
stream_err(stream_err_), | ||
stdout_teefiles(stdout_teefiles_), | ||
stderr_teefiles(stderr_teefiles_) { | ||
start.tv_sec = stop.tv_sec = 0; | ||
start.tv_nsec = stop.tv_nsec = 0; | ||
|
||
|
@@ -1286,18 +1326,19 @@ static PRIMFN(prim_job_virtual) { | |
} | ||
|
||
static PRIMTYPE(type_job_create) { | ||
return args.size() == 12 && args[0]->unify(Data::typeString) && | ||
return args.size() == 14 && args[0]->unify(Data::typeString) && | ||
args[1]->unify(Data::typeString) && args[2]->unify(Data::typeString) && | ||
args[3]->unify(Data::typeString) && args[4]->unify(Data::typeString) && | ||
args[5]->unify(Data::typeInteger) && args[6]->unify(Data::typeString) && | ||
args[7]->unify(Data::typeInteger) && args[8]->unify(Data::typeString) && | ||
args[9]->unify(Data::typeString) && args[10]->unify(Data::typeString) && | ||
args[11]->unify(Data::typeInteger) && out->unify(Data::typeJob); | ||
args[11]->unify(Data::typeString) && args[12]->unify(Data::typeString) && | ||
args[13]->unify(Data::typeInteger) && out->unify(Data::typeJob); | ||
} | ||
|
||
static PRIMFN(prim_job_create) { | ||
JobTable *jobtable = static_cast<JobTable *>(data); | ||
EXPECT(12); | ||
EXPECT(14); | ||
STRING(label, 0); | ||
STRING(dir, 1); | ||
STRING(stdin_file, 2); | ||
|
@@ -1309,15 +1350,17 @@ static PRIMFN(prim_job_create) { | |
STRING(echo, 8); | ||
STRING(stream_out, 9); | ||
STRING(stream_err, 10); | ||
INTEGER_MPZ(is_atty, 11); | ||
STRING(stdout_teefiles, 11); | ||
STRING(stderr_teefiles, 12); | ||
INTEGER_MPZ(is_atty, 13); | ||
|
||
Hash hash; | ||
REQUIRE(mpz_sizeinbase(signature, 2) <= 8 * sizeof(hash.data)); | ||
mpz_export(&hash.data[0], 0, 1, sizeof(hash.data[0]), 0, 0, signature); | ||
|
||
Job *out = | ||
Job::alloc(runtime.heap, jobtable->imp->db, label, dir, stdin_file, env, cmd, | ||
mpz_cmp_si(keep, 0), echo->c_str(), stream_out->c_str(), stream_err->c_str()); | ||
mpz_cmp_si(keep, 0), echo->c_str(), stream_out->c_str(), stream_err->c_str(), stdout_teefiles->as_str(), stderr_teefiles->as_str()); | ||
|
||
out->record = jobtable->imp->db->predict_job(out->code.data[0], &out->pathtime); | ||
|
||
|
@@ -1396,7 +1439,7 @@ static PRIMFN(prim_job_cache) { | |
Value *joblist; | ||
if (reuse.found && !jobtable->imp->check) { | ||
Job *jobp = Job::claim(runtime.heap, jobtable->imp->db, dir, dir, stdin_file, env, cmd, true, | ||
STREAM_ECHO, STREAM_INFO, STREAM_WARNING); | ||
STREAM_ECHO, STREAM_INFO, STREAM_WARNING, "", ""); | ||
jobp->state = STATE_FORKED | STATE_STDOUT | STATE_STDERR | STATE_MERGED | STATE_FINISHED; | ||
jobp->job = job; | ||
jobp->record = reuse; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These close on their own, no need to write this part. Currently there are no release order deps so we don't need to be explicit like I was above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know!