Skip to content

Commit

Permalink
Merge pull request NixOS#703 from kquick/libpqxx_undeprecate
Browse files Browse the repository at this point in the history
Update libpqxx usage to move away from deprecated API interactions.
  • Loading branch information
edolstra authored Apr 1, 2020
2 parents 2d092a6 + a055796 commit 62e6c65
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 172 deletions.
49 changes: 23 additions & 26 deletions src/hydra-evaluator/hydra-evaluator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ struct Evaluator

pqxx::work txn(*conn);

auto res = txn.parameterized
auto res = txn.exec
("select project, j.name, lastCheckedTime, triggerTime, checkInterval, j.enabled as jobset_enabled from Jobsets j join Projects p on j.project = p.name "
"where j.enabled != 0 and p.enabled != 0").exec();
"where j.enabled != 0 and p.enabled != 0");


auto state(state_.lock());

Expand Down Expand Up @@ -126,12 +127,11 @@ struct Evaluator
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized
("update Jobsets set startTime = $1 where project = $2 and name = $3")
(now)
(jobset.name.first)
(jobset.name.second)
.exec();
txn.exec_params0
("update Jobsets set startTime = $1 where project = $2 and name = $3",
now,
jobset.name.first,
jobset.name.second);
txn.commit();
}

Expand Down Expand Up @@ -366,27 +366,24 @@ struct Evaluator
/* Clear the trigger time to prevent this
jobset from getting stuck in an endless
failing eval loop. */
txn.parameterized
("update Jobsets set triggerTime = null where project = $1 and name = $2 and startTime is not null and triggerTime <= startTime")
(jobset.name.first)
(jobset.name.second)
.exec();
txn.exec_params0
("update Jobsets set triggerTime = null where project = $1 and name = $2 and startTime is not null and triggerTime <= startTime",
jobset.name.first,
jobset.name.second);

/* Clear the start time. */
txn.parameterized
("update Jobsets set startTime = null where project = $1 and name = $2")
(jobset.name.first)
(jobset.name.second)
.exec();
txn.exec_params0
("update Jobsets set startTime = null where project = $1 and name = $2",
jobset.name.first,
jobset.name.second);

if (!WIFEXITED(status) || WEXITSTATUS(status) > 1) {
txn.parameterized
("update Jobsets set errorMsg = $1, lastCheckedTime = $2, errorTime = $2, fetchErrorMsg = null where project = $3 and name = $4")
(fmt("evaluation %s", statusToString(status)))
(now)
(jobset.name.first)
(jobset.name.second)
.exec();
txn.exec_params0
("update Jobsets set errorMsg = $1, lastCheckedTime = $2, errorTime = $2, fetchErrorMsg = null where project = $3 and name = $4",
fmt("evaluation %s", statusToString(status)),
now,
jobset.name.first,
jobset.name.second);
}

txn.commit();
Expand All @@ -411,7 +408,7 @@ struct Evaluator
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized("update Jobsets set startTime = null").exec();
txn.exec("update Jobsets set startTime = null");
txn.commit();
}

Expand Down
18 changes: 9 additions & 9 deletions src/hydra-queue-runner/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,21 +453,21 @@ void State::failStep(
for (auto & build : indirect) {
if (build->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build->id);
txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0")
(build->id)
((int) (build->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()))
(result.startTime)
(result.stopTime)
(result.stepStatus == bsCachedFailure ? 1 : 0).exec();
txn.exec_params0
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0",
build->id,
(int) (build->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()),
result.startTime,
result.stopTime,
result.stepStatus == bsCachedFailure ? 1 : 0);
nrBuildsDone++;
}

/* Remember failed paths in the database so that they
won't be built again. */
if (result.stepStatus != bsCachedFailure && result.canCache)
for (auto & path : step->drv->outputPaths())
txn.parameterized("insert into FailedPaths values ($1)")(localStore->printStorePath(path)).exec();
for (auto & path : step->drv.outputPaths())
txn.exec_params0("insert into FailedPaths values ($1)", localStore->printStorePath(path));

txn.commit();
}
Expand Down
195 changes: 96 additions & 99 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <iostream>
#include <thread>
#include <optional>

#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -228,18 +229,18 @@ void State::monitorMachinesFile()
void State::clearBusy(Connection & conn, time_t stopTime)
{
pqxx::work txn(conn);
txn.parameterized
("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy != 0")
((int) bsAborted)
(stopTime, stopTime != 0).exec();
txn.exec_params0
("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy != 0",
(int) bsAborted,
stopTime != 0 ? std::make_optional(stopTime) : std::nullopt);
txn.commit();
}


unsigned int State::allocBuildStep(pqxx::work & txn, BuildID buildId)
{
auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(buildId).exec();
return res[0][0].is_null() ? 1 : res[0][0].as<int>() + 1;
auto res = txn.exec_params1("select max(stepnr) from BuildSteps where build = $1", buildId);
return res[0].is_null() ? 1 : res[0].as<int>() + 1;
}


Expand All @@ -249,27 +250,27 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID
restart:
auto stepNr = allocBuildStep(txn, buildId);

auto r = txn.parameterized
("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) on conflict do nothing")
(buildId)
(stepNr)
(0) // == build
(localStore->printStorePath(step->drvPath))
(status == bsBusy ? 1 : 0)
(startTime, startTime != 0)
(step->drv->platform)
((int) status, status != bsBusy)
(propagatedFrom, propagatedFrom != 0)
(errorMsg, errorMsg != "")
(startTime, startTime != 0 && status != bsBusy)
(machine).exec();
auto r = txn.exec_params
("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) on conflict do nothing",
buildId,
stepNr,
0, // == build
localStore->printStorePath(step->drvPath),
status == bsBusy ? 1 : 0,
startTime != 0 ? std::make_optional(startTime) : std::nullopt,
step->drv.platform,
status != bsBusy ? std::make_optional((int) status) : std::nullopt,
propagatedFrom != 0 ? std::make_optional(propagatedFrom) : std::nullopt, // internal::params
errorMsg != "" ? std::make_optional(errorMsg) : std::nullopt,
startTime != 0 && status != bsBusy ? std::make_optional(startTime) : std::nullopt,
machine);

if (r.affected_rows() == 0) goto restart;

for (auto & output : step->drv->outputs)
txn.parameterized
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)")
(buildId)(stepNr)(output.first)(localStore->printStorePath(output.second.path)).exec();
for (auto & output : step->drv.outputs)
txn.exec_params0
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)",
buildId, stepNr, output.first, localStore->printStorePath(output.second.path));

if (status == bsBusy)
txn.exec(fmt("notify step_started, '%d\t%d'", buildId, stepNr));
Expand All @@ -280,12 +281,11 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID

void State::updateBuildStep(pqxx::work & txn, BuildID buildId, unsigned int stepNr, StepState stepState)
{
if (txn.parameterized
("update BuildSteps set busy = $1 where build = $2 and stepnr = $3 and busy != 0 and status is null")
((int) stepState)
(buildId)
(stepNr)
.exec().affected_rows() != 1)
if (txn.exec_params
("update BuildSteps set busy = $1 where build = $2 and stepnr = $3 and busy != 0 and status is null",
(int) stepState,
buildId,
stepNr).affected_rows() != 1)
throw Error("step %d of build %d is in an unexpected state", stepNr, buildId);
}

Expand All @@ -295,16 +295,15 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
{
assert(result.startTime);
assert(result.stopTime);
txn.parameterized
("update BuildSteps set busy = 0, status = $1, errorMsg = $4, startTime = $5, stopTime = $6, machine = $7, overhead = $8, timesBuilt = $9, isNonDeterministic = $10 where build = $2 and stepnr = $3")
((int) result.stepStatus)(buildId)(stepNr)
(result.errorMsg, result.errorMsg != "")
(result.startTime)(result.stopTime)
(machine, machine != "")
(result.overhead, result.overhead != 0)
(result.timesBuilt, result.timesBuilt > 0)
(result.isNonDeterministic, result.timesBuilt > 1)
.exec();
txn.exec_params0
("update BuildSteps set busy = 0, status = $1, errorMsg = $4, startTime = $5, stopTime = $6, machine = $7, overhead = $8, timesBuilt = $9, isNonDeterministic = $10 where build = $2 and stepnr = $3",
(int) result.stepStatus, buildId, stepNr,
result.errorMsg != "" ? std::make_optional(result.errorMsg) : std::nullopt,
result.startTime, result.stopTime,
machine != "" ? std::make_optional(machine) : std::nullopt,
result.overhead != 0 ? std::make_optional(result.overhead) : std::nullopt,
result.timesBuilt > 0 ? std::make_optional(result.timesBuilt) : std::nullopt,
result.timesBuilt > 1 ? std::make_optional(result.isNonDeterministic) : std::nullopt);
assert(result.logFile.find('\t') == std::string::npos);
txn.exec(fmt("notify step_finished, '%d\t%d\t%s'",
buildId, stepNr, result.logFile));
Expand All @@ -317,25 +316,23 @@ int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t sto
restart:
auto stepNr = allocBuildStep(txn, build->id);

auto r = txn.parameterized
("insert into BuildSteps (build, stepnr, type, drvPath, busy, status, startTime, stopTime) values ($1, $2, $3, $4, $5, $6, $7, $8) on conflict do nothing")
(build->id)
(stepNr)
(1) // == substitution
(localStore->printStorePath(drvPath))
(0)
(0)
(startTime)
(stopTime).exec();
auto r = txn.exec_params
("insert into BuildSteps (build, stepnr, type, drvPath, busy, status, startTime, stopTime) values ($1, $2, $3, $4, $5, $6, $7, $8) on conflict do nothing",
build->id,
stepNr,
1, // == substitution
(localStore->printStorePath(drvPath)),
0,
0,
startTime,
stopTime);

if (r.affected_rows() == 0) goto restart;

txn.parameterized
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)")
(build->id)
(stepNr)
(outputName)
(localStore->printStorePath(storePath)).exec();
txn.exec_params0
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)",
build->id, stepNr, outputName,
localStore->printStorePath(storePath));

return stepNr;
}
Expand Down Expand Up @@ -402,50 +399,50 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
{
if (build->finishedInDB) return;

if (txn.parameterized("select 1 from Builds where id = $1 and finished = 0")(build->id).exec().empty()) return;
if (txn.exec_params("select 1 from Builds where id = $1 and finished = 0", build->id).empty()) return;

txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8, notificationPendingSince = $4 where id = $1")
(build->id)
((int) (res.failed ? bsFailedWithOutput : bsSuccess))
(startTime)
(stopTime)
(res.size)
(res.closureSize)
(res.releaseName, res.releaseName != "")
(isCachedBuild ? 1 : 0).exec();
txn.exec_params0
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8, notificationPendingSince = $4 where id = $1",
build->id,
(int) (res.failed ? bsFailedWithOutput : bsSuccess),
startTime,
stopTime,
res.size,
res.closureSize,
res.releaseName != "" ? std::make_optional(res.releaseName) : std::nullopt,
isCachedBuild ? 1 : 0);

txn.parameterized("delete from BuildProducts where build = $1")(build->id).exec();
txn.exec_params0("delete from BuildProducts where build = $1", build->id);

unsigned int productNr = 1;
for (auto & product : res.products) {
txn.parameterized
("insert into BuildProducts (build, productnr, type, subtype, fileSize, sha1hash, sha256hash, path, name, defaultPath) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)")
(build->id)
(productNr++)
(product.type)
(product.subtype)
(product.fileSize, product.isRegular)
(product.sha1hash.to_string(Base16, false), product.isRegular)
(product.sha256hash.to_string(Base16, false), product.isRegular)
(product.path)
(product.name)
(product.defaultPath).exec();
txn.exec_params0
("insert into BuildProducts (build, productnr, type, subtype, fileSize, sha1hash, sha256hash, path, name, defaultPath) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
build->id,
productNr++,
product.type,
product.subtype,
product.isRegular ? std::make_optional(product.fileSize) : std::nullopt,
product.isRegular ? std::make_optional(product.sha1hash.to_string(Base16, false)) : std::nullopt,
product.isRegular ? std::make_optional(product.sha256hash.to_string(Base16, false)) : std::nullopt,
product.path,
product.name,
product.defaultPath);
}

txn.parameterized("delete from BuildMetrics where build = $1")(build->id).exec();
txn.exec_params0("delete from BuildMetrics where build = $1", build->id);

for (auto & metric : res.metrics) {
txn.parameterized
("insert into BuildMetrics (build, name, unit, value, project, jobset, job, timestamp) values ($1, $2, $3, $4, $5, $6, $7, $8)")
(build->id)
(metric.second.name)
(metric.second.unit, metric.second.unit != "")
(metric.second.value)
(build->projectName)
(build->jobsetName)
(build->jobName)
(build->timestamp).exec();
txn.exec_params0
("insert into BuildMetrics (build, name, unit, value, project, jobset, job, timestamp) values ($1, $2, $3, $4, $5, $6, $7, $8)",
build->id,
metric.second.name,
metric.second.unit != "" ? std::make_optional(metric.second.unit) : std::nullopt,
metric.second.value,
build->projectName,
build->jobsetName,
build->jobName,
build->timestamp);
}

nrBuildsDone++;
Expand All @@ -455,8 +452,8 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
bool State::checkCachedFailure(Step::ptr step, Connection & conn)
{
pqxx::work txn(conn);
for (auto & path : step->drv->outputPaths())
if (!txn.parameterized("select 1 from FailedPaths where path = $1")(localStore->printStorePath(path)).exec().empty())
for (auto & path : step->drv.outputPaths())
if (!txn.exec_params("select 1 from FailedPaths where path = $1", localStore->printStorePath(path)).empty())
return true;
return false;
}
Expand Down Expand Up @@ -678,7 +675,7 @@ void State::dumpStatus(Connection & conn)
pqxx::work txn(conn);
// FIXME: use PostgreSQL 9.5 upsert.
txn.exec("delete from SystemStatus where what = 'queue-runner'");
txn.parameterized("insert into SystemStatus values ('queue-runner', $1)")(out.str()).exec();
txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", out.str());
txn.exec("notify status_dumped");
txn.commit();
}
Expand Down Expand Up @@ -808,11 +805,11 @@ void State::run(BuildID buildOne)
pqxx::work txn(*conn);
for (auto & step : steps) {
printMsg(lvlError, format("cleaning orphaned step %d of build %d") % step.second % step.first);
txn.parameterized
("update BuildSteps set busy = 0, status = $1 where build = $2 and stepnr = $3 and busy != 0")
((int) bsAborted)
(step.first)
(step.second).exec();
txn.exec_params0
("update BuildSteps set busy = 0, status = $1 where build = $2 and stepnr = $3 and busy != 0",
(int) bsAborted,
step.first,
step.second);
}
txn.commit();
} catch (std::exception & e) {
Expand Down
Loading

0 comments on commit 62e6c65

Please sign in to comment.