Skip to content

Commit

Permalink
Fix some error handling in the MetaSync utility where it could hang forever.
This still needs some rework and UTs. Ugly.
Browse files Browse the repository at this point in the history

Signed-off-by: Chris Larsen <[email protected]>
  • Loading branch information
manolama committed Feb 1, 2016
1 parent 3cd59be commit 62cc3b7
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 116 deletions.
251 changes: 136 additions & 115 deletions src/tools/MetaSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,28 @@ public MetaSync(final TSDB tsdb, final long start_id, final double quotient,
* Loops through the entire TSDB data set and exits when complete.
*/
public void run() {

// list of deferred calls used to act as a buffer
final ArrayList<Deferred<Boolean>> storage_calls =
new ArrayList<Deferred<Boolean>>();
final Deferred<Object> result = new Deferred<Object>();

final class ErrBack implements Callback<Object, Exception> {
@Override
public Object call(Exception e) throws Exception {
Throwable ex = e;
while (ex.getClass().equals(DeferredGroupException.class)) {
if (ex.getCause() == null) {
LOG.warn("Unable to get to the root cause of the DGE");
break;
}
ex = ex.getCause();
}
LOG.error("Sync thread failed with exception", ex);
result.callback(null);
return null;
}
}
final ErrBack err_back = new ErrBack();

/**
* Called when we have encountered a previously un-processed UIDMeta object.
Expand Down Expand Up @@ -360,7 +377,7 @@ public MetaScanner() {
* been processed.
*/
public Object scan() {
// NOTE(review): this text is a diff rendering, so two versions of the return
// statement are shown. The first appears to be the pre-change line, which
// registers only this object as the callback for the next batch of rows; the
// second appears to be its replacement, which additionally registers err_back
// as the errback on the same chain. Confirm against the actual file.
return scanner.nextRows().addCallback(this);
return scanner.nextRows().addCallback(this).addErrback(err_back);
}

@Override
Expand All @@ -372,132 +389,136 @@ public Object call(ArrayList<ArrayList<KeyValue>> rows)
}

for (final ArrayList<KeyValue> row : rows) {

final byte[] tsuid = UniqueId.getTSUIDFromKey(row.get(0).key(),
TSDB.metrics_width(), Const.TIMESTAMP_BYTES);

// if the current tsuid is the same as the last, just continue
// so we save time
if (last_tsuid != null && Arrays.equals(last_tsuid, tsuid)) {
continue;
}
last_tsuid = tsuid;

// see if we've already processed this tsuid and if so, continue
if (processed_tsuids.contains(Arrays.hashCode(tsuid))) {
continue;
}
tsuid_string = UniqueId.uidToString(tsuid);

// add tsuid to the processed list
processed_tsuids.add(Arrays.hashCode(tsuid));

// we may have a new TSUID or UIDs, so fetch the timestamp of the
// row for use as the "created" time. Depending on speed we could
// parse datapoints, but for now the hourly row time is enough
final long timestamp = Bytes.getUnsignedInt(row.get(0).key(),
TSDB.metrics_width());

LOG.debug("[" + thread_id + "] Processing TSUID: " + tsuid_string +
" row timestamp: " + timestamp);

// now process the UID metric meta data
final byte[] metric_uid_bytes =
Arrays.copyOfRange(tsuid, 0, TSDB.metrics_width());
final String metric_uid = UniqueId.uidToString(metric_uid_bytes);
Long last_get = metric_uids.get(metric_uid);

if (last_get == null || last_get == 0 || timestamp < last_get) {
// fetch and update. Returns default object if the meta doesn't
// exist, so we can just call sync on this to create a missing
// entry
final UidCB cb = new UidCB(UniqueIdType.METRIC,
metric_uid_bytes, timestamp);
final Deferred<Boolean> process_uid = UIDMeta.getUIDMeta(tsdb,
UniqueIdType.METRIC, metric_uid_bytes).addCallbackDeferring(cb);
storage_calls.add(process_uid);
metric_uids.put(metric_uid, timestamp);
}

// loop through the tags and process their meta
final List<byte[]> tags = UniqueId.getTagsFromTSUID(tsuid_string);
int idx = 0;
for (byte[] tag : tags) {
final UniqueIdType type = (idx % 2 == 0) ? UniqueIdType.TAGK :
UniqueIdType.TAGV;
idx++;
final String uid = UniqueId.uidToString(tag);
try {
final byte[] tsuid = UniqueId.getTSUIDFromKey(row.get(0).key(),
TSDB.metrics_width(), Const.TIMESTAMP_BYTES);

// check the maps to see if we need to bother updating
if (type == UniqueIdType.TAGK) {
last_get = tagk_uids.get(uid);
} else {
last_get = tagv_uids.get(uid);
// if the current tsuid is the same as the last, just continue
// so we save time
if (last_tsuid != null && Arrays.equals(last_tsuid, tsuid)) {
continue;
}
if (last_get != null && last_get != 0 && last_get <= timestamp) {
last_tsuid = tsuid;

// see if we've already processed this tsuid and if so, continue
if (processed_tsuids.contains(Arrays.hashCode(tsuid))) {
continue;
}

// fetch and update. Returns default object if the meta doesn't
// exist, so we can just call sync on this to create a missing
// entry
final UidCB cb = new UidCB(type, tag, timestamp);
final Deferred<Boolean> process_uid = UIDMeta.getUIDMeta(tsdb, type, tag)
.addCallbackDeferring(cb);
storage_calls.add(process_uid);
if (type == UniqueIdType.TAGK) {
tagk_uids.put(uid, timestamp);
} else {
tagv_uids.put(uid, timestamp);
tsuid_string = UniqueId.uidToString(tsuid);

/**
* An error callback used to catch issues with a particular timeseries
* or UIDMeta such as a missing UID name. We want to continue
* processing when this happens so we'll just log the error and
* the user can issue a command later to clean up orphaned meta
* entries.
*/
final class RowErrBack implements Callback<Object, Exception> {
@Override
public Object call(Exception e) throws Exception {
Throwable ex = e;
while (ex.getClass().equals(DeferredGroupException.class)) {
if (ex.getCause() == null) {
LOG.warn("Unable to get to the root cause of the DGE");
break;
}
ex = ex.getCause();
}
if (ex.getClass().equals(IllegalStateException.class)) {
LOG.error("Invalid data when processing TSUID [" +
tsuid_string + "]: " + ex.getMessage());
} else if (ex.getClass().equals(IllegalArgumentException.class)) {
LOG.error("Invalid data when processing TSUID [" +
tsuid_string + "]: " + ex.getMessage());
} else if (ex.getClass().equals(NoSuchUniqueId.class)) {
LOG.warn("Timeseries [" + tsuid_string +
"] includes a non-existant UID: " + ex.getMessage());
} else {
LOG.error("Unknown exception processing row: " + row, ex);
}
return null;
}
}
}

/**
* An error callback used to catch issues with a particular timeseries
* or UIDMeta such as a missing UID name. We want to continue
* processing when this happens so we'll just log the error and
* the user can issue a command later to clean up orphaned meta
* entries.
*/
final class ErrBack implements Callback<Deferred<Boolean>, Exception> {

@Override
public Deferred<Boolean> call(Exception e) throws Exception {
// add tsuid to the processed list
processed_tsuids.add(Arrays.hashCode(tsuid));

// we may have a new TSUID or UIDs, so fetch the timestamp of the
// row for use as the "created" time. Depending on speed we could
// parse datapoints, but for now the hourly row time is enough
final long timestamp = Bytes.getUnsignedInt(row.get(0).key(),
TSDB.metrics_width());

LOG.debug("[" + thread_id + "] Processing TSUID: " + tsuid_string +
" row timestamp: " + timestamp);

// now process the UID metric meta data
final byte[] metric_uid_bytes =
Arrays.copyOfRange(tsuid, 0, TSDB.metrics_width());
final String metric_uid = UniqueId.uidToString(metric_uid_bytes);
Long last_get = metric_uids.get(metric_uid);

if (last_get == null || last_get == 0 || timestamp < last_get) {
// fetch and update. Returns default object if the meta doesn't
// exist, so we can just call sync on this to create a missing
// entry
final UidCB cb = new UidCB(UniqueIdType.METRIC,
metric_uid_bytes, timestamp);
final Deferred<Boolean> process_uid = UIDMeta.getUIDMeta(tsdb,
UniqueIdType.METRIC, metric_uid_bytes)
.addCallbackDeferring(cb)
.addErrback(new RowErrBack());
storage_calls.add(process_uid);
metric_uids.put(metric_uid, timestamp);
}

// loop through the tags and process their meta
final List<byte[]> tags = UniqueId.getTagsFromTSUID(tsuid_string);
int idx = 0;
for (byte[] tag : tags) {
final UniqueIdType type = (idx % 2 == 0) ? UniqueIdType.TAGK :
UniqueIdType.TAGV;
idx++;
final String uid = UniqueId.uidToString(tag);

Throwable ex = e;
while (ex.getClass().equals(DeferredGroupException.class)) {
if (ex.getCause() == null) {
LOG.warn("Unable to get to the root cause of the DGE");
break;
}
ex = ex.getCause();
// check the maps to see if we need to bother updating
if (type == UniqueIdType.TAGK) {
last_get = tagk_uids.get(uid);
} else {
last_get = tagv_uids.get(uid);
}
if (last_get != null && last_get != 0 && last_get <= timestamp) {
continue;
}
if (ex.getClass().equals(IllegalStateException.class)) {
LOG.error("Invalid data when processing TSUID [" +
tsuid_string + "]", ex);
} else if (ex.getClass().equals(IllegalArgumentException.class)) {
LOG.error("Invalid data when processing TSUID [" +
tsuid_string + "]", ex);
} else if (ex.getClass().equals(NoSuchUniqueId.class)) {
LOG.warn("Timeseries [" + tsuid_string +
"] includes a non-existant UID: " + ex.getMessage());

// fetch and update. Returns default object if the meta doesn't
// exist, so we can just call sync on this to create a missing
// entry
final UidCB cb = new UidCB(type, tag, timestamp);
final Deferred<Boolean> process_uid =
UIDMeta.getUIDMeta(tsdb, type, tag)
.addCallbackDeferring(cb)
.addErrback(new RowErrBack());
storage_calls.add(process_uid);
if (type == UniqueIdType.TAGK) {
tagk_uids.put(uid, timestamp);
} else {
LOG.error("Unmatched Exception: " + ex.getClass());
throw e;
tagv_uids.put(uid, timestamp);
}

return Deferred.fromResult(false);
}

// handle the timeseries meta last so we don't record it if one
// or more of the UIDs had an issue
final Deferred<Boolean> process_tsmeta =
TSMeta.getTSMeta(tsdb, tsuid_string)
.addCallbackDeferring(new TSMetaCB(tsuid, timestamp))
.addErrback(new RowErrBack());
storage_calls.add(process_tsmeta);
} catch (RuntimeException e) {
LOG.error("Processing row " + row + " failed with exception: "
+ e.getMessage());
LOG.debug("Row: " + row + " stack trace: ", e);
}

// handle the timeseries meta last so we don't record it if one
// or more of the UIDs had an issue
final Deferred<Boolean> process_tsmeta =
TSMeta.getTSMeta(tsdb, tsuid_string)
.addCallbackDeferring(new TSMetaCB(tsuid, timestamp));
process_tsmeta.addErrback(new ErrBack());
storage_calls.add(process_tsmeta);
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/tools/UidManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public static void main(String[] args) throws Exception {
rc = runCommand(tsdb, table, idwidth, ignorecase, args);
} finally {
try {
LOG.info("Shutting down TSD....");
tsdb.getClient().shutdown().joinUninterruptibly();
LOG.info("Gracefully shutdown the TSD");
} catch (Exception e) {
Expand Down Expand Up @@ -990,7 +991,7 @@ private static int metaSync(final TSDB tsdb) throws Exception {
threads[i].join();
LOG.info("[" + i + "] Finished");
}

LOG.info("All metasync threads have completed");
// make sure buffered data is flushed to storage before exiting
tsdb.flush().joinUninterruptibly();

Expand Down

0 comments on commit 62cc3b7

Please sign in to comment.