Skip to content

Commit

Permalink
Merge pull request #101 from 4dn-dcic/single_txn_indexing
Browse files Browse the repository at this point in the history
1.2.0 -- Remove deferred queue
  • Loading branch information
carlvitzthum authored Jul 30, 2019
2 parents 3af5e57 + de23fa1 commit e52f7fe
Show file tree
Hide file tree
Showing 25 changed files with 359 additions and 251 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dist: trusty
language: python
sudo: true
cache:
Expand All @@ -9,8 +10,6 @@ cache:
- node_modules
addons:
apt:
config:
retries: true
packages:
- oracle-java9-set-default
- bsdtar
Expand Down
2 changes: 1 addition & 1 deletion base.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ sqlalchemy.url = postgresql:///snowflakes
retry.attempts = 3
file_upload_bucket = snowflakes-files-dev
file_upload_profile_name = snowflakes-files-upload
elasticsearch.server = localhost:9200
elasticsearch.server = 127.0.0.1:9200
aws_ip_ranges_path = %(here)s/aws-ip-ranges.json
download_proxy = https://download.encodeproject.org/
annotations_path = %(here)s/annotations.json
Expand Down
4 changes: 0 additions & 4 deletions development.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
[app:app]
use = config:base.ini#app
sqlalchemy.url = postgresql://postgres@:5432/postgres?host=/tmp/snovault/pgdata
snp_search.server = localhost:9200
load_test_only = true
create_tables = true
testing = true
Expand Down Expand Up @@ -39,9 +38,6 @@ use = egg:rutter#urlmap
[composite:indexer]
use = config:base.ini#indexer

[composite:file_indexer]
use = config:base.ini#file_indexer

###
# wsgi server configuration
###
Expand Down
9 changes: 0 additions & 9 deletions etc/snovault-apache.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ LogFormat "%v:%p %h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\" %{
</IfModule>
</Directory>

<Directory /srv/snowflakes/parts/production-fileindexer>
Order deny,allow
Allow from all
<IfModule access_compat_module>
Require all granted
</IfModule>
</Directory>


# Specifying process-group and application-group here ensures processes are started on httpd start
WSGIScriptAlias / /srv/snowflakes/parts/production/wsgi process-group=snowflakes application-group=%{GLOBAL}
Expand Down Expand Up @@ -122,4 +114,3 @@ ErrorDocument 403 "Forbidden. HTTPS required for authenticated access."
#RewriteCond %{HTTP:X-Forwarded-Proto} =http
#RewriteCond %{HTTP_HOST} ^(www\.encodeproject\.org|test\.encodedcc\.org)$
#RewriteRule ^ https://%{HTTP_HOST}%{REQUEST_URI} [redirect=permanent,last,qsappend]

3 changes: 0 additions & 3 deletions production.ini.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ elasticsearch.aws_auth = true
[composite:indexer]
use = config:base.ini#indexer

[composite:fileindexer]
use = config:base.ini#fileindexer

[pipeline:main]
pipeline =
config:base.ini#memlimit
Expand Down
10 changes: 5 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@

setup(
name='snovault',
version='0.10',
version='1.2.0',
description='Snovault Hybrid Object Relational Database Framework',
long_description=README + '\n\n' + CHANGES,
packages=find_packages('src'),
package_dir={'': 'src'},
include_package_data=True,
package_data={'':['nginx-dev.conf']},
zip_safe=False,
author='Benjamin Hitz',
author_email='hitz@stanford.edu',
url='http://github.com/ENCODE-DCC/snovault/',
author='Carl Vitzthum',
author_email='[email protected].edu',
url='http://github.com/4dn-dcic/snovault/',
license='MIT',
install_requires=requires,
tests_require=tests_require,
Expand Down Expand Up @@ -114,7 +114,7 @@
# 3 - Alpha
# 4 - Beta
# 5 - Production/Stable
'Development Status :: 3 - Alpha',
'Development Status :: 4 - Beta',

# Indicate who your project is intended for
'Intended Audience :: Developers',
Expand Down
4 changes: 2 additions & 2 deletions src/snovault/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def main(global_config, **local_config):
settings = global_config
settings.update(local_config)

# TODO: move to dcicutils
set_logging(settings.get('elasticsearch.server'), settings.get('production'))
set_logging(in_prod=settings.get('production'))
# set_logging(settings.get('elasticsearch.server'), settings.get('production'))

# TODO - these need to be set for dummy app
# settings['snovault.jsonld.namespaces'] = json_asset('snovault:schemas/namespaces.json')
Expand Down
39 changes: 19 additions & 20 deletions src/snovault/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def configure_engine(settings):


def set_postgresql_statement_timeout(engine, timeout=20 * 1000):
""" Prevent Postgres waiting indefinitely for a lock.
"""
Prevent Postgres waiting indefinitely for a lock.
"""
from sqlalchemy import event
import psycopg2
Expand All @@ -99,21 +100,31 @@ def json_from_path(path, default=None):
return json.load(open(path))


def configure_dbsession(config):
def configure_dbsession(config, clear_data=False):
"""
Create a sqlalchemy engine and a session that uses it, the latter of which
is added to the registry. Handle some extra registration
"""
import snovault.storage
import zope.sqlalchemy
from sqlalchemy import orm
from snovault import DBSESSION
from snovault.storage import Base

settings = config.registry.settings
DBSession = settings.pop(DBSESSION, None)
if DBSession is None:

# handle creating the database engine separately with indexer_worker
if DBSession is None and not settings.get('indexer_worker'):
engine = configure_engine(settings)

# useful for test instances where we want to clear the data
if clear_data:
Base.metadata.drop_all(engine)

if asbool(settings.get('create_tables', False)):
from snovault.storage import Base
Base.metadata.create_all(engine)

import snovault.storage
import zope.sqlalchemy
from sqlalchemy import orm

DBSession = orm.scoped_session(orm.sessionmaker(bind=engine))
zope.sqlalchemy.register(DBSession)
snovault.storage.register(DBSession)
Expand Down Expand Up @@ -210,18 +221,6 @@ def main(global_config, **local_config):
config.include('snovault.elasticsearch')
config.include('.search')

if 'snp_search.server' in config.registry.settings:
addresses = aslist(config.registry.settings['snp_search.server'])
config.registry['snp_search'] = Elasticsearch(
addresses,
serializer=PyramidJSONSerializer(json_renderer),
connection_class=TimedRequestsHttpConnection,
retry_on_timeout=True,
timeout=60,
maxsize=50
)
config.include('.region_search')
config.include('.peak_indexer')
config.include(static_resources)
config.include(changelogs)

Expand Down
5 changes: 4 additions & 1 deletion src/snovault/commands/es_index_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def main():
''' Indexes app data loaded to elasticsearch '''

import argparse
from snovault import set_logging
parser = argparse.ArgumentParser(
description="Index data in Elastic Search", epilog=EPILOG,
formatter_class=argparse.RawDescriptionHelpFormatter,
Expand All @@ -40,7 +41,9 @@ def main():
app = get_app(args.config_uri, args.app_name, options)

# Loading app will have configured from config file. Reconfigure here:
logging.getLogger('snovault').setLevel(logging.DEBUG)
# Use `es_server=app.registry.settings.get('elasticsearch.server')` when ES logging is working
set_logging(in_prod=app.registry.settings.get('production'), level=logging.INFO)

return run(app, args.uuid)


Expand Down
1 change: 1 addition & 0 deletions src/snovault/crud_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def get_linking_items(context, request, render=None):
result = {
'status': 'success',
'@type': ['result'],
'display_title': 'Links to %s' % item_uuid,
'notification' : '%s has %s items linking to it. This may include rev_links if status != deleted' % (item_uuid, len(links)),
'uuids_linking_to': links
}
Expand Down
14 changes: 7 additions & 7 deletions src/snovault/elasticsearch/create_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import transaction
import os
import argparse
from snovault import set_logging
import logging
from timeit import default_timer as timer

Expand Down Expand Up @@ -387,6 +386,10 @@ def es_mapping(mapping, agg_items_mapping):
'type': 'keyword',
'include_in_all': False,
},
'max_sid': {
'type': 'keyword',
'include_in_all': False,
},
'item_type': {
'type': 'keyword',
},
Expand Down Expand Up @@ -1130,6 +1133,7 @@ def run(app, collections=None, dry_run=False, check_first=False, skip_indexing=F


def main():
from snovault import set_logging
parser = argparse.ArgumentParser(
description="Create Elasticsearch mapping", epilog=EPILOG,
formatter_class=argparse.RawDescriptionHelpFormatter,
Expand All @@ -1156,15 +1160,11 @@ def main():

args = parser.parse_args()

#logging.basicConfig()
app = get_app(args.config_uri, args.app_name)


# Loading app will have configured from config file. Reconfigure here:
set_logging(app.registry.settings.get('elasticsearch.server'),
app.registry.settings.get('production'), level=logging.INFO)
#global log
#log = structlog.get_logger(__name__)
# Use `es_server=app.registry.settings.get('elasticsearch.server')` when ES logging is working
set_logging(in_prod=app.registry.settings.get('production'), level=logging.INFO)

uuids = run(app, collections=args.item_type, dry_run=args.dry_run, check_first=args.check_first,
skip_indexing=args.skip_indexing, index_diff=args.index_diff, strict=args.strict,
Expand Down
18 changes: 6 additions & 12 deletions src/snovault/elasticsearch/es_index_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ def run(testapp, interval=DEFAULT_INTERVAL, dry_run=False, path='/index', update
)

# Make sure elasticsearch is up before trying to index.
if path == '/index_file':
return
else:
es = testapp.app.registry[ELASTIC_SEARCH]
es = testapp.app.registry[ELASTIC_SEARCH]
es.info()

# main listening loop
Expand Down Expand Up @@ -170,6 +167,7 @@ def update_status(error=None, result=None, indexed=None, **kw):
if 'interval' in settings:
kwargs['interval'] = float(settings['interval'])

# daemon thread that actually executes `run` method to call /index
listener = ErrorHandlingThread(target=run, name='listener', kwargs=kwargs)
listener.daemon = True
log.debug('starting listener')
Expand Down Expand Up @@ -204,6 +202,7 @@ def internal_app(configfile, app_name=None, username=None):

def main():
import argparse
from snovault import set_logging
parser = argparse.ArgumentParser(
description="Listen for changes from postgres and index in elasticsearch",
epilog=EPILOG,
Expand All @@ -221,7 +220,7 @@ def main():
help="Poll interval between notifications")
parser.add_argument(
'--path', default='/index',
help="Path of indexing view (/index or /index_file)")
help="Path of indexing view")
parser.add_argument('config_uri', help="path to configfile")
args = parser.parse_args()

Expand All @@ -234,14 +233,9 @@ def main():
if args.verbose or args.dry_run:
level = logging.DEBUG

set_logging(app.registry.settings.get('elasticsearch.server'),
app.registry.settings.get('production'), level=level)
#global log
#log = structlog.get_logger(__name__)

# Loading app will have configured from config file. Reconfigure here:
#logging.getLogger('snovault').setLevel(logging.DEBUG)

# Use `es_server=app.registry.settings.get('elasticsearch.server')` when ES logging is working
set_logging(in_prod=app.registry.settings.get('production'), level=logging.INFO)
return run(testapp, args.poll_interval, args.dry_run, args.path)


Expand Down
13 changes: 11 additions & 2 deletions src/snovault/elasticsearch/esstorage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import elasticsearch.exceptions
from snovault.util import get_root_request
from elasticsearch.helpers import scan
from elasticsearch_dsl import Search, Q
from pyramid.threadlocal import get_current_request
Expand Down Expand Up @@ -83,6 +82,10 @@ def uuid(self):
def sid(self):
return self.source['sid']

@property
def max_sid(self):
    # Highest transaction sid recorded when this document was indexed;
    # read straight from the ES `_source` (KeyError if the document was
    # indexed before the `max_sid` mapping was added).
    return self.source['max_sid']

def used_for(self, item):
alsoProvides(item, ICachedItem)

Expand Down Expand Up @@ -284,7 +287,7 @@ def get_rev_links(self, model, rel, *item_types):

def get_sids_by_uuids(self, rids):
"""
Currently not implemented. Just return an empty dict
Currently not implemented for ES. Just return an empty dict
Args:
rids (list): list of string rids (uuids)
Expand All @@ -294,6 +297,12 @@ def get_sids_by_uuids(self, rids):
"""
return {}

def get_max_sid(self):
    """Return the maximum sid known to this storage backend.

    Sid tracking lives in the write (database) storage; the
    Elasticsearch read storage has no equivalent, so this always
    reports None.
    """
    return None

def purge_uuid(self, rid, item_type=None, registry=None):
"""
Purge a uuid from the write storage (Elasticsearch)
Expand Down
Loading

0 comments on commit e52f7fe

Please sign in to comment.