Skip to content

Commit

Permalink
feat: signal update success to api (#139)
Browse files Browse the repository at this point in the history
* feat: signal update success to api

* test: make test server stricter about incoming requests

* test: check whether payload has expected shape

* refactor!: rename get batches endpoint env var

* refactor: rename route to prevent naming collision
  • Loading branch information
m90 authored Nov 13, 2023
1 parent 0a7d8a0 commit ed284dc
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 19 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/docker.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,21 @@ jobs:
max_attempts: 50
retry_wait_seconds: 2
warning_on_retry: false
command: if (($(docker-compose ${{ env.COMPOSE_ARGS }} logs wdqs-updater | grep "org.wikidata.query.rdf.tool.Updater - Polled" | wc -l) >= 10)); then exit 0; else exit 1; fi
command: |
num_lines=$(docker-compose ${{ env.COMPOSE_ARGS }} logs wdqs-updater | grep "org.wikidata.query.rdf.tool.Updater - Polled" | wc -l)
if [ $num_lines -gt 9 ]; then
exit 0;
else
echo "Found $num_lines lines, retrying."
exit 1;
fi
# Fails the job when the logs of the `api` compose service contain at least one
# line matching "[FAILURE]"; the number of matching lines is echoed first.
- name: Check that Mock API server did not fail assertions
run: |
num_failures=$(docker-compose ${{ env.COMPOSE_ARGS }} logs api | grep "\[FAILURE\]" | wc -l)
echo "Found $num_failures failures."
if [ $num_failures -gt 0 ];
then exit 1
fi
- name: Make a sparql request
run: |
NUM_BINDINGS=$(curl 'http://localhost:9999/bigdata/namespace/wdq/sparql' -H 'Accept: application/sparql-results+json' --data-raw 'query=SELECT+*+WHERE%7B+%3Fs+%3Fp+%3Fo+%7D' | jq '.results.bindings | length')
Expand Down
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
FROM maven:3.6.3-jdk-8 as jarjar

COPY ./ /tmp
WORKDIR /tmp
RUN mvn clean compile assembly:single
COPY ./pom.xml /tmp/pom.xml
RUN mvn clean install

COPY ./ /tmp
RUN mvn compile assembly:single


FROM openjdk:8-jdk-alpine
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ services:
environment:
- WBSTACK_WIKIBASE_SCHEME=http
- WBSTACK_LOOP_LIMIT=100
- WBSTACK_API_ENDPOINT=http://api.svc:3030/getBatches
- WBSTACK_API_ENDPOINT_GET_BATCHES=http://api.svc:3030/getBatches
- WBSTACK_API_ENDPOINT_MARK_NOT_DONE=http://api.svc:3030/markNotDone
- WBSTACK_API_ENDPOINT_MARK_DONE=http://api.svc:3030/markDone
- WBSTACK_BATCH_SLEEP=1
- WIKIBASE_HOST=wikibase.svc
- HEAP_SIZE=32m
69 changes: 60 additions & 9 deletions seeder/server.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,54 @@
var http = require('http');
const http = require('http');
const assert = require('assert');
const wbEdit = require( 'wikibase-edit' )( require( './wikibase-edit.config' ) );

let batchId = 1;

http.createServer(function (req, res) {
(async () => {
switch (req.url) {
case '/markDone':
case '/markNotDone':
if (req.method !== 'POST') {
const err = new Error('Method not allowed');
err.status = 405;
return Promise.reject(err);
}
return new Promise((resolve, reject) => {
const body = [];
req
.on('error', (err) => {
reject(err);
})
.on('data', (chunk) => {
body.push(chunk);
})
.on('end', () => {
try {
const jsonBody = JSON.parse(
Buffer.concat(body).toString('utf8')
);
assert(
Array.isArray(jsonBody.batches),
'Expected a `batches` property on the request body.'
);
assert(
jsonBody.batches.length,
'Expected `batches` to be a non-empty array.'
);
} catch (err) {
reject(err);
return;
}
resolve({ status: 200, body: '1' });
})
});
case '/getBatches':
if (req.method !== 'GET') {
const err = new Error('Method not allowed');
err.status = 405;
return Promise.reject(err);
}
const numEntities = 20;
const entities = [];

Expand All @@ -26,6 +70,7 @@ http.createServer(function (req, res) {
console.log(new Date().toISOString());

responseObject = {
'id': batchId++,
'entityIds': entities.join(','),
'wiki': {
'domain': process.env.API_WIKIBASE_DOMAIN,
Expand All @@ -36,17 +81,23 @@ http.createServer(function (req, res) {
},

};
res.writeHead(200, {'Content-Type': 'text/json'});
res.end(JSON.stringify([responseObject]));
return;
return {
status: 200,
headers: {'Content-Type': 'text/json'},
body: JSON.stringify([responseObject])
};
default:
res.writeHead(404);
res.end('Not found');
const err = new Error('Not found');
err.status = 404;
return Promise.reject(err);
}
})()
.catch((err) => {
console.error('Failed handling request: %s', err.message);
res.writeHead(500);
res.end(err.message);
console.error('[FAILURE] Failed handling request: %s', err.message);
return { status: err.status || 500, body: err.message };
})
.then((result) => {
res.writeHead(result.status, result.headers);
res.end(result.body)
})
}).listen(3030);
54 changes: 48 additions & 6 deletions src/main/java/org/wikidata/query/rdf/tool/WbStackUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.net.HttpURLConnection;
import java.net.URI;
Expand Down Expand Up @@ -71,6 +73,8 @@ public final class WbStackUpdate {

// Static configuration, primarily from environment variables
private static String wbStackApiEndpoint;
private static String wbStackApiEndpointMarkDone;
private static String wbStackApiEndpointMarkNotDone;
private static long wbStackSleepBetweenApiCalls;
private static int wbStackUpdaterThreadCount;
private static String wbStackUpdaterNamespaces;
Expand All @@ -92,15 +96,19 @@ private WbStackUpdate() {
}

private static void setValuesFromEnvOrDie() {
if (System.getenv("WBSTACK_API_ENDPOINT") == null
if (System.getenv("WBSTACK_API_ENDPOINT_GET_BATCHES") == null
|| System.getenv("WBSTACK_API_ENDPOINT_MARK_NOT_DONE") == null
|| System.getenv("WBSTACK_API_ENDPOINT_MARK_DONE") == null
|| System.getenv("WBSTACK_BATCH_SLEEP") == null
|| System.getenv("WBSTACK_LOOP_LIMIT") == null) {
System.err.println("WBSTACK_API_ENDPOINT, WBSTACK_BATCH_SLEEP and WBSTACK_LOOP_LIMIT environment variables must be set.");
System.err.println("WBSTACK_API_ENDPOINT_*, WBSTACK_BATCH_SLEEP and WBSTACK_LOOP_LIMIT environment variables must be set.");
System.exit(1);
}

wbStackProxyMapIngress = System.getenv("WBSTACK_PROXYMAP_INGRESS");
wbStackApiEndpoint = System.getenv("WBSTACK_API_ENDPOINT");
wbStackApiEndpoint = System.getenv("WBSTACK_API_ENDPOINT_GET_BATCHES");
// Read the same variable name that is null-checked at the top of this method
// (WBSTACK_API_ENDPOINT_MARK_NOT_DONE); reading any other name leaves this endpoint null.
wbStackApiEndpointMarkNotDone = System.getenv("WBSTACK_API_ENDPOINT_MARK_NOT_DONE");
wbStackApiEndpointMarkDone = System.getenv("WBSTACK_API_ENDPOINT_MARK_DONE");
wbStackSleepBetweenApiCalls = Long.parseLong(System.getenv("WBSTACK_BATCH_SLEEP"));
wbStackUpdaterThreadCount = Integer.parseInt(System.getenv().getOrDefault("WBSTACK_THREAD_COUNT", "10"));
wbStackUpdaterNamespaces = System.getenv().getOrDefault("WBSTACK_UPDATER_NAMESPACES", "120,122,146");
Expand Down Expand Up @@ -193,18 +201,45 @@ private static JsonArray getBatchesFromApi() throws IOException {
}
}

/**
 * Reports the outcome of a batch back to the API.
 *
 * Sends a POST request whose JSON body has the shape {"batches": [batchId]} to the
 * "mark done" endpoint when {@code success} is true, and to the "mark not done"
 * endpoint otherwise.
 *
 * @param batchId id of the batch whose status is being reported
 * @param success whether the updater run for this batch succeeded
 * @throws IOException if the request cannot be sent or the API responds with a non 200 status
 */
private static void updateRemoteBatchStatus(int batchId, boolean success) throws IOException {
    URL obj = new URL(success ? wbStackApiEndpointMarkDone : wbStackApiEndpointMarkNotDone);
    HttpURLConnection con = (HttpURLConnection) obj.openConnection();
    try {
        con.setRequestMethod("POST");
        con.setRequestProperty("User-Agent", USER_AGENT);
        // The payload is JSON; without this header HttpURLConnection labels POST bodies as form data.
        con.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
        con.setDoOutput(true);

        JsonObject body = new JsonObject();
        JsonArray batches = new JsonArray();
        batches.add(batchId);
        body.add("batches", batches);

        // try-with-resources closes (and thereby flushes) both streams even when writing fails.
        try (OutputStream os = con.getOutputStream();
             OutputStreamWriter osw = new OutputStreamWriter(os, "UTF-8")) {
            osw.write(body.toString());
        }

        // getResponseCode() sends the request if needed, so no explicit connect() call is required.
        int responseCode = con.getResponseCode();
        if (responseCode != 200) {
            throw new IOException("Got non 200 response code from API: " + responseCode);
        }
    } finally {
        // Release the underlying connection on every exit path.
        con.disconnect();
    }
}

private static void updateBatch(JsonElement batchElement) {
// Get the values for the batch from the JSON
JsonObject batch = batchElement.getAsJsonObject();
String entityIDs = batch.get("entityIds").getAsString();
int batchId = batch.get("id").getAsInt();
JsonObject wiki = batch.get("wiki").getAsJsonObject();
String domain = wiki.get("domain").getAsString();
JsonObject wikiQsNamespace = wiki.get("wiki_queryservice_namespace").getAsJsonObject();
String qsBackend = wikiQsNamespace.get("backend").getAsString();
String qsNamespace = wikiQsNamespace.get("namespace").getAsString();

// Run the main Update class with our altered args
runUpdaterWithArgs(new String[]{
boolean updateWasSuccessful = runUpdaterWithArgs(new String[]{
"--wikibaseHost", domain,
"--ids", entityIDs,
"--entityNamespaces", wbStackUpdaterNamespaces,
Expand All @@ -213,11 +248,16 @@ private static void updateBatch(JsonElement batchElement) {
"--conceptUri", "https://" + domain
});

// TODO on success maybe report back?
try {
updateRemoteBatchStatus(batchId, updateWasSuccessful);
} catch (Exception ex) {
System.err.println("Failed to update remote batch status.");
ex.printStackTrace();
}
}

@SuppressFBWarnings(value = "IMC_IMMATURE_CLASS_PRINTSTACKTRACE", justification = "We should introduce proper logging framework")
private static void runUpdaterWithArgs(String[] args) {
private static boolean runUpdaterWithArgs(String[] args) {
try {
Closer closer = Closer.create();

Expand All @@ -242,7 +282,9 @@ private static void runUpdaterWithArgs(String[] args) {
} catch (Exception e) {
System.err.println("Failed batch!");
e.printStackTrace();
return false;
}
return true;
}

private static String getProxyMapString( UpdateOptions options ) {
Expand Down

0 comments on commit ed284dc

Please sign in to comment.