Skip to content

Commit

Permalink
⚡ Prefetch indexd documents in bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
dankolbman committed Nov 28, 2018
1 parent d3b21d5 commit 33be6ba
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 8 deletions.
29 changes: 23 additions & 6 deletions dataservice/api/common/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from functools import wraps
from dateutil import parser
from datetime import datetime
from dataservice.extensions import db, indexd


def paginated(f):
Expand Down Expand Up @@ -101,25 +102,41 @@ def indexd_pagination(q, after, limit):
:returns: A Pagination object
"""
def prefetch_indexd(after):
""" Compute dids for the page and have indexd fetch them in bulk """
model = q._entities[0].mapper.entity
gfs = (q.order_by(model.created_at.asc())
.filter(model.created_at > after)
.with_entities(model.latest_did)
.limit(limit).all())
dids = [gf[0] for gf in gfs]
indexd.prefetch(dids)

indexd.clear_cache()
prefetch_indexd(after)
pager = Pagination(q, after, limit)
keep = []
refresh = True
next_after = None
# Continue updating the page until we get a page with no deleted files
while (pager.total > 0 and refresh):
refresh = False
# Move the cursor ahead to the last valid file
next_after = keep[-1].created_at if len(keep) > 0 else after
# Number of results needed to fulfill the original limit
remain = limit - len(keep)
pager = Pagination(q, next_after, remain)

for st in pager.items:
if hasattr(st, 'was_deleted') and st.was_deleted:
refresh = True
else:
keep.append(st)

# Only fetch more if we saw there were some items that were deleted
if refresh:
# Move the cursor ahead to the last valid file
next_after = keep[-1].created_at if len(keep) > 0 else after
# Number of results needed to fulfill the original limit
remain = limit - len(keep)

prefetch_indexd(next_after)
pager = Pagination(q, next_after, remain)

# Replace original page's items with new list of valid files
pager.items = keep
pager.after = next_after if next_after else after
Expand Down
26 changes: 25 additions & 1 deletion dataservice/extensions/flask_indexd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ class Indexd(object):

def __init__(self, app=None):
self.app = app
# Used to store documents prefetched for a page
self.page_cache = {}
if app is not None:
self.init_app(app)

def init_app(self, app):
app.config.setdefault('INDEXD_URL', None)
self.url = app.config['INDEXD_URL']
if self.url:
self.bulk_url = '/'.join(self.url.split('/')[:-1] + ['bulk/'])
else:
self.bulk_url = None
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
Expand All @@ -42,6 +48,20 @@ def teardown(self, exception):
if hasattr(ctx, 'indexd_session'):
ctx.indexd_session.close()

def prefetch(self, dids):
"""
Fetch a list of documents by did into the page cache.
"""
# If running in dev mode, don't call indexd
if self.url is None or self.bulk_url is None:
return
resp = self.session.post(self.bulk_url + 'documents', json=dids).json()
for doc in resp:
self.page_cache[doc['did']] = doc

def clear_cache(self):
self.page_cache = {}

def get(self, record):
"""
Retrieves a record from indexd
Expand All @@ -55,6 +75,9 @@ def get(self, record):
if self.url is None:
return record

if record.latest_did in self.page_cache:
return self.page_cache[record.latest_did]

url = self.url + record.latest_did
resp = self.session.get(url)
self.check_response(resp)
Expand Down Expand Up @@ -171,7 +194,8 @@ def update(self, record):
if record.acl != old['acl']:
self._update_all_acls(record)

url = '{}{}?rev={}'.format(self.url, record.latest_did, record.rev)
url = '{}{}?rev={}'.format(self.url, record.latest_did,
record.rev)
if 'size' in req_body or 'hashes' in req_body:
# Create a new version in indxed
req_body['form'] = 'object'
Expand Down
5 changes: 4 additions & 1 deletion tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(self, *args, status_code=200, **kwargs):

def post(self, url, *args, **kwargs):
"""
Mocks a response from POST /index/
Mocks a response from POST /index/ and POST /bulk/documents
"""

resp = {
Expand All @@ -83,6 +83,9 @@ def post(self, url, *args, **kwargs):
# Otherwise, assume creation of a new doc and track the baseid
self.baseid_by_did[resp['did']] = resp['baseid']

if 'bulk/documents' in url:
resp = [resp]

mock_resp = MockResp(resp=resp, status_code=self.status_code)
return mock_resp

Expand Down

0 comments on commit 33be6ba

Please sign in to comment.