icewind: validate mode #185

Merged · 3 commits · Dec 16, 2024
8 changes: 4 additions & 4 deletions .github/workflows/gradle.yml
@@ -95,14 +95,14 @@ jobs:

 ## applications
 - name: build and test icewind
-  run: cd icewind && ../gradlew --info clean build javadoc checkstyleMain
+  run: cd icewind && ../gradlew --info clean build checkstyleMain

 - name: build and test torkeep
-  run: cd torkeep && ../gradlew --info clean build javadoc checkstyleMain
+  run: cd torkeep && ../gradlew --info clean build checkstyleMain

 - name: build and test argus
-  run: cd argus && ../gradlew --info clean build javadoc checkstyleMain
+  run: cd argus && ../gradlew --info clean build checkstyleMain

 - name: build and test bifrost
-  run: cd bifrost && ../gradlew --info clean build javadoc checkstyleMain
+  run: cd bifrost && ../gradlew --info clean build checkstyleMain

2 changes: 1 addition & 1 deletion caom2persistence/build.gradle
@@ -17,7 +17,7 @@ sourceCompatibility = 1.8

 group = 'org.opencadc'

-version = '2.4.18'
+version = '2.4.19'

 description = 'OpenCADC CAOM database library'
 def git_url = 'https://github.com/opencadc/caom2db'
@@ -120,7 +120,10 @@ public HarvestStateDAO(DataSource dataSource, String database, String schema) {

     protected void init() {
         StringBuilder sb = new StringBuilder();
-        sb.append(database).append(".").append(schema).append(".");
+        if (database != null) {
+            sb.append(database).append(".");
+        }
+        sb.append(schema).append(".");
         if (fakeSchemaTablePrefix != null) {
             sb.append(fakeSchemaTablePrefix);
         }
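This change makes the database qualifier optional when composing fully qualified table names, which is what lets `CaomHarvester` (below) pass `null` to `InitDatabase`. A minimal sketch of the resulting behavior, with the logic extracted into a hypothetical standalone helper (not part of the PR):

```java
// Hypothetical extraction of the prefix logic above, for illustration only.
class TablePrefixSketch {
    static String tablePrefix(String database, String schema, String fakeSchemaTablePrefix) {
        StringBuilder sb = new StringBuilder();
        if (database != null) {
            sb.append(database).append(".");  // only qualify with database when one is configured
        }
        sb.append(schema).append(".");
        if (fakeSchemaTablePrefix != null) {
            sb.append(fakeSchemaTablePrefix); // optional table-name prefix
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(tablePrefix("cadctest", "caom2", null)); // "cadctest.caom2." (old behavior)
        System.out.println(tablePrefix(null, "caom2", null));       // "caom2." (new: schema-qualified only)
    }
}
```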
14 changes: 12 additions & 2 deletions icewind/README.md
@@ -48,12 +48,17 @@ org.opencadc.icewind.caom.url=jdbc:postgresql://{server}/{database}
 # Base for generating Plane publisherID values
 org.opencadc.icewind.basePublisherID={uri}

-# (optional) exit after processing collections once
+# (optional) exit after processing each collection once
 #org.opencadc.icewind.exitWhenComplete=true

-# (optional) retry previously failed (skipped) observations
+# (optional mode) retry previously failed (skipped) observations
+# this mode always assumes exitWhenComplete=true
 org.opencadc.icewind.retrySkipped = true

+# (optional mode) validate remote and local observation sets for consistency
+# this mode always assumes exitWhenComplete=true
+# validate mode always assumes retrySkipped and performs retries after validation
+org.opencadc.icewind.validate = true
 ```

 The _caom_ database account owns and manages (create, alter, drop) CAOM database objects
@@ -89,6 +94,11 @@
 retry previously failed (skipped) observations listed in the `caom2.HarvestSkipURI`
 table. This mode always assumes _exitWhenComplete_ so it terminates after one pass
 through the list.

+The `icewind` _validate_ mode queries the _repoService_ and the local database, compares the
+two sets of observations, identifies discrepancies (missed delete, missed observation, or
+`Observation.accMetaChecksum` mismatch), and schedules a retry by creating a new record
+in the `caom2.HarvestSkipURI` table.

 ### cadcproxy.pem (optional)
 This client certificate can be provided in /config directory. If present, it is used to
 authenticate to the _repoService_ if the service requests a client certificate. If
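Putting the new keys together, an illustrative minimal config for validate mode might look like this (only keys documented above are used; the URL and URI values are the README's own placeholders):

```properties
org.opencadc.icewind.caom.url=jdbc:postgresql://{server}/{database}
org.opencadc.icewind.basePublisherID={uri}

# validate mode implies exitWhenComplete=true and retrySkipped,
# so neither needs to be set explicitly
org.opencadc.icewind.validate = true
```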
2 changes: 1 addition & 1 deletion icewind/VERSION
@@ -1,6 +1,6 @@
 ## deployable containers have a semantic and build tag
 # semantic version tag: major.minor[.patch]
 # build version tag: timestamp
-VER=0.9.13
+VER=0.10.0
 TAGS="${VER} ${VER}-$(date --utc +"%Y%m%dT%H%M%S")"
 unset VER
117 changes: 59 additions & 58 deletions icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java
@@ -92,55 +92,41 @@ public class CaomHarvester implements Runnable {
     private static final Long DEFAULT_IDLE_TIME = 30000L;

     private final InitDatabase initdb;
-    private final HarvesterResource src;
-    private final HarvesterResource dest;
+    private final HarvestSource src;
+    private final HarvestDestination dest;
     private final List<String> collections;
     private final URI basePublisherID;
-    private final int batchSize;
-    private final int nthreads;
-    private final boolean full;
-    private final boolean skip;
-    private final boolean nochecksum;
-    private final boolean exitWhenComplete;
-    private final long maxIdle;
-
-    // hack option
+
+    // optional
+    int batchSize;
+    int numThreads;
+    boolean exitWhenComplete = false;
+    long maxSleep;
+    boolean validateMode = false;
+    boolean skipMode = false;
+    String retryErrorMessagePattern;
+
+    // not used by main
+    private boolean nochecksum;

     /**
      * Harvest everything.
      *
      * @param src source resource
      * @param dest destination resource (must be a server/database/schema)
      * @param collections list of collections to process
      * @param basePublisherID base to use in generating Plane publisherID values in destination database
-     * @param batchSize number of observations per batch (~memory consumption)
-     * @param nthreads max threads when harvesting from a service
-     * @param full full harvest of all source entities
-     * @param skip attempt retry of all skipped observations
-     * @param nochecksum disable metadata checksum comparison
-     * @param exitWhenComplete exit after processing each collection if true, else continuously loop
-     * @param maxIdle max sleep time in seconds between runs when running continuously
      */
-    public CaomHarvester(HarvesterResource src, HarvesterResource dest, List<String> collections,
-            URI basePublisherID, int batchSize, int nthreads, boolean full, boolean skip,
-            boolean nochecksum, boolean exitWhenComplete, long maxIdle) {
+    public CaomHarvester(HarvestSource src, List<String> collections, HarvestDestination dest, URI basePublisherID) {
         this.src = src;
-        this.dest = dest;
         this.collections = collections;
+        this.dest = dest;
         this.basePublisherID = basePublisherID;
-        this.batchSize = batchSize;
-        this.nthreads = nthreads;
-        this.full = full;
-        this.skip = skip;
-        this.nochecksum = nochecksum;
-        this.exitWhenComplete = exitWhenComplete;
-        this.maxIdle = maxIdle;

         ConnectionConfig cc = new ConnectionConfig(null, null, dest.getUsername(), dest.getPassword(),
-                HarvesterResource.POSTGRESQL_DRIVER, dest.getJdbcUrl());
+                HarvestDestination.POSTGRESQL_DRIVER, dest.getJdbcUrl());
         DataSource ds = DBUtil.getDataSource(cc);
-        this.initdb = new InitDatabase(ds, dest.getDatabase(), dest.getSchema());
+        this.initdb = new InitDatabase(ds, null, dest.getSchema());
     }

@Override
@@ -176,31 +162,46 @@ public void run() {
         while (!done) {
             int ingested = 0;
             for (String collection : collections) {
-                log.info(src.getIdentifier(collection) + " -> " + dest.getIdentifier(collection));
-
-                ObservationHarvester obsHarvester = new ObservationHarvester(src, dest, collection, basePublisherID, batchSize,
-                        nthreads, full, nochecksum);
-                obsHarvester.setSkipped(skip, retryErrorMessagePattern);
-
-                DeletionHarvester obsDeleter = new DeletionHarvester(DeletedObservation.class, src, dest,
-                        collection, batchSize * 100);
-                boolean initDel = init;
-                if (!init) {
-                    // check if we have ever harvested before
-                    HarvestState hs = obsHarvester.harvestStateDAO.get(obsHarvester.source, obsHarvester.cname);
-                    initDel = (hs.curID == null && hs.curLastModified == null); // never harvested
-                }
-
-                try {
-                    // delete observations before harvest to avoid observationURI conflicts from delete+create
-                    obsDeleter.setInitHarvestState(initDel);
-                    obsDeleter.run();
-
-                    // harvest observations
-                    obsHarvester.run();
-                    ingested += obsHarvester.getIngested();
-                } catch (TransientException e) {
-                    ingested = 0;
+                log.info(src.getIdentifier(collection) + " -> " + dest);
+
+                if (validateMode) {
+                    ObservationValidator validator = new ObservationValidator(src, collection, dest, batchSize, numThreads, false);
+                    ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, dest, basePublisherID,
+                            batchSize, numThreads, nochecksum);
+                    obsHarvester.setSkipped(skipMode, null);
+                    try {
+                        validator.run();
+                        if (validator.getNumMismatches() > 0) {
+                            obsHarvester.run(); // retry skipped
+                        }
+                    } catch (TransientException ex) {
+                        log.warn("validate " + src.getIdentifier(collection) + " FAIL", ex);
+                    }
+                } else {
+                    ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, dest, basePublisherID,
+                            batchSize, numThreads, nochecksum);
+                    obsHarvester.setSkipped(skipMode, retryErrorMessagePattern);
+
+                    DeletionHarvester obsDeleter = new DeletionHarvester(DeletedObservation.class, src, collection, dest);
+                    boolean initDel = init;
+                    if (!init) {
+                        // check if we have ever harvested before
+                        HarvestState hs = obsHarvester.harvestStateDAO.get(obsHarvester.source, obsHarvester.cname);
+                        initDel = (hs.curID == null && hs.curLastModified == null); // never harvested
+                    }
+
+                    try {
+                        // delete observations before harvest to avoid observationURI conflicts from delete+create
+                        obsDeleter.setInitHarvestState(initDel);
+                        obsDeleter.run();
+
+                        // harvest observations
+                        obsHarvester.run();
+                        ingested += obsHarvester.getIngested();
+                    } catch (TransientException ex) {
+                        log.warn("harvest " + src.getIdentifier(collection) + " FAIL", ex);
+                        ingested = 0;
+                    }
+                }
             }

@@ -211,7 +212,7 @@ public void run() {
             if (ingested > 0 || sleep == 0) {
                 sleep = DEFAULT_IDLE_TIME;
             } else {
-                sleep = Math.min(sleep * 2, maxIdle * 1000L);
+                sleep = Math.min(sleep * 2, maxSleep * 1000L);
             }
             try {
                 log.info("idle sleep: " + (sleep / 1000L) + " sec");
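The `maxIdle` to `maxSleep` rename above is the cap on the idle backoff: the sleep doubles after each pass that ingests nothing and resets once anything is ingested. An illustrative standalone version of that policy (hypothetical helper, not part of the PR):

```java
// Sketch of the idle backoff in CaomHarvester.run; names are illustrative.
class IdleBackoffSketch {
    static final long DEFAULT_IDLE_TIME = 30000L; // matches the field above

    // e.g. with maxSleepSeconds = 960 the idle sequence is 30s, 60s, 120s, 240s, 480s, 960s, 960s, ...
    static long nextSleep(long currentSleepMillis, int ingested, long maxSleepSeconds) {
        if (ingested > 0 || currentSleepMillis == 0) {
            return DEFAULT_IDLE_TIME; // reset after any successful ingest
        }
        return Math.min(currentSleepMillis * 2, maxSleepSeconds * 1000L); // double, capped
    }
}
```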
24 changes: 10 additions & 14 deletions icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java
@@ -96,7 +96,8 @@
 public class DeletionHarvester extends Harvester implements Runnable {

     private static final Logger log = Logger.getLogger(DeletionHarvester.class);
+    private static int DEFAULT_BATCH_SIZE = 100000;

     private DeletedEntityDAO deletedDAO;
     private RepoClient repoClient;
     private ObservationDAO obsDAO;
@@ -114,9 +115,10 @@ public class DeletionHarvester
      * @param collection the collection to process
      * @param batchSize ignored, always full list
      */
-    public DeletionHarvester(Class<?> entityClass, HarvesterResource src, HarvesterResource dest,
-            String collection, int batchSize) {
-        super(entityClass, src, dest, collection, batchSize, false);
+    public DeletionHarvester(Class<?> entityClass, HarvestSource src, String collection,
+            HarvestDestination dest) {
+        super(entityClass, src, collection, dest);
+        setBatchSize(DEFAULT_BATCH_SIZE);
         init();
     }

@@ -143,22 +145,21 @@ private void init() {
         repoClient.setReadTimeout(120000); // 2 min

         // destination
-        final String destDS = "jdbc/DeletionHarvester";
-
+        final String destDS = DEST_DATASOURCE_NAME;
         Map<String, Object> destConfig = getConfigDAO(dest);
         try {
             DataSource cur = null;
             try {
                 cur = DBUtil.findJNDIDataSource(destDS);
             } catch (NamingException notInitialized) {
-                log.debug("JNDI not initialized yet... OK");
+                log.info("JNDI DataSource not initialized yet... OK");
             }
             if (cur == null) {
                 ConnectionConfig destConnectionConfig = new ConnectionConfig(null, null,
-                        dest.getUsername(), dest.getPassword(), HarvesterResource.POSTGRESQL_DRIVER, dest.getJdbcUrl());
+                        dest.getUsername(), dest.getPassword(), HarvestDestination.POSTGRESQL_DRIVER, dest.getJdbcUrl());
                 DBUtil.createJNDIDataSource(destDS, destConnectionConfig);
             } else {
-                log.debug("found DataSource: " + destDS + " -- re-using");
+                log.info("found DataSource: " + destDS + " -- re-using");
             }
         } catch (NamingException e) {
             throw new IllegalStateException(String.format("Error creating destination JNDI datasource for %s reason: %s",
@@ -209,7 +210,6 @@ public void run() {
log.error("batched aborted");
}
go = (!num.abort && !num.done);
full = false; // do not start at min(lastModified) again
}
try {
close();
@@ -263,10 +263,6 @@ private Progress doit() {

         startDate = state.curLastModified;
         if (firstIteration) {
-            if (super.minDate != null) { // override state
-                startDate = super.minDate;
-            }
-            endDate = super.maxDate;
             // harvest up to a little in the past because the head of the
             // sequence may be volatile
             long fiveMinAgo = System.currentTimeMillis() - 5 * 60000L;
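The init() changes above look up a shared named JNDI DataSource and register it only when it does not exist yet, so repeated DeletionHarvester instances in one JVM reuse the same destination pool. A sketch of that find-or-create pattern, assuming the `DBUtil` and `ConnectionConfig` calls shown in the diff live in `ca.nrc.cadc.db` (the name, credentials, and URL are placeholders):

```java
import javax.naming.NamingException;
import javax.sql.DataSource;

import ca.nrc.cadc.db.ConnectionConfig;
import ca.nrc.cadc.db.DBUtil;

class JndiDataSourceSketch {
    // Return the named DataSource, registering it on first use in this JVM.
    static DataSource findOrCreate(String name, String user, String pw, String driver, String url)
            throws NamingException {
        DataSource cur = null;
        try {
            cur = DBUtil.findJNDIDataSource(name);
        } catch (NamingException notInitialized) {
            // JNDI entry does not exist yet: first caller registers it below
        }
        if (cur == null) {
            ConnectionConfig cc = new ConnectionConfig(null, null, user, pw, driver, url);
            DBUtil.createJNDIDataSource(name, cc);
            cur = DBUtil.findJNDIDataSource(name);
        }
        return cur;
    }
}
```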