diff --git a/.classpath b/.classpath index e79d94c..83bf4ad 100644 --- a/.classpath +++ b/.classpath @@ -4,7 +4,7 @@ - + diff --git a/src/plugins/Spider/LibraryBuffer.java b/src/plugins/Spider/LibraryBuffer.java index 44d34eb..5e9b552 100644 --- a/src/plugins/Spider/LibraryBuffer.java +++ b/src/plugins/Spider/LibraryBuffer.java @@ -97,7 +97,7 @@ public synchronized int bufferUsageEstimate() { public void start() { // Do in a transaction so it gets committed separately. spider.db.beginThreadTransaction(Storage.EXCLUSIVE_TRANSACTION); - spider.resetPages(Status.NOT_PUSHED, Status.QUEUED); + spider.resetPages(Status.NOT_PUSHED, Status.NEW); spider.db.endThreadTransaction(); } @@ -161,13 +161,12 @@ private void sendBuffer(int bufferUsageEstimated) { long tStart = System.currentTimeMillis(); try { Logger.normal(this, "Sending buffer of estimated size " + bufferUsageEstimated + " bytes to Library"); - long totalPagesIndexed = spider.getRoot().getPageCount(Status.INDEXED); Bucket bucket = pr.getNode().clientCore.tempBucketFactory.makeBucket(3000000); - writeToPush(totalPagesIndexed, bucket); + writeToPush(totalPagesIndexed(), bucket); innerSend(bucket); Logger.normal(this, "Buffer successfully sent to Library, size = "+bucket.size()); // Not a separate transaction, commit with the index updates. - spider.resetPages(Status.NOT_PUSHED, Status.INDEXED); + spider.donePages(); } catch (IOException ex) { Logger.error(this, "Could not make bucket to transfer buffer", ex); } @@ -186,6 +185,12 @@ private void sendBuffer(int bufferUsageEstimated) { System.out.println("Restored data from last time from "+SAVE_FILE); } } + + private long totalPagesIndexed() { + return spider.getRoot().getPageCount(Status.DONE) + + spider.getRoot().getPageCount(Status.PROCESSED_KSK) + + spider.getRoot().getPageCount(Status.PROCESSED_USK); + } private synchronized Bucket writeToPush(long totalPagesIndexed, Bucket bucket) throws IOException { OutputStream os = bucket.getOutputStream(); @@ -271,7 +276,7 @@ public void terminate() { FileBucket bucket = new FileBucket(SAVE_FILE, false, false, false, false); long totalPagesIndexed; try { - totalPagesIndexed = spider.getRoot().getPageCount(Status.INDEXED); + totalPagesIndexed = totalPagesIndexed(); } catch (Throwable t) { totalPagesIndexed = -1; // FIXME I don't understand why this (ClassNotFoundException) happens, we have not closed the class loader yet. diff --git a/src/plugins/Spider/Spider.java b/src/plugins/Spider/Spider.java index 3937914..d48bb4f 100644 --- a/src/plugins/Spider/Spider.java +++ b/src/plugins/Spider/Spider.java @@ -15,18 +15,21 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; +import java.util.Date; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import plugins.Spider.index.TermPageEntry; @@ -63,6 +66,7 @@ import freenet.pluginmanager.FredPluginVersioned; import freenet.pluginmanager.PluginRespirator; import freenet.support.Logger; +import freenet.support.Logger.LogLevel; import freenet.support.api.Bucket; import freenet.support.io.Closer; import freenet.support.io.NativeThread; @@ -78,17 +82,24 @@ * */ public class Spider implements FredPlugin, FredPluginThreadless, - FredPluginVersioned, FredPluginRealVersioned, FredPluginL10n, USKCallback, RequestClient { + FredPluginVersioned, FredPluginRealVersioned, FredPluginL10n, RequestClient { /** Document ID of fetching documents */ - protected Map runningFetch = Collections.synchronizedMap(new HashMap()); + protected Map> runningFetches = new HashMap>(); + { + for (Status status : Config.statusesToProcess) { + runningFetches.put(status, Collections.synchronizedMap(new HashMap())); + } + } + + private Map> runningFutures = Collections.synchronizedMap(new HashMap>()); /** * Lists the allowed mime types of the fetched page. */ protected Set allowedMIMETypes; - static int dbVersion = 45; + static int dbVersion = 48; static int version = 53; /** We use the standard http://127.0.0.1:8888/ for parsing HTML regardless of what the local @@ -117,6 +128,11 @@ public long getRealVersion() { private LibraryBuffer librarybuffer; private final AtomicLong lastRequestFinishedAt = new AtomicLong(); + private final AtomicInteger editionsFound = new AtomicInteger(); + + private final Set subscribedToUSKs = Collections.synchronizedSet(new HashSet()); + + private Map bulkPageIterators = null; public int getLibraryBufferSize() { return librarybuffer.bufferUsageEstimate(); @@ -134,6 +150,14 @@ public long getLastRequestFinishedAt() { return lastRequestFinishedAt.get(); } + public int getSubscribedToUSKs() { + return subscribedToUSKs.size(); + } + + public int getEditionsFound() { + return editionsFound.get(); + } + public Config getConfig() { return getRoot().getConfig(); } @@ -149,15 +173,24 @@ public void setConfig(Config config) { } /** - * Adds the found uri to the list of to-be-retrieved uris.

Every usk uri added as ssk. + * Adds the found uri to the list of to-be-retrieved uris.

+ * + * SSKs are added as their corresponding USK. + * + * Uris already in the database are not added. New Uris are put in NEW. + * + * USKs in the database but with new edition are moved to NEW_EDITION. + * * @param uri the new uri that needs to be fetched for further indexing */ - public void queueURI(FreenetURI uri, String comment, boolean force) { + public void queueURI(FreenetURI uri, String comment) { String sURI = uri.toString(); + final long NO_USK = -1L; + long edition = NO_USK; String lowerCaseURI = sURI.toLowerCase(Locale.US); for (String ext : getRoot().getConfig().getBadlistedExtensions()) { if (lowerCaseURI.endsWith(ext)) { - return; // be smart + return; // be smart, don't fetch certain files } } @@ -167,26 +200,35 @@ public void queueURI(FreenetURI uri, String comment, boolean force) { } } + // Always add an USK page without the '-' to trigger search of versions. + if (uri.isUSK() && uri.getSuggestedEdition() < 0) { + uri = uri.setSuggestedEdition(-uri.getSuggestedEdition()); + } + + // Never add an SSK if there could be a corresponding USK + if (uri.isSSKForUSK()) { + uri = uri.uskForSSK(); + } + if (uri.isUSK()) { - if (uri.getSuggestedEdition() < 0) { - uri = uri.setSuggestedEdition((-1) * uri.getSuggestedEdition()); - } - try { - uri = ((USK.create(uri)).getSSK()).getURI(); - (clientContext.uskManager).subscribe(USK.create(uri), this, false, this); - } catch (Exception e) { - } + edition = uri.getSuggestedEdition(); + uri = uri.setSuggestedEdition(0L); } db.beginThreadTransaction(Storage.EXCLUSIVE_TRANSACTION); boolean dbTransactionEnded = false; try { Page page = getRoot().getPageByURI(uri, true, comment); - if (force && page.getStatus() != Status.QUEUED) { - page.setStatus(Status.QUEUED); - page.setComment(comment); + if (edition != NO_USK) { + final long oldEdition = page.getEdition(); + if (edition > oldEdition) { + Status whereTo = Status.NEW_EDITION; + if (!page.hasBeenFetched()) { + whereTo = Status.NEW; + } + page.setStatus(edition, whereTo, "New edition replacing " + oldEdition); + } } - db.endThreadTransaction(); dbTransactionEnded = true; } catch (RuntimeException e) { @@ -200,56 +242,234 @@ public void queueURI(FreenetURI uri, String comment, boolean force) { } } + private class SubscribedToUSK implements USKCallback { + private static final int DELAY_IN_MINUTES_AFTER_NEW_EDITION_SEEN = 10; + private FreenetURI uri; + USK usk; + + SubscribedToUSK(FreenetURI theURI) { + uri = theURI; + try { + usk = USK.create(uri); + } catch (MalformedURLException e) { + Logger.error(this, "Cannot subscribe to " + uri + ".", e); + return; + } + (clientContext.uskManager).subscribe(usk, this, false, Spider.this); + } + + @Override + public void onFoundEdition(long l, final USK key, ClientContext context, boolean metadata, + short codec, byte[] data, boolean newKnownGood, boolean newSlot) { + Logger.minor(this, "Found new Edition for " + key + ", newKnownGood=" + newKnownGood + " newSlot=" + newSlot + "."); + editionsFound.getAndIncrement(); + final FreenetURI uri = key.getURI(); + + callbackExecutor.schedule(new Runnable() { + @Override + public void run() { + Logger.debug(this, "Queueing new Edition for " + key + "."); + queueURI(uri, "USK found edition " + uri); + } + }, DELAY_IN_MINUTES_AFTER_NEW_EDITION_SEEN, TimeUnit.MINUTES); + } + + public void unsubscribe() { + (clientContext.uskManager).unsubscribe(usk, this); + subscribedToUSKs.remove(this); + } + + @Override + public short getPollingPriorityNormal() { + return (short) Math.min(RequestStarter.MINIMUM_FETCHABLE_PRIORITY_CLASS, getRoot().getConfig().getRequestPriority() + 1); + } + + @Override + public short getPollingPriorityProgress() { + return getRoot().getConfig().getRequestPriority(); + } + } + + private void subscribeUSK(FreenetURI uri) { + subscribedToUSKs.add(new SubscribedToUSK(uri)); + } + /** - * Start requests from the queue if less than 80% of the max requests are running until the max requests are running. + * Fetches pages from a queue, many at the time to avoid locking + * the database on every fetch. */ - public void startSomeRequests() { - ArrayList toStart = null; + class BulkPageIterator implements Iterator { + private Status queue; + private Deque list = new LinkedList(); + private int BULK_FETCH_SIZE = 100; + private long TIME_TO_DEFER_DATABASE_READ = TimeUnit.SECONDS.toMillis(10); + private Date lastPoll = new Date(); + + BulkPageIterator(Status status) { + queue = status; + } + + /** + * Fills the cache with pages. + * If the consumer went through the cache to quickly, don't + * fill it, emulating an empty iterator. This addresses the + * case when all the found pages are in progress. It also sets + * a cap on the amount of pages processed per time unit. + * @param extraFetches is amount of pages to fetch on top of + * BULK_FETCH_SIZE. + */ + private void fill(int extraFetches) { + Date now = new Date(); + if (list.isEmpty() && now.after(new Date(lastPoll.getTime() + TIME_TO_DEFER_DATABASE_READ))) { + lastPoll = now; + db.beginThreadTransaction(Storage.EXCLUSIVE_TRANSACTION); + try { + Iterator it = getRoot().getPages(queue); + int i = 0; + while (it.hasNext()) { + list.offer(it.next()); + if (++i > BULK_FETCH_SIZE + extraFetches) { + break; + } + } + } finally { + db.endThreadTransaction(); + } + } + } + + public boolean hasNext(int extraFetches) { + fill(extraFetches); + return !list.isEmpty(); + } + + @Override + public boolean hasNext() { + fill(0); + return !list.isEmpty(); + } + + @Override + public Page next() { + return list.poll(); + } + } + + /** + * Start requests from new and queued. + */ + private void startFetches() { synchronized (this) { - if (stopped) return; + if (stopped) { + bulkPageIterators = null; + return; + } + + if (bulkPageIterators == null) { + bulkPageIterators = new HashMap(); + } + } - synchronized (runningFetch) { - int running = runningFetch.size(); - int maxParallelRequests = getRoot().getConfig().getMaxParallelRequests(); + for (Status status : Config.statusesToProcess) { + ArrayList toStart = null; + synchronized (this) { + Map runningFetch = runningFetches.get(status); + synchronized (runningFetch) { + int maxParallelRequests = getRoot().getConfig().getMaxParallelRequests(status); + int running = runningFetch.size(); - if (running >= maxParallelRequests * 0.8) return; + if (maxParallelRequests <= running) { + continue; + } - // Prepare to start - toStart = new ArrayList(maxParallelRequests - running); - db.beginThreadTransaction(Storage.COOPERATIVE_TRANSACTION); - getRoot().sharedLockPages(Status.QUEUED); - try { - Iterator it = getRoot().getPages(Status.QUEUED); + toStart = new ArrayList(maxParallelRequests - running); - while (running + toStart.size() < maxParallelRequests && it.hasNext()) { - Page page = it.next(); - // Skip if getting this page already - if (runningFetch.containsKey(page)) continue; - + if (!bulkPageIterators.containsKey(status)) { + bulkPageIterators.put(status, new BulkPageIterator(status)); + } + + BulkPageIterator bulkPageIterator = bulkPageIterators.get(status); + while (running + toStart.size() < maxParallelRequests && + bulkPageIterator.hasNext(maxParallelRequests)) { + Page page = bulkPageIterator.next(); + Logger.debug(this, "Page " + page + " found in " + status + "."); try { - ClientGetter getter = makeGetter(page); + FreenetURI uri = new FreenetURI(page.getURI()); + // Skip if getting this page already + if (runningFetch.containsKey(uri)) continue; + + if (uri.isUSK()) { + uri = uri.setSuggestedEdition(page.getEdition()); + } + ClientGetter getter = makeGetter(uri); - Logger.minor(this, "Starting " + getter + " " + page); + Logger.minor(this, "Starting new " + getter + " " + page); toStart.add(getter); - runningFetch.put(page, getter); + runningFetch.put(uri, getter); } catch (MalformedURLException e) { Logger.error(this, "IMPOSSIBLE-Malformed URI: " + page, e); - page.setStatus(Status.FAILED); + page.setStatus(Status.FATALLY_FAILED, "MalformedURLException"); } } - } finally { - getRoot().unlockPages(Status.QUEUED); - db.endThreadTransaction(); + } + } + + for (final ClientGetter g : toStart) { + ScheduledFuture future = callbackExecutor.scheduleWithFixedDelay(new Runnable() { + long lapsLeft = 10 * 60 * 60; // Ten hours + @Override + public void run() { + if (lapsLeft-- <= 0) { + g.cancel(clientContext); + Logger.minor(this, g + " aborted because of time-out"); + ScheduledFuture f = runningFutures.get(g); + f.cancel(false); + } + } + }, 10, 1, TimeUnit.SECONDS); + runningFutures.put(g, future); + try { + g.start(clientContext); + Logger.minor(this, g + " started"); + } catch (FetchException e) { + g.getClientCallback().onFailure(e, g); + continue; } } } + } + + /** + * Subscribe to USKs for PROCESSED_USKs. + */ + private void subscribeAllUSKs() { + synchronized (this) { + if (stopped) return; - for (ClientGetter g : toStart) { + db.beginThreadTransaction(Storage.EXCLUSIVE_TRANSACTION); try { - g.start(clientContext); - Logger.minor(this, g + " started"); - } catch (FetchException e) { - g.getClientCallback().onFailure(e, g); + Iterator it = getRoot().getPages(Status.PROCESSED_USK); + while (it.hasNext()) { + Page page = it.next(); + Logger.debug(this, "Page " + page + " found in PROCESSED_USK."); + FreenetURI uri; + try { + uri = new FreenetURI(page.getURI()); + } catch (MalformedURLException e) { + // This could not be converted - ignore. + Logger.error(this, "USK could not be converted to uri " + page); + page.setStatus(Status.FATALLY_FAILED); + continue; + } + if (uri.isUSK()) { + subscribeUSK(uri); + } else { + Logger.error(this, "USK was not USK " + page); + page.setStatus(Status.FATALLY_FAILED); + } + } + } finally { + db.endThreadTransaction(); } } } @@ -258,30 +478,33 @@ public void startSomeRequests() { * Callback for fetching the pages */ private class ClientGetterCallback implements ClientGetCallback { - final Page page; - - public ClientGetterCallback(Page page) { - this.page = page; - } - - @Override + @Override public void onFailure(FetchException e, ClientGetter state) { + Logger.minor(this, + state + " onFailure: " + state.getURI() + " (q:" + callbackExecutor.getQueue().size() + ")", + e); + removeFuture(state); + if (stopped) return; - callbackExecutor.execute(new OnFailureCallback(e, state, page)); - Logger.minor(this, "Queued OnFailure: " + page + " (q:" + callbackExecutor.getQueue().size() + ")"); + callbackExecutor.execute(new OnFailureCallback(e, state)); } - @Override + @Override public void onSuccess(final FetchResult result, final ClientGetter state) { + Logger.minor(this, state + " onSuccess: " + state.getURI() + " (q:" + callbackExecutor.getQueue().size() + ")"); + removeFuture(state); + if (stopped) return; - callbackExecutor.execute(new OnSuccessCallback(result, state, page)); - Logger.minor(this, "Queued OnSuccess: " + page + " (q:" + callbackExecutor.getQueue().size() + ")"); + callbackExecutor.execute(new OnSuccessCallback(result, state)); } - public String toString() { - return super.toString() + ":" + page; + private void removeFuture(ClientGetter getter) { + ScheduledFuture future = runningFutures.remove(getter); + if (future != null) { + future.cancel(false); + } } @Override @@ -294,9 +517,9 @@ public RequestClient getRequestClient() { } } - private ClientGetter makeGetter(Page page) throws MalformedURLException { - ClientGetter getter = new ClientGetter(new ClientGetterCallback(page), - new FreenetURI(page.getURI()), ctx, + private ClientGetter makeGetter(FreenetURI uri) throws MalformedURLException { + ClientGetter getter = new ClientGetter(new ClientGetterCallback(), + uri, ctx, getPollingPriorityProgress(), null); return getter; } @@ -304,16 +527,14 @@ private ClientGetter makeGetter(Page page) throws MalformedURLException { protected class OnFailureCallback implements Runnable { private FetchException e; private ClientGetter state; - private Page page; - OnFailureCallback(FetchException e, ClientGetter state, Page page) { + OnFailureCallback(FetchException e, ClientGetter state) { this.e = e; this.state = state; - this.page = page; } public void run() { - onFailure(e, state, page); + onFailure(e, state); } } @@ -323,16 +544,14 @@ public void run() { protected class OnSuccessCallback implements Runnable { private FetchResult result; private ClientGetter state; - private Page page; - OnSuccessCallback(FetchResult result, ClientGetter state, Page page) { + OnSuccessCallback(FetchResult result, ClientGetter state) { this.result = result; this.state = state; - this.page = page; } public void run() { - onSuccess(result, state, page); + onSuccess(result, state); } } @@ -349,55 +568,17 @@ protected class SetConfigCallback implements Runnable { public void run() { synchronized (getRoot()) { getRoot().setConfig(config); - startSomeRequests(); - } - } - } - - protected class StartSomeRequestsCallback implements Runnable { - StartSomeRequestsCallback() { - } - - public void run() { - try { - Thread.sleep(30000); - } catch (InterruptedException e) { - // ignore - } - startSomeRequests(); - } - } - - protected static class CallbackPrioritizer implements Comparator { - public int compare(Runnable o1, Runnable o2) { - if (o1.getClass() == o2.getClass()) return 0; - - return getPriority(o1) - getPriority(o2); - } - - private int getPriority(Runnable r) { - if (r instanceof SetConfigCallback) { - return 0; - } else if (r instanceof OnFailureCallback) { - return 2; - } else if (r instanceof OnSuccessCallback) { - return 3; - } else if (r instanceof StartSomeRequestsCallback) { - return 4; } - - return -1; } } // this is java.util.concurrent.Executor, not freenet.support.Executor // always run with one thread --> more thread cause contention and slower! - public ThreadPoolExecutor callbackExecutor = new ThreadPoolExecutor( // - 1, 1, 600, TimeUnit.SECONDS, // - new PriorityBlockingQueue(5, new CallbackPrioritizer()), // + public ScheduledThreadPoolExecutor callbackExecutor = new ScheduledThreadPoolExecutor( + 1, new ThreadFactory() { public Thread newThread(Runnable r) { - Thread t = new NativeThread(r, "Spider", NativeThread.NORM_PRIORITY - 1, true); + Thread t = new NativeThread(r, "Spider", NativeThread.PriorityLevel.NORM_PRIORITY.value - 1, true); t.setDaemon(true); t.setContextClassLoader(Spider.this.getClass().getClassLoader()); return t; @@ -409,24 +590,34 @@ public Thread newThread(Runnable r) { * * @param result * @param state - * @param page */ // single threaded - protected void onSuccess(FetchResult result, ClientGetter state, Page page) { + protected void onSuccess(FetchResult result, ClientGetter state) { synchronized (this) { if (stopped) return; } - lastRequestFinishedAt.set(currentTimeMillis()); FreenetURI uri = state.getURI(); + FreenetURI dbURI = uri; + if (uri.isUSK() ) { + dbURI = uri.setSuggestedEdition(0L); + } + + lastRequestFinishedAt.set(currentTimeMillis()); ClientMetadata cm = result.getMetadata(); Bucket data = result.asBucket(); String mimeType = cm.getMIMEType(); boolean dbTransactionEnded = false; db.beginThreadTransaction(Storage.EXCLUSIVE_TRANSACTION); + Page page = null; try { - librarybuffer.setBufferSize(getConfig().getNewFormatIndexBufferLimit()*1024*1024); + page = getRoot().getPageByURI(dbURI); + if (page == null) { + Logger.error(this, "Cannot find page " + dbURI); + return; + } + librarybuffer.setBufferSize(1 + getConfig().getNewFormatIndexBufferLimit()*1024*1024); /* * instead of passing the current object, the pagecallback object for every page is * passed to the content filter this has many benefits to efficiency, and allows us to @@ -434,7 +625,7 @@ protected void onSuccess(FetchResult result, ClientGetter state, Page page) { * provided). */ PageCallBack pageCallBack = new PageCallBack(page); - Logger.minor(this, "Successful: " + uri + " : " + page.getId()); + Logger.minor(this, "Successful: " + uri + " id=" + page.getId()); try { if ("text/plain".equals(mimeType)) { @@ -452,15 +643,17 @@ protected void onSuccess(FetchResult result, ClientGetter state, Page page) { } } pageCallBack.finish(); + page.setStatus(Status.NOT_PUSHED); + page.setLastFetched(); librarybuffer.maybeSend(); } catch (UnsafeContentTypeException e) { // wrong mime type - page.setStatus(Status.SUCCEEDED); + page.setStatus(Status.FATALLY_FAILED, "UnsafeContentTypeException"); db.endThreadTransaction(); dbTransactionEnded = true; - Logger.minor(this, "UnsafeContentTypeException " + uri + " : " + page.getId(), e); + Logger.minor(this, "" + e + " " + uri + " id=" + page.getId()); return; // Ignore } catch (IOException e) { // ugh? @@ -468,11 +661,10 @@ protected void onSuccess(FetchResult result, ClientGetter state, Page page) { return; } catch (Exception e) { // we have lots of invalid html on net - just normal, not error - Logger.normal(this, "exception on content filter for " + page, e); + Logger.normal(this, "exception on content filter for " + uri, e); return; } - page.setStatus(Status.NOT_PUSHED); db.endThreadTransaction(); dbTransactionEnded = true; @@ -486,45 +678,113 @@ protected void onSuccess(FetchResult result, ClientGetter state, Page page) { data.free(); synchronized (this) { - runningFetch.remove(page); + removeFromRunningFetches(uri); } - if (!stopped) startSomeRequests(); } finally { if (!dbTransactionEnded) { Logger.minor(this, "rollback transaction", new Exception("debug")); db.rollbackThreadTransaction(); db.beginThreadTransaction(Storage.EXCLUSIVE_TRANSACTION); // page is now invalidated. - page = getRoot().getPageByURI(uri, false, ""); - if(page != null) page.setStatus(Status.FAILED); + if (page != null) { + page.setStatus(Status.FATALLY_FAILED, + "could not complete operation dbTransaction not ended"); + } db.endThreadTransaction(); } } } } - protected void onFailure(FetchException fe, ClientGetter state, Page page) { - Logger.minor(this, "Failed: " + page + " : " + state, fe); + private void removeFromRunningFetches(FreenetURI uri) { + if (runningFetches != null) { + for (Status status : Config.statusesToProcess) { + if (runningFetches.containsKey(status)) { + if (runningFetches.get(status).remove(uri) != null) { + break; + } + } + } + } + } + + /** + * Do what needs to be done when a fetch request has failed. + * + * @param fe Is the exception that make it fail. + * Used to decide what to do. + * @param getter is the ClientGetter that failed. + */ + protected void onFailure(FetchException fe, ClientGetter getter) { + final FreenetURI uri = getter.getURI(); + Logger.minor(this, "Failed: " + uri + " : " + getter); synchronized (this) { if (stopped) return; } + FreenetURI dbURI = uri; + if (uri.isUSK() ) { + dbURI = uri.setSuggestedEdition(0L); + } + lastRequestFinishedAt.set(currentTimeMillis()); boolean dbTransactionEnded = false; db.beginThreadTransaction(Storage.EXCLUSIVE_TRANSACTION); + Page page = null; try { - synchronized (page) { - if (fe.newURI != null) { - // redirect, mark as succeeded - queueURI(fe.newURI, "redirect from " + state.getURI(), false); - page.setStatus(Status.SUCCEEDED); - } else if (fe.isFatal()) { - // too many tries or fatal, mark as failed - page.setStatus(Status.FAILED); + page = getRoot().getPageByURI(dbURI); + if (page == null) { + return; + } + if (fe.newURI != null) { + // mark as succeeded + Status whereTo = Status.DONE; + if (uri.isUSK()) { + whereTo = Status.PROCESSED_USK; + } else if (uri.isKSK()) { + whereTo = Status.PROCESSED_KSK; + } + FreenetURI newURI = fe.newURI; + if (fe.mode == FetchException.FetchExceptionMode.NOT_ENOUGH_PATH_COMPONENTS) { + if (uri.isUSK() && !uri.hasMetaStrings()) { + newURI = uri.pushMetaString(""); + whereTo = Status.DONE; + } + } + page.setStatus(whereTo, "Redirected to " + newURI + " because of " + fe.getMode()); + // redirect. This is done in an independent Runnable to get its own lock. + final FreenetURI redirectedTo = newURI; + final FreenetURI redirectedFrom = uri; + callbackExecutor.execute(new Runnable() { + @Override + public void run() { + // If this is a new Edition it is moved again from PROCESSED_USK to NEW_EDITION. + queueURI(redirectedTo, "redirect from " + redirectedFrom); + } + + }); + } else if (fe.isFatal()) { + // too many tries or fatal, mark as failed + page.setStatus(Status.FATALLY_FAILED, "Fatal: " + fe.getMode()); + } else { + // If uris are already queued that are afterwards rendered "bad" + // by changing the badlisted extensions list, then they are cleaned + // out if they fail. + boolean badListed = false; + String sURI = uri.toString(); + String lowerCaseURI = sURI.toLowerCase(Locale.US); + for (String ext : getRoot().getConfig().getBadlistedExtensions()) { + if (lowerCaseURI.endsWith(ext)) { + badListed = true; + } + } + + if (badListed) { + page.setStatus(Status.FATALLY_FAILED); } else { // requeue at back - page.setStatus(Status.QUEUED); + page.setStatus(Status.FAILED); } } db.endThreadTransaction(); @@ -533,14 +793,12 @@ protected void onFailure(FetchException fe, ClientGetter state, Page page) { Logger.error(this, "Unexcepected exception in onFailure(): " + e, e); throw new RuntimeException("Unexcepected exception in onFailure()", e); } finally { - runningFetch.remove(page); + removeFromRunningFetches(uri); if (!dbTransactionEnded) { Logger.minor(this, "rollback transaction", new Exception("debug")); db.rollbackThreadTransaction(); } } - - startSomeRequests(); } private boolean garbageCollecting = false; @@ -554,12 +812,17 @@ public void terminate(){ synchronized (this) { stopped = true; - for (Map.Entry me : runningFetch.entrySet()) { - ClientGetter getter = me.getValue(); - Logger.minor(this, "Canceling request" + getter); - getter.cancel(clientContext); + for (Status status : Config.statusesToProcess) { + for (Map.Entry me : runningFetches.get(status).entrySet()) { + ClientGetter getter = me.getValue(); + Logger.minor(this, "Canceling request" + getter); + getter.cancel(clientContext); + } + runningFetches.get(status).clear(); + } + for (SubscribedToUSK stu : new HashSet(subscribedToUSKs)) { + stu.unsubscribe(); } - runningFetch.clear(); callbackExecutor.shutdownNow(); } librarybuffer.terminate(); @@ -604,13 +867,34 @@ public synchronized void runPlugin(PluginRespirator pr) { FreenetURI[] initialURIs = core.getBookmarkURIs(); for (int i = 0; i < initialURIs.length; i++) { - queueURI(initialURIs[i], "bookmark", false); + queueURI(initialURIs[i], "bookmark"); } librarybuffer = new LibraryBuffer(pr, this); librarybuffer.start(); - callbackExecutor.execute(new StartSomeRequestsCallback()); + callbackExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + startFetches(); + } catch (Throwable e) { + Logger.error(this, "startFetches throws", e); + } + } + + }, 30L, 1L, TimeUnit.SECONDS); + callbackExecutor.schedule(new Runnable() { + @Override + public void run() { + try { + subscribeAllUSKs(); + } catch (Throwable e) { + Logger.error(this, "startSubscribeUSKs throws", e); + } + } + + }, 10L, TimeUnit.SECONDS); } private WebInterface webInterface; @@ -627,12 +911,17 @@ public class PageCallBack implements FoundURICallback{ private String title; private int totalWords; - protected final boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this); // per instance, allow changing on the fly + protected final boolean logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this); // per instance, allow changing on the fly PageCallBack(Page page) { this.page = page; try { - this.uri = new FreenetURI(page.getURI()); + uri = new FreenetURI(page.getURI()); + final long edition = page.getEdition(); + if (edition != 0L) { + uri = uri.setSuggestedEdition(edition); + } + Logger.debug(this, "New PageCallBack for " + this.page + " (" + this.uri + ")."); } catch (MalformedURLException ex) { Logger.error(this, "Error creating uri from '"+page.getURI()+"'", ex); } @@ -653,7 +942,7 @@ public void foundURI(FreenetURI uri) { public void foundURI(FreenetURI uri, boolean inline) { if (stopped) throw new RuntimeException("plugin stopping"); if (logDEBUG) Logger.debug(this, "foundURI " + uri + " on " + page); - queueURI(uri, "Added from " + page.getURI(), false); + queueURI(uri, "Added from " + this.uri); } protected Integer lastPosition = null; @@ -713,9 +1002,10 @@ void finish() { // Which is equal to log ( total count of files ) - log ( count of files with this word in ) librarybuffer.setRelevance(termPageEntry, ((float)termPageEntry.positionsSize()) / ((float)totalWords)); } + Logger.debug(this, "Finished PageCallBack for " + this.page + " (" + this.uri + ")."); } - HashMap tpes = new HashMap(); + HashMap tpes = new HashMap(); /** * Add a word to the database for this page @@ -754,28 +1044,6 @@ public void onFinishedPage() { } } - @Override - public void onFoundEdition(long l, USK key, ClientContext context, boolean metadata, - short codec, byte[] data, boolean newKnownGood, boolean newSlotToo) { - FreenetURI uri = key.getURI(); - /*- - * FIXME this code don't make sense - * (1) runningFetchesByURI contain SSK, not USK - * (2) onFoundEdition always have the edition set - * - if(runningFetchesByURI.containsKey(uri)) runningFetchesByURI.remove(uri); - uri = key.getURI().setSuggestedEdition(l); - */ - queueURI(uri, "USK found edition", true); - startSomeRequests(); - } - - @Override - public short getPollingPriorityNormal() { - return (short) Math.min(RequestStarter.MINIMUM_FETCHABLE_PRIORITY_CLASS, getRoot().getConfig().getRequestPriority() + 1); - } - - @Override public short getPollingPriorityProgress() { return getRoot().getConfig().getRequestPriority(); } @@ -809,8 +1077,6 @@ protected Page getPageById(long id) { return getRoot().getPageById(id); } - // language for I10N - private LANGUAGE language; @Override public String getString(String key) { @@ -820,17 +1086,23 @@ public String getString(String key) { @Override public void setLanguage(LANGUAGE newLanguage) { - language = newLanguage; + Logger.debug(this, "New language set " + newLanguage + " - ignored."); } public PageMaker getPageMaker() { return pageMaker; } - public List getRunningFetch() { - synchronized (runningFetch) { - return new ArrayList(runningFetch.keySet()); + public List getRunningFetch(Status status) { + List result = new ArrayList(); + synchronized (runningFetches) { + if (runningFetches != null) { + for (FreenetURI uri : runningFetches.get(status).keySet()) { + result.add(uri.toString()); + } + } } + return result; } public PluginRespirator getPluginRespirator() { @@ -846,12 +1118,52 @@ public void resetPages(Status from, Status to) { int count = 0; Iterator pages = getRoot().getPages(from); while(pages.hasNext()) { - pages.next().setStatus(to); + Page page = pages.next(); + Logger.debug(this, "Page " + page + " found in " + from + "."); + page.setStatus(to); count++; } System.out.println("Reset "+count+" pages status from "+from+" to "+to); } + public void donePages() { + // Not a separate transaction, commit with the index updates. + Status from = Status.NOT_PUSHED; + int count = 0; + Iterator pages = getRoot().getPages(from); + while(pages.hasNext()) { + Page page = pages.next(); + Status to; + FreenetURI uri; + try { + uri = new FreenetURI(page.getURI()); + if (uri.isCHK()) { + to = Status.DONE; + } else if (uri.isKSK()) { + to = Status.PROCESSED_KSK; + } else if (uri.isSSK()) { + to = Status.DONE; + } else if (uri.isUSK() && + uri.hasMetaStrings() && !uri.getMetaString().equals("")) { + // This is not the top element of this USK. + to = Status.DONE; + } else if (uri.isUSK()) { + to = Status.PROCESSED_USK; + subscribeUSK(uri); + } else { + Logger.error(this, "Cannot understand the type of the key " + uri); + to = Status.DONE; + } + } catch (MalformedURLException e) { + to = Status.DONE; + } + Logger.debug(this, "Page " + page + " found in " + from + "."); + page.setStatus(to); + count++; + } + Logger.minor(this, "Considered " + count + " pages processed."); + } + public boolean realTimeFlag() { return false; // We definitely want throughput here. } diff --git a/src/plugins/Spider/db/Config.java b/src/plugins/Spider/db/Config.java index c12c506..22f394f 100644 --- a/src/plugins/Spider/db/Config.java +++ b/src/plugins/Spider/db/Config.java @@ -12,13 +12,20 @@ public class Config extends Persistent implements Cloneable { + public static final Status[] statusesToProcess = {Status.NEW, Status.NEW_EDITION, Status.FAILED}; + public static final boolean[] workingRelationsToProcess = {true, false}; private String indexTitle; private String indexOwner; private String indexOwnerEmail; private int maxShownURIs; - private int maxParallelRequestsWorking; - private int maxParallelRequestsNonWorking; + /** working, status + * + * This should be an array with dimensions working and status. + * This is problematic in the database so it will instead be stored + * as a string of semicolon-separated ints. + */ + private String maxParallelRequests; private int beginWorkingPeriod; // Between 0 and 23 private int endWorkingPeriod; // Between 0 and 23 private String[] badlistedExtensions; @@ -40,8 +47,7 @@ public Config(Storage storage) { maxShownURIs = 50; - maxParallelRequestsWorking = 0; - maxParallelRequestsNonWorking = 0; + maxParallelRequests = ""; beginWorkingPeriod = 23; endWorkingPeriod = 7; @@ -111,25 +117,64 @@ public synchronized String getIndexOwnerEmail() { return indexOwnerEmail; } - public synchronized void setMaxParallelRequestsWorking(int maxParallelRequests) { - assert !isPersistent(); - this.maxParallelRequestsWorking = maxParallelRequests; + private int workingIndex(boolean b) { + for (int i = 0; i < workingRelationsToProcess.length; i++) { + if (workingRelationsToProcess[i] == b) { + return i; + } + } + throw new RuntimeException(); + } + + private int statusIndex(Status status) { + for (int i = 0; i < statusesToProcess.length; i++) { + if (statusesToProcess[i] == status) { + return i; + } + } + throw new RuntimeException(); } - public synchronized int getMaxParallelRequestsWorking() { - return maxParallelRequestsWorking; + private int[][] unpackMaxParallelRequests() { + int[][] requests = new int[workingRelationsToProcess.length][statusesToProcess.length]; + String[] arr = maxParallelRequests.split(";"); + int arrIndex = 0; + for (int w = 0; w < workingRelationsToProcess.length; w++) { + for (int s= 0; s < statusesToProcess.length; s++) { + requests[w][s] = 0; + if (arrIndex < arr.length && !arr[arrIndex].equals("")) { + try { + requests[w][s] = Integer.parseInt(arr[arrIndex++]); + } catch (NumberFormatException e) { + // Ignore if we can't do the conversion. + } + } + } + } + return requests; } - public synchronized void setMaxParallelRequestsNonWorking(int maxParallelRequests) { + public synchronized void setMaxParallelRequests(boolean working, Status status, int maxParallelRequests) { assert !isPersistent(); - this.maxParallelRequestsNonWorking = maxParallelRequests; + int[][] requests = unpackMaxParallelRequests(); + requests[workingIndex(working)][statusIndex(status)] = maxParallelRequests; + + StringBuilder sb = new StringBuilder(); + for (int w = 0; w < workingRelationsToProcess.length; w++) { + for (int s= 0; s < statusesToProcess.length; s++) { + sb.append(Integer.toString(requests[w][s])); + sb.append(";"); + } + } + this.maxParallelRequests = sb.toString(); } - public synchronized int getMaxParallelRequestsNonWorking() { - return maxParallelRequestsNonWorking; + public synchronized int getMaxParallelRequests(boolean working, Status status) { + int[][] requests = unpackMaxParallelRequests(); + return requests[workingIndex(working)][statusIndex(status)]; } - public synchronized int getMaxParallelRequests() { + public synchronized int getMaxParallelRequests(Status status) { int actualHour = Calendar.getInstance().get(Calendar.HOUR_OF_DAY); Boolean isWorking = true; @@ -145,11 +190,7 @@ public synchronized int getMaxParallelRequests() { isWorking = (actualHour > this.getBeginWorkingPeriod() || actualHour < this.getEndWorkingPeriod()); } - if(isWorking) { - return this.getMaxParallelRequestsWorking(); - } else { - return this.getMaxParallelRequestsNonWorking(); - } + return this.getMaxParallelRequests(isWorking, status); } public synchronized void setBeginWorkingPeriod(int beginWorkingPeriod) { diff --git a/src/plugins/Spider/db/Page.java b/src/plugins/Spider/db/Page.java index 40c2c4f..ed44c48 100644 --- a/src/plugins/Spider/db/Page.java +++ b/src/plugins/Spider/db/Page.java @@ -3,6 +3,10 @@ */ package plugins.Spider.db; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + import freenet.support.Logger; import plugins.Spider.org.garret.perst.FieldIndex; import plugins.Spider.org.garret.perst.IPersistentMap; @@ -15,41 +19,85 @@ public class Page extends Persistent implements Comparable { protected long id; /** URI of the page */ protected String uri; + /** suggestedEdition of the page */ + protected long edition; /** Title */ protected String pageTitle; /** Status */ protected Status status; /** Last Change Time */ protected long lastChange; + /** Last Fetched Time + * + * This is for the case when many USK pages are updated more often + * than they are fetched. In that case, this is used to prioritize + * the update of older pages over pages that were recently fetched. + */ + protected long lastFetched; /** Comment, for debugging */ protected String comment; public Page() { } - Page(String uri, String comment, Storage storage) { + Page(String uri, long edition, String comment, Storage storage) { this.uri = uri; + this.edition = edition; this.comment = comment; - this.status = Status.QUEUED; + this.status = Status.NEW; this.lastChange = System.currentTimeMillis(); + this.lastFetched = 0L; // 0 means never fetched. storage.makePersistent(this); } - - public synchronized void setStatus(Status status) { + + Page(String uri, String comment, Storage storage) { + this(uri, 0L, comment, storage); + } + + public long getEdition() { + return edition; + } + + public synchronized void setStatus(long edition, Status status, String comment) { + List mess = new ArrayList(); + if (edition != 0L) { + mess.add("edition " + edition); + } + if (status != null) { + mess.add("status " + status); + } + if (comment != null) { + mess.add("comment \"" + comment + "\""); + } + Logger.debug(this, "New " + String.join(", ", mess) + " for " + this); preModify(); - this.status = status; + if (edition != 0L) { + this.edition = edition; + } + if (status != null) { + this.status = status; + } + if (comment != null) { + this.comment = comment; + } postModify(); } + public synchronized void setStatus(Status status) { + setStatus(status, null); + } + + public synchronized void setStatus(Status status, String comment) { + setStatus(0, status, comment); + } + public Status getStatus() { return status; } public synchronized void setComment(String comment) { - preModify(); - this.comment = comment; - postModify(); + setStatus(0, null, comment); } public String getComment() { @@ -65,15 +113,42 @@ public long getId() { } public void setPageTitle(String pageTitle) { - preModify(); + Logger.debug(this, "New page title for " + this); this.pageTitle = pageTitle; - postModify(); + modify(); } public String getPageTitle() { return pageTitle; } + public String getLastChangeAsString() { + return new Date(lastChange).toString(); + } + + public Date getLastChange() { + return new Date(lastChange); + } + + public void setLastFetched() { + lastFetched = System.currentTimeMillis(); + } + + public boolean hasBeenFetched() { + return lastFetched != 0L; + } + + public Date getLastFetched() { + return new Date(lastFetched); + } + + public String getLastFetchedAsString() { + if (lastFetched > 0L) { + return new Date(lastFetched).toString(); + } + return ""; + } + @Override public int hashCode() { return (int) (id ^ (id >>> 32)); @@ -93,7 +168,7 @@ public boolean equals(Object obj) { @Override public String toString() { - return "[PAGE: id=" + id + ", title=" + pageTitle + ", uri=" + uri + ", status=" + status + ", comment=" + return "[PAGE: id=" + id + ", title=" + pageTitle + ", uri=" + uri + ", edition=" + edition + " status=" + status + ", comment=" + comment + "]"; } @@ -115,10 +190,10 @@ private void preModify() { if(e.getErrorCode() == StorageError.KEY_NOT_FOUND) { // No serious consequences, so just log it, rather than killing the whole thing. Logger.error(this, "Page: Key not found in index: "+this, e); - System.err.println("Page: Key not found in index: "+this); - e.printStackTrace(); - } else + } else { + Logger.error(this, "remove from index " + status + " failed", e); throw e; + } } finally { coll.unlock(); } diff --git a/src/plugins/Spider/db/PerstRoot.java b/src/plugins/Spider/db/PerstRoot.java index 52685f8..f4d0a7a 100644 --- a/src/plugins/Spider/db/PerstRoot.java +++ b/src/plugins/Spider/db/PerstRoot.java @@ -1,24 +1,19 @@ package plugins.Spider.db; import java.util.Iterator; -import java.util.List; import plugins.Spider.org.garret.perst.FieldIndex; -import plugins.Spider.org.garret.perst.IterableIterator; import plugins.Spider.org.garret.perst.Key; import plugins.Spider.org.garret.perst.Persistent; import plugins.Spider.org.garret.perst.Storage; import freenet.keys.FreenetURI; +import freenet.support.Logger; public class PerstRoot extends Persistent { protected FieldIndex idPage; protected FieldIndex uriPage; - protected FieldIndex queuedPages; - protected FieldIndex failedPages; - protected FieldIndex succeededPages; - protected FieldIndex notPushedPages; - protected FieldIndex indexedPages; + FieldIndex[] statusPages; private Config config; @@ -28,44 +23,69 @@ public PerstRoot() { public static PerstRoot createRoot(Storage storage) { PerstRoot root = new PerstRoot(); - root.idPage = storage.createFieldIndex(Page.class, "id", true); - root.uriPage = storage.createFieldIndex(Page.class, "uri", true); - root.queuedPages = storage.createFieldIndex(Page.class, "lastChange", false); - root.failedPages = storage.createFieldIndex(Page.class, "lastChange", false); - root.succeededPages = storage.createFieldIndex(Page.class, "lastChange", false); - root.notPushedPages = storage.createFieldIndex(Page.class, "lastChange", false); - root.indexedPages = storage.createFieldIndex(Page.class, "lastChange", false); - - root.config = new Config(storage); + root.create(storage); storage.setRoot(root); return root; } + + private void create(Storage storage) { + idPage = storage.createFieldIndex(Page.class, "id", true); + uriPage = storage.createFieldIndex(Page.class, "uri", true); + statusPages = new FieldIndex[Status.values().length]; + for (Status status : Status.values()) { + String fieldName = "lastChange"; + if (status == Status.NEW_EDITION) { + fieldName = "lastFetched"; + } + statusPages[status.ordinal()] = storage.createFieldIndex(Page.class, fieldName, true); + } + + config = new Config(storage); + } + /** + * Finds or creates pages in the database. + * + * @param uri The URI of the page to find. + * @param create if true then the page is created if it doesn't exist. + * @param comment is only used when create is true. + * @return the page. + */ public Page getPageByURI(FreenetURI uri, boolean create, String comment) { idPage.exclusiveLock(); uriPage.exclusiveLock(); - queuedPages.exclusiveLock(); + getPageIndex(Status.NEW).exclusiveLock(); try { Page page = uriPage.get(new Key(uri.toString())); if (create && page == null) { + Logger.debug(this, "New page created for " + uri.toString()); page = new Page(uri.toString(), comment, getStorage()); idPage.append(page); uriPage.put(page); - queuedPages.put(page); + getPageIndex(Status.NEW).put(page); } return page; } finally { - queuedPages.unlock(); + getPageIndex(Status.NEW).unlock(); uriPage.unlock(); idPage.unlock(); } } + /** + * Find a page in the database. + * @param uri The page to find. + * @return null if not found + */ + public Page getPageByURI(FreenetURI uri) { + return getPageByURI(uri, false, null); + } + public Page getPageById(long id) { idPage.sharedLock(); try { @@ -77,20 +97,7 @@ public Page getPageById(long id) { } FieldIndex getPageIndex(Status status) { - switch (status) { - case FAILED: - return failedPages; - case QUEUED: - return queuedPages; - case SUCCEEDED: - return succeededPages; - case NOT_PUSHED: - return notPushedPages; - case INDEXED: - return indexedPages; - default: - return null; - } + return statusPages[status.ordinal()]; } public void exclusiveLock(Status status) { diff --git a/src/plugins/Spider/db/Status.java b/src/plugins/Spider/db/Status.java index 8903940..a55958b 100644 --- a/src/plugins/Spider/db/Status.java +++ b/src/plugins/Spider/db/Status.java @@ -3,10 +3,22 @@ */ package plugins.Spider.db; +/** + * This enum also control the layout of the database so + * when changing this, be sure to update the dbVersion in + * Spider. + */ public enum Status { - /** For simplicity, running is also mark as QUEUED. + NEW, // Newly found URIs, i.e. never fetched. + NEW_EDITION, // Updated edition + NOT_PUSHED, + /** * NOT_PUSHED, when LibraryBuffer is enabled, means we have successfully fetched the page but have not - * yet uploaded the indexed data, so if we have an unclean shutdown we transfer all NOT_PUSHED to QUEUED + * yet uploaded the indexed data, so if we have an unclean shutdown we transfer all NOT_PUSHED to NEW * so they get re-run. */ - QUEUED, INDEXED, SUCCEEDED, FAILED, NOT_PUSHED + DONE, // The information is sent to library or there was no result. There is no more work to do. + PROCESSED_KSK, // The KSK has been sent to the library. We will rescan this later. + PROCESSED_USK, // The USK has been sent to the library. Subscriptions are set up for these. + FAILED, + FATALLY_FAILED, // The fetch "failed" fatally and we will ignore the result and never try again. } \ No newline at end of file diff --git a/src/plugins/Spider/index/TermEntry.java b/src/plugins/Spider/index/TermEntry.java index 24fcbb7..ce6f9a4 100644 --- a/src/plugins/Spider/index/TermEntry.java +++ b/src/plugins/Spider/index/TermEntry.java @@ -18,7 +18,8 @@ */ abstract public class TermEntry implements Comparable { - final static long serialVersionUID = 0xF23194B7F015560CL; + // final static long serialVersionUID = 0xF23194B7F015560CL; + final static long serialVersionUID2 = 0xF33194B7F015560CL; public enum EntryType { INDEX, TERM, PAGE diff --git a/src/plugins/Spider/index/TermEntryWriter.java b/src/plugins/Spider/index/TermEntryWriter.java index f3064c4..d5ff7c4 100644 --- a/src/plugins/Spider/index/TermEntryWriter.java +++ b/src/plugins/Spider/index/TermEntryWriter.java @@ -31,7 +31,7 @@ public static TermEntryWriter getInstance() { } public void writeObject(TermEntry en, DataOutputStream dos) throws IOException { - dos.writeLong(TermEntry.serialVersionUID); + dos.writeLong(TermEntry.serialVersionUID2); TermEntry.EntryType type = en.entryType(); dos.writeInt(type.ordinal()); dos.writeUTF(en.subj); @@ -39,14 +39,15 @@ public void writeObject(TermEntry en, DataOutputStream dos) throws IOException { switch (type) { case PAGE: TermPageEntry enn = (TermPageEntry)en; - enn.page.writeFullBinaryKeyWithLength(dos); - int size = enn.hasPositions() ? enn.positionsSize() : 0; + dos.writeUTF(enn.page.toString()); if(enn.title == null) - dos.writeInt(size); + dos.writeBoolean(false); else { - dos.writeInt(~size); // invert bits to signify title is set + dos.writeBoolean(true); dos.writeUTF(enn.title); } + int size = enn.hasPositions() ? enn.positionsSize() : 0; + dos.writeInt(size); if(size != 0) { if(enn.hasFragments()) { for(Map.Entry p : enn.positionsMap().entrySet()) { @@ -64,6 +65,8 @@ public void writeObject(TermEntry en, DataOutputStream dos) throws IOException { } } return; + default: + throw new RuntimeException("Not implemented"); } } diff --git a/src/plugins/Spider/index/TermPageEntry.java b/src/plugins/Spider/index/TermPageEntry.java index 0aeb8de..eefa05f 100644 --- a/src/plugins/Spider/index/TermPageEntry.java +++ b/src/plugins/Spider/index/TermPageEntry.java @@ -63,7 +63,9 @@ public TermPageEntry(String s, float r, FreenetURI u, String t, Map pos, Map runningFetch = spider.getRunningFetch(); Config config = spider.getConfig(); // Column 1 HTMLNode nextTableCell = overviewTableRow.addChild("td", "class", "first"); HTMLNode statusContent = pageMaker.getInfobox("#", "Spider Status", nextTableCell); - statusContent.addChild("#", "Running Request: " + runningFetch.size() + "/" - + config.getMaxParallelRequests()); - statusContent.addChild("br"); - statusContent.addChild("#", "Queued: " + queuedStatus.count); - statusContent.addChild("br"); - statusContent.addChild("#", "Indexed: " + indexedStatus.count); - statusContent.addChild("br"); - statusContent.addChild("#", "Succeeded: " + succeededStatus.count); + for (int i = 0; i < Config.statusesToProcess.length; i++) { + Status status = Config.statusesToProcess[i]; + List runningFetch = spider.getRunningFetch(status); + statusContent.addChild("#", "Running Request for " + status + ": " + runningFetch.size() + "/" + + config.getMaxParallelRequests(status)); + statusContent.addChild("br"); + + } + for (Status status : Status.values()) { + statusContent.addChild("#", status + ": " + getPageStatus(status).count); + statusContent.addChild("br"); + } + statusContent.addChild("#", "Queued Event: " + spider.callbackExecutor.getQueue().size()); statusContent.addChild("br"); - statusContent.addChild("#", "Not pushed: " + notPushedStatus.count); + statusContent.addChild("#", "Subscribed USKs: " + spider.getSubscribedToUSKs()); statusContent.addChild("br"); - statusContent.addChild("#", "Failed: " + failedStatus.count); - statusContent.addChild("br"); - statusContent.addChild("#", "Queued Event: " + spider.callbackExecutor.getQueue().size()); + statusContent.addChild("#", "Found new editions: " + spider.getEditionsFound()); statusContent.addChild("br"); statusContent.addChild("#", "Library buffer size: "+spider.getLibraryBufferSize()); long lastRequestFinishedAt = spider.getLastRequestFinishedAt(); @@ -136,61 +143,71 @@ public void writeContent(HTTPRequest request, HTMLNode contentNode) { addForm.addChild("label", "for", "addURI", "Add URI:"); addForm.addChild("input", new String[] { "name", "style" }, new String[] { "addURI", "width: 20em;" }); addForm.addChild("input", new String[] { "type", "value" }, new String[] { "submit", "Add" }); + mainContent.addChild("p"); + final File file = new File(".", "library.info"); + FileReader fr = null; + BufferedReader br = null; + try { + fr = new FileReader(file); + br = new BufferedReader(fr); + String line; + while ((line = br.readLine()) != null) { + mainContent.addChild("#", line); + mainContent.addChild("br"); + } + br.close(); + } catch (FileNotFoundException e) { + // There is no such file. That is fine. + } catch (IOException e) { + // We suddenly couldn't read this file. Strange problem. + throw new RuntimeException(e); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + // Ignore. + } + } + if (fr != null) { + try { + fr.close(); + } catch (IOException e) { + // Ignore. + } + } + } - InfoboxNode running = pageMaker.getInfobox("Running URI"); - HTMLNode runningBox = running.outer; - runningBox.addAttribute("style", "right: 0;"); - HTMLNode runningContent = running.content; + for (Status status : Config.statusesToProcess) { + List runningFetch = spider.getRunningFetch(status); + if (!runningFetch.isEmpty()) { + InfoboxNode running = pageMaker.getInfobox("Running URIs for " + status); + HTMLNode runningBox = running.outer; + runningBox.addAttribute("style", "right: 0;"); + HTMLNode runningContent = running.content; - if (runningFetch.isEmpty()) { - runningContent.addChild("#", "NO URI"); - } else { - HTMLNode list = runningContent.addChild("ol", "style", "overflow: auto; white-space: nowrap;"); + runningContent.addChild("#", "USKs shown without edition."); + HTMLNode list = runningContent.addChild("ol", "style", "overflow: auto; white-space: nowrap;"); - Iterator pi = runningFetch.iterator(); - int maxURI = config.getMaxShownURIs(); - for (int i = 0; i < maxURI && pi.hasNext(); i++) { - Page page = pi.next(); - HTMLNode litem = list.addChild("li", "title", page.getComment()); - litem.addChild("a", "href", "/freenet:" + page.getURI(), page.getURI()); + Iterator pi = runningFetch.iterator(); + int maxURI = config.getMaxShownURIs(); + for (int i = 0; i < maxURI && pi.hasNext(); i++) { + String runningURI = pi.next(); + HTMLNode litem = list.addChild("li"); + litem.addChild("a", "href", "/freenet:" + runningURI, runningURI); + } + contentNode.addChild(runningBox); } } - contentNode.addChild(runningBox); - - InfoboxNode queued = pageMaker.getInfobox("Queued URI"); - HTMLNode queuedBox = queued.outer; - queuedBox.addAttribute("style", "right: 0; overflow: auto;"); - HTMLNode queuedContent = queued.content; - listPages(queuedStatus, queuedContent); - contentNode.addChild(queuedBox); - - InfoboxNode indexed = pageMaker.getInfobox("Indexed URI"); - HTMLNode indexedBox = indexed.outer; - indexedBox.addAttribute("style", "right: 0;"); - HTMLNode indexedContent = indexed.content; - listPages(indexedStatus, indexedContent); - contentNode.addChild(indexedBox); - - InfoboxNode succeeded = pageMaker.getInfobox("Succeeded URI"); - HTMLNode succeededBox = succeeded.outer; - succeededBox.addAttribute("style", "right: 0;"); - HTMLNode succeededContent = succeeded.content; - listPages(succeededStatus, succeededContent); - contentNode.addChild(succeededBox); - - InfoboxNode notPushed = pageMaker.getInfobox("Not pushed URI"); - HTMLNode notPushedBox = notPushed.outer; - notPushedBox.addAttribute("style", "right: 0;"); - HTMLNode notPushedContent = notPushed.content; - listPages(notPushedStatus, notPushedContent); - contentNode.addChild(notPushedBox); - - InfoboxNode failed = pageMaker.getInfobox("Failed URI"); - HTMLNode failedBox = failed.outer; - failedBox.addAttribute("style", "right: 0;"); - HTMLNode failedContent = failed.content; - listPages(failedStatus, failedContent); - contentNode.addChild(failedBox); + + for (Status status : Status.values()) { + InfoboxNode d = pageMaker.getInfobox(status + " URIs"); + HTMLNode box = d.outer; + box.addAttribute("style", "right: 0; overflow: auto;"); + HTMLNode content = d.content; + listPages(getPageStatus(status), content); + contentNode.addChild(box); + } } //-- Utilities @@ -201,12 +218,12 @@ private PageStatus getPageStatus(Status status) { Iterator it = root.getPages(status); int showURI = spider.getConfig().getMaxShownURIs(); - List page = new ArrayList(); - while (page.size() < showURI && it.hasNext()) { - page.add(it.next()); + List pages = new ArrayList(); + while (pages.size() < showURI && it.hasNext()) { + pages.add(it.next()); } - return new PageStatus(count, page); + return new PageStatus(count, pages); } } @@ -219,6 +236,23 @@ private void listPages(PageStatus pageStatus, HTMLNode parent) { for (Page page : pageStatus.pages) { HTMLNode litem = list.addChild("li", "title", page.getComment()); litem.addChild("a", "href", "/freenet:" + page.getURI(), page.getURI()); + String title = page.getPageTitle(); + if (title == null) { + title = ""; + } + String changed = page.getLastFetchedAsString(); + if (!changed.equals("")) { + title = "Last changed " + changed + " " + title; + } + long edition = page.getEdition(); + if (edition != 0L) { + title = "Edition " + edition + " " + title; + } + litem.addChild("p", + " " + + page.getLastChangeAsString() + " " + + title + " " + + "(" + page.getComment() + ")"); } } }