diff --git a/dataservice/api/common/pagination.py b/dataservice/api/common/pagination.py index c7a15c87e..f595ffd0e 100644 --- a/dataservice/api/common/pagination.py +++ b/dataservice/api/common/pagination.py @@ -2,6 +2,7 @@ from functools import wraps from dateutil import parser from datetime import datetime +from dataservice.extensions import db, indexd def paginated(f): @@ -101,6 +102,18 @@ def indexd_pagination(q, after, limit): :returns: A Pagination object """ + def prefetch_indexd(after): + """ Compute dids for the page and have indexd fetch them in bulk """ + model = q._entities[0].mapper.entity + gfs = (q.order_by(model.created_at.asc()) + .filter(model.created_at > after) + .with_entities(model.latest_did) + .limit(limit).all()) + dids = [gf[0] for gf in gfs] + indexd.prefetch(dids) + + indexd.clear_cache() + prefetch_indexd(after) pager = Pagination(q, after, limit) keep = [] refresh = True @@ -108,18 +121,22 @@ def indexd_pagination(q, after, limit): # Continue updating the page until we get a page with no deleted files while (pager.total > 0 and refresh): refresh = False - # Move the cursor ahead to the last valid file - next_after = keep[-1].created_at if len(keep) > 0 else after - # Number of results needed to fulfill the original limit - remain = limit - len(keep) - pager = Pagination(q, next_after, remain) - for st in pager.items: if hasattr(st, 'was_deleted') and st.was_deleted: refresh = True else: keep.append(st) + # Only fetch more if we saw there were some items that were deleted + if refresh: + # Move the cursor ahead to the last valid file + next_after = keep[-1].created_at if len(keep) > 0 else after + # Number of results needed to fulfill the original limit + remain = limit - len(keep) + + prefetch_indexd(next_after) + pager = Pagination(q, next_after, remain) + # Replace original page's items with new list of valid files pager.items = keep pager.after = next_after if next_after else after diff --git a/dataservice/extensions/flask_indexd.py b/dataservice/extensions/flask_indexd.py index a574c8c6f..7da864325 100644 --- a/dataservice/extensions/flask_indexd.py +++ b/dataservice/extensions/flask_indexd.py @@ -17,12 +17,18 @@ class Indexd(object): def __init__(self, app=None): self.app = app + # Used to store documents prefetched for a page + self.page_cache = {} if app is not None: self.init_app(app) def init_app(self, app): app.config.setdefault('INDEXD_URL', None) self.url = app.config['INDEXD_URL'] + if self.url: + self.bulk_url = '/'.join(self.url.split('/')[:-1] + ['bulk/']) + else: + self.bulk_url = None if hasattr(app, 'teardown_appcontext'): app.teardown_appcontext(self.teardown) else: @@ -42,6 +48,20 @@ def teardown(self, exception): if hasattr(ctx, 'indexd_session'): ctx.indexd_session.close() + def prefetch(self, dids): + """ + Fetch a list of documents by did into the page cache. + """ + # If running in dev mode, don't call indexd + if self.url is None or self.bulk_url is None: + return + resp = self.session.post(self.bulk_url + 'documents', json=dids).json() + for doc in resp: + self.page_cache[doc['did']] = doc + + def clear_cache(self): + self.page_cache = {} + def get(self, record): """ Retrieves a record from indexd @@ -55,6 +75,9 @@ def get(self, record): if self.url is None: return record + if record.latest_did in self.page_cache: + return self.page_cache[record.latest_did] + url = self.url + record.latest_did resp = self.session.get(url) self.check_response(resp) @@ -171,7 +194,8 @@ def update(self, record): if record.acl != old['acl']: self._update_all_acls(record) - url = '{}{}?rev={}'.format(self.url, record.latest_did, record.rev) + url = '{}{}?rev={}'.format(self.url, record.latest_did, + record.rev) if 'size' in req_body or 'hashes' in req_body: # Create a new version in indxed req_body['form'] = 'object' diff --git a/tests/mocks.py b/tests/mocks.py index 8ca8bccdd..34380bbe7 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -66,7 +66,7 @@ def __init__(self, *args, status_code=200, **kwargs): def post(self, url, *args, **kwargs): """ - Mocks a response from POST /index/ + Mocks a response from POST /index/ and POST /bulk/documents """ resp = { @@ -83,6 +83,9 @@ def post(self, url, *args, **kwargs): # Otherwise, assume creation of a new doc and track the baseid self.baseid_by_did[resp['did']] = resp['baseid'] + if 'bulk/documents' in url: + resp = [resp] + mock_resp = MockResp(resp=resp, status_code=self.status_code) return mock_resp