Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix DataNodeSizeWorker iterator query so it uses an index #577

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cadc-inventory-db/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ sourceCompatibility = 1.8

group = 'org.opencadc'

version = '1.0.1'
version = '1.0.2'

description = 'OpenCADC Storage Inventory database library'
def git_url = 'https://github.com/opencadc/storage-inventory'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ public void testArtifactIterator() {
log.info("count vs Namespace incremental from start...");
DateFormat df = DateUtil.getDateFormat(DateUtil.IVOA_DATE_FORMAT, DateUtil.UTC);
count = 0;
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, startDate, true)) {
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, startDate, true, null)) {
while (iter.hasNext()) {
Artifact actual = iter.next();
count++;
Expand All @@ -1118,7 +1118,7 @@ public void testArtifactIterator() {

log.info("count vs Namespace incremental from mid...");
count = 0;
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, midDate, true)) {
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, midDate, true, null)) {
while (iter.hasNext()) {
Artifact actual = iter.next();
count++;
Expand All @@ -1130,7 +1130,7 @@ public void testArtifactIterator() {

log.info("count vs Namespace incremental from end...");
count = 0;
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, endDate, true)) {
try (ResourceIterator<Artifact> iter = originDAO.iterator(ns, null, endDate, true, null)) {
while (iter.hasNext()) {
Artifact actual = iter.next();
count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.opencadc.inventory.Artifact;
import org.opencadc.vospace.db.DataNodeSizeWorker;
import org.opencadc.inventory.Namespace;
import org.opencadc.inventory.StorageLocation;
import org.opencadc.inventory.db.ArtifactDAO;
import org.opencadc.inventory.db.HarvestState;
import org.opencadc.inventory.db.HarvestStateDAO;
Expand All @@ -106,7 +107,7 @@ public class DataNodeSizeWorkerTest {

static {
Log4jInit.setLevel("org.opencadc.inventory", Level.INFO);
Log4jInit.setLevel("org.opencadc.inventory.db", Level.INFO);
Log4jInit.setLevel("org.opencadc.inventory.db", Level.DEBUG);
Log4jInit.setLevel("ca.nrc.cadc.db", Level.INFO);
Log4jInit.setLevel("org.opencadc.vospace", Level.INFO);
Log4jInit.setLevel("org.opencadc.vospace.db", Level.INFO);
Expand Down Expand Up @@ -180,7 +181,16 @@ public void init_cleanup() throws Exception {
}

@Test
// Verify DataNodeSizeWorker behaviour when the inventory database is a storage site
// (artifacts have a StorageLocation). Delegates to the shared scenario with isStorageSite=true.
// NOTE: the two lines below are the diff's old/new method signatures (renamed from testSyncArtifact).
public void testSyncArtifact() throws Exception {
public void testSyncArtifactSite() throws Exception {
testSyncArtifact(true);
}

@Test
// Verify DataNodeSizeWorker behaviour when the inventory database is a global inventory
// (artifacts have no StorageLocation). Delegates to the shared scenario with isStorageSite=false.
public void testSyncArtifactGlobal() throws Exception {
testSyncArtifact(false);
}

private void testSyncArtifact(boolean isStorageSite) throws Exception {
UUID rootID = new UUID(0L, 0L);
ContainerNode root = new ContainerNode(rootID, "root");

Expand All @@ -206,11 +216,15 @@ public void testSyncArtifact() throws Exception {
URI.create("md5:d41d8cd98f00b204e9800998ecf8427e"),
new Date(),
666L);
if (isStorageSite) {
expected.storageLocation = new StorageLocation(URI.create("id:" + UUID.randomUUID().toString()));
expected.storageLocation.storageBucket = "X";
}
log.info("expected: " + expected);

artifactDAO.put(expected);
Artifact actualArtifact = artifactDAO.get(expected.getID());
Assert.assertNotNull(actual);
Assert.assertNotNull(actualArtifact);
Assert.assertEquals(expected.getContentLength(), actualArtifact.getContentLength());

String hsName = "ArtifactSize";
Expand All @@ -219,25 +233,36 @@ public void testSyncArtifact() throws Exception {
harvestStateDAO.put(hs);
hs = harvestStateDAO.get(hsName, resourceID);

DataNodeSizeWorker asWorker = new DataNodeSizeWorker(harvestStateDAO, hs, artifactDAO, siNamespace);
DataNodeSizeWorker asWorker = new DataNodeSizeWorker(harvestStateDAO, hs, artifactDAO, siNamespace, isStorageSite);
log.info("*** DataNodeSizeWorker START");
asWorker.run();
log.info("*** DataNodeSizeWorker DONE");

actual = (DataNode)nodeDAO.get(orig.getID());
Assert.assertNotNull(actual);
log.info("found: " + actual.getID() + " aka " + actual);
Assert.assertEquals(expected.getContentLength(), actual.bytesUsed);

Thread.sleep(100L);

// update the artifact only
artifactDAO.delete(actualArtifact.getID());
expected = new Artifact(expected.getURI(), expected.getMetaChecksum(), new Date(), 333L);
artifactDAO.put(expected);
Artifact modified = new Artifact(expected.getURI(), expected.getMetaChecksum(), new Date(), 333L);
if (isStorageSite) {
modified.storageLocation = new StorageLocation(URI.create("id:" + UUID.randomUUID().toString()));
modified.storageLocation.storageBucket = "X";
}
artifactDAO.put(modified);
actual = (DataNode)nodeDAO.get(orig.getID());
Assert.assertNotEquals(expected.getContentLength(), actual.bytesUsed);
Assert.assertNotEquals(modified.getContentLength(), actual.bytesUsed);

// run the update
log.info("*** DataNodeSizeWorker START");
asWorker.run();
log.info("*** DataNodeSizeWorker DONE");

actual = (DataNode)nodeDAO.get(orig.getID());
Assert.assertEquals(expected.getContentLength(), actual.bytesUsed);
Assert.assertEquals(modified.getContentLength(), actual.bytesUsed);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public ResourceIterator<Artifact> storedIterator(String storageBucketPrefix) {
SQLGenerator.ArtifactIteratorQuery iter = (SQLGenerator.ArtifactIteratorQuery) gen.getEntityIteratorQuery(Artifact.class);
iter.setStorageLocationRequired(true);
iter.setStorageBucket(storageBucketPrefix);
iter.setOrderedOutput(true);
iter.setOrderByStorageLocation(true);
return iter.query(dataSource);
} catch (BadSqlGrammarException ex) {
handleInternalFail(ex);
Expand Down Expand Up @@ -250,31 +250,33 @@ public ResourceIterator<Artifact> iterator(String uriBucketPrefix, boolean order
* conditions on fields of the Artifact using the field names for column references.
* Example: <code>uri like 'ad:bar/%'</code>. The result is currently not ordered.
*
* <p>Use case: local cleanup by arbitrary criteria
* <p>Use case: local cleanup by ringhold
*
* @param ns namespace for selecting artifacts
* @param uriBucketPrefix null, prefix, or complete Artifact.uriBucket string
* @param ordered order by Artifact.uri (true) or not ordered (false)
* @return iterator over artifacts matching criteria
*/
// Convenience overload: iterate over artifacts in a namespace with no incremental
// (minLastModified) constraint and no stored/unstored filter. Delegates to the
// 5-argument iterator; passing null for isStored selects all artifacts.
// NOTE: the two return lines below are the diff's old/new delegation (new trailing null arg).
public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix, boolean ordered) {
return iterator(ns, uriBucketPrefix, null, ordered);
return iterator(ns, uriBucketPrefix, null, ordered, null);
}

/**
* Iterate over artifacts that match criteria. This method adds an optional Date argument to
* Iterate over artifacts in a specific namespace. This method adds an optional Date argument to
* support incremental processing. In this case, ordered will be in timestamp order rather than
* uri order.
*
* <p>Use case: process artifact events directly in the database
* <p>Use case: process artifact events directly in the database (DataNodeSizeWorker)
*
* @param ns namespace for selecting artifacts
* @param uriBucketPrefix null, prefix, or complete Artifact.uriBucket string
* @param minLastModified minimum Artifact.lastModified to consider (incremental mode)
* @param ordered order by Artifact.uri (true) or not ordered (false)
* @return iterator over artifacts matching criteria
* @param minLastModified minimum Artifact.lastModified to consider (incremental mode), null for all
* @param ordered order by Artifact.lastModified (true) or not ordered (false)
* @param isStored true to select stored artifacts, false for unstored, null for all
* @return iterator over artifacts
*/
public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix, Date minLastModified, boolean ordered) {
public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix,
Date minLastModified, boolean ordered, Boolean isStored) {
checkInit();
long t = System.currentTimeMillis();

Expand All @@ -284,6 +286,7 @@ public ResourceIterator<Artifact> iterator(Namespace ns, String uriBucketPrefix,
iter.setNamespace(ns);
iter.setMinLastModified(minLastModified);
iter.setOrderedOutput(ordered);
iter.setStorageLocationRequired(isStored);
return iter.query(dataSource);
} catch (BadSqlGrammarException ex) {
handleInternalFail(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ class ArtifactIteratorQuery implements EntityIteratorQuery<Artifact> {
private Namespace namespace;
private Date minLastModified;
private boolean ordered;
private boolean orderByStorageLocation;

private final Calendar utc = Calendar.getInstance(DateUtil.UTC);

Expand Down Expand Up @@ -730,6 +731,10 @@ public void setOrderedOutput(boolean ordered) {
this.ordered = ordered;
}

/**
 * Request output ordered by storage location (storageBucket, then storageID)
 * instead of the default ordering. Used by storedIterator so the generated
 * query ORDER BY matches the storage-location index.
 *
 * @param obsl true to order results by storageLocation columns
 */
public void setOrderByStorageLocation(boolean obsl) {
this.orderByStorageLocation = obsl;
}

/**
 * Restrict the iterator query to artifacts associated with a specific site.
 *
 * @param siteID site identifier to filter on; presumably null means no site filter — TODO confirm
 */
public void setSiteID(UUID siteID) {
this.siteID = siteID;
}
Expand All @@ -752,7 +757,7 @@ public ResourceIterator<Artifact> query(DataSource ds) {
// null storageBucket to come after non-null
// postgresql: the default order is equivalent to explicitly specifying ASC NULLS LAST
// default behaviour may not be db-agnostic
if (ordered) {
if (orderByStorageLocation) {
sb.append(" ORDER BY storageLocation_storageBucket, storageLocation_storageID");
}
} else if (storageLocationRequired != null && !storageLocationRequired) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class DataNodeSizeWorker implements Runnable {
private final ArtifactDAO artifactDAO;
private final HarvestStateDAO harvestStateDAO;
private final Namespace storageNamespace;
private boolean isStorageSite;

private long numArtifactsProcessed;

Expand All @@ -111,12 +112,13 @@ public class DataNodeSizeWorker implements Runnable {
* @param namespace artifact namespace
*/
/**
 * Construct a worker that syncs Artifact.contentLength values to DataNode.bytesUsed.
 * The NodeDAO is derived from the given HarvestStateDAO so node updates share its
 * database connection/transaction context.
 * NOTE: the two parameter lines below are the diff's old/new signatures (new isStorageSite arg).
 *
 * @param harvestStateDAO DAO used to persist harvest state and to build the internal NodeDAO
 * @param harvestState current harvest state (incremental progress marker)
 * @param artifactDAO DAO used to iterate artifacts
 * @param namespace artifact namespace to process
 * @param isStorageSite true if the inventory database is a storage site (iterate stored
 *        artifacts only), false for a global inventory
 */
public DataNodeSizeWorker(HarvestStateDAO harvestStateDAO, HarvestState harvestState,
ArtifactDAO artifactDAO, Namespace namespace) {
ArtifactDAO artifactDAO, Namespace namespace, boolean isStorageSite) {
this.harvestState = harvestState;
this.harvestStateDAO = harvestStateDAO;
this.nodeDAO = new NodeDAO(harvestStateDAO);
this.artifactDAO = artifactDAO;
this.storageNamespace = namespace;
this.isStorageSite = isStorageSite;
}

public long getNumArtifactsProcessed() {
Expand Down Expand Up @@ -147,7 +149,7 @@ public void run() {
}

String uriBucket = null; // process all artifacts in a single thread
try (final ResourceIterator<Artifact> iter = artifactDAO.iterator(storageNamespace, uriBucket, startTime, true)) {
try (final ResourceIterator<Artifact> iter = artifactDAO.iterator(storageNamespace, uriBucket, startTime, true, isStorageSite)) {
TransactionManager tm = nodeDAO.getTransactionManager();
while (iter.hasNext()) {
Artifact artifact = iter.next();
Expand Down
19 changes: 14 additions & 5 deletions vault/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ specification designed to co-exist with other storage-inventory components. It p
organization layer on top of the storage management of storage-inventory.

The simplest configuration would be to deploy `vault` with `minoc` with a single metadata database and single
back end storage system. Details: TBD.
back end storage system. This is _isStorageSite = true_ below.

The other option would be to deploy `vault` with `raven` and `luskan` in a global inventory database and make
use of one or more of the network of known storage sites to store files. Details: TBD.
use of one or more of the network of known storage sites to store files. This is _isStorageSite = false_ below.
Details: TBD.

## deployment

Expand Down Expand Up @@ -78,6 +79,9 @@ A vault.properties file in /config is required to run this service. The followi
# service identity
org.opencadc.vault.resourceID = ivo://{authority}/{name}

# associated inventory type
org.opencadc.vault.inventory.isStorageSite = true|false

# consistency settings
org.opencadc.vault.consistency.preventNotFound=true|false

Expand All @@ -97,11 +101,16 @@ org.opencadc.vault.storage.namespace = {a storage inventory namespace to use}
```
The vault _resourceID_ is the resourceID of _this_ vault service.

The _isStorageSite_ key tells `vault` whether the associated inventory database is a storage
site or a global database. This affects the behaviour and performance of the background thread that
syncs Artifact.contentLength values to the DataNode.bytesUsed field.

The _preventNotFound_ key can be used to configure `vault` to prevent artifact-not-found errors that might
result due to the eventual consistency nature of the storage system by directly checking for the artifact at
_all known_ sites. It only makes sense to enable this when `vault` is running in a global inventory (along with
`raven` and/or `fenwick` instances syncing artifact metadata. This feature introduces an overhead for the
genuine not-found cases: transfer negotiation to GET the file that was never PUT.
_all known_ sites. It makes sense to enable this when `vault` is running in a global inventory (along with
`raven` and/or `fenwick` instances syncing artifact metadata), but even in a single _storage site_ deployment
this also helps hide some temporary inconsistencies. This feature introduces an overhead for the
genuine not-found cases: trying to GET the file that was never successfully PUT but the DataNode was created.

The _allocationParent_ is a path to a container node (directory) which contains space allocations. An allocation
is owned by a user (usually different from the _rootOwner_ admin user) who is responsible for the allocation
Expand Down
14 changes: 13 additions & 1 deletion vault/src/main/java/org/opencadc/vault/VaultInitAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class VaultInitAction extends InitAction {
private static final String VAULT_KEY = "org.opencadc.vault";
static final String RESOURCE_ID_KEY = VAULT_KEY + ".resourceID";
static final String PREVENT_NOT_FOUND_KEY = VAULT_KEY + ".consistency.preventNotFound";
static final String IS_STORAGE_SITE_KEY = VAULT_KEY + ".inventory.isStorageSite";
static final String INVENTORY_SCHEMA_KEY = VAULT_KEY + ".inventory.schema";
static final String VOSPACE_SCHEMA_KEY = VAULT_KEY + ".vospace.schema";
static final String SINGLE_POOL_KEY = VAULT_KEY + ".singlePool";
Expand Down Expand Up @@ -201,6 +202,15 @@ static MultiValuedProperties getConfig() {
sb.append("OK");
}

String iss = mvp.getFirstPropertyValue(IS_STORAGE_SITE_KEY);
sb.append("\n\t" + IS_STORAGE_SITE_KEY + ": ");
if (iss == null) {
sb.append("MISSING");
ok = false;
} else {
sb.append("OK");
}

String pnf = mvp.getFirstPropertyValue(PREVENT_NOT_FOUND_KEY);
sb.append("\n\t" + PREVENT_NOT_FOUND_KEY + ": ");
if (pnf == null) {
Expand Down Expand Up @@ -479,8 +489,10 @@ private void initBackgroundWorkers() {
offline = true;
}

boolean isStorageSite = Boolean.parseBoolean(props.getFirstPropertyValue(IS_STORAGE_SITE_KEY));

terminateBackgroundWorkers();
DataNodeSizeSync async = new DataNodeSizeSync(hsDAO, artifactDAO, storageNamespace);
DataNodeSizeSync async = new DataNodeSizeSync(hsDAO, artifactDAO, storageNamespace, isStorageSite);
async.setOffline(offline);
this.dataNodeSizeSyncThread = new Thread(async);
dataNodeSizeSyncThread.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,15 @@ public class DataNodeSizeSync implements Runnable {
private final HarvestStateDAO stateDAO;
private final ArtifactDAO artifactDAO;
private final Namespace artifactNamespace;
private final boolean isStorageSite;

private boolean offline = false;

public DataNodeSizeSync(HarvestStateDAO stateDAO, ArtifactDAO artifactDAO, Namespace artifactNamespace) {
public DataNodeSizeSync(HarvestStateDAO stateDAO, ArtifactDAO artifactDAO, Namespace artifactNamespace, boolean isStorageSite) {
this.stateDAO = stateDAO;
this.artifactDAO = artifactDAO;
this.artifactNamespace = artifactNamespace;
this.isStorageSite = isStorageSite;

// we need continuous timestamp updates to retain leader status, so only schedule maintenance
stateDAO.setUpdateBufferCount(0);
Expand Down Expand Up @@ -153,7 +155,7 @@ public void run() {
boolean fail = false;
log.info(logInfo.start());
try {
DataNodeSizeWorker worker = new DataNodeSizeWorker(stateDAO, state, artifactDAO, artifactNamespace);
DataNodeSizeWorker worker = new DataNodeSizeWorker(stateDAO, state, artifactDAO, artifactNamespace, isStorageSite);
worker.run();
logInfo.setLastModified(state.curLastModified);
logInfo.processed = worker.getNumArtifactsProcessed();
Expand Down
Loading