forked from openeventdata/stanford_pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.py
186 lines (149 loc) · 5.96 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# -*- encoding=utf-8 -*-
import os
import sys
import glob
import parser
import logging
import datetime
import argparse
from pymongo import MongoClient
from ConfigParser import ConfigParser
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, F
def make_conn(db_auth, db_user, db_pass, db_host=None, elasticsearch=False):
    """
    Function to build the handle used to read the scraped news stories,
    backed either by MongoDB or by Elasticsearch.

    Parameters
    ----------

    db_auth: String.
                MongoDB database that should be used for user authentication.
                Authentication is skipped when this value is falsy.

    db_user: String.
                Username for MongoDB authentication.

    db_pass: String.
                Password for MongoDB authentication.

    db_host: String, optional.
                Passed to ``MongoClient`` when truthy; otherwise
                ``MongoClient`` is created with no arguments.

    elasticsearch: Boolean.
                When true, every MongoDB argument is ignored and an
                ``Elasticsearch()`` client built with no arguments is
                returned instead.

    Returns
    -------

    collection: pymongo.collection.Collection or Elasticsearch.
                The ``stories`` collection of the ``event_scrape`` database,
                or the Elasticsearch client when ``elasticsearch`` is true.
    """
    # Elasticsearch needs none of the Mongo setup, so get it out of the way.
    if elasticsearch:
        return Elasticsearch()

    mongo = MongoClient(db_host) if db_host else MongoClient()
    if db_auth:
        mongo[db_auth].authenticate(db_user, db_pass)
    return mongo.event_scrape['stories']
def query_date(collection, date, num_days, elasticsearch, index):
    """
    Function to query the story store and obtain results for the desired
    date range. Pulls stories that aren't Stanford parsed yet and whose date
    lies in the window ``(date - num_days, date]``.

    Parameters
    ----------

    collection: pymongo.collection.Collection or Elasticsearch client.
                Handle that ``find`` is called on (MongoDB), or that is
                handed to ``Search(using=...)`` (Elasticsearch).

    date: datetime.datetime.
                Inclusive upper bound of the date window.

    num_days: Int.
                Length of the window, in days, counted back from ``date``.

    elasticsearch: Boolean.
                Selects the Elasticsearch query instead of the MongoDB one.

    index: String.
                Elasticsearch index to search. Unused for MongoDB.

    Returns
    -------

    posts: pymongo.cursor.Cursor or Elasticsearch hits.
                MongoDB: the cursor for documents with ``stanford == 0`` and
                ``date_added`` in the window. Elasticsearch: the hits of the
                first result page, extended in place with the hits of every
                following page.
    """
    logger = logging.getLogger('stanford')
    gt_date = date - datetime.timedelta(days=num_days)

    if not elasticsearch:
        posts = collection.find({"$and": [{"date_added": {"$lte": date}},
                                          {"date_added": {"$gt": gt_date}},
                                          {"stanford": 0}]})
        logger.info('Returning {} total stories.'.format(posts.count()))
        return posts

    # Date range query on ``published_date``, keeping only documents where
    # ``stanford`` is 0 or the field is missing altogether.
    page_size = 100
    lte_time = date.strftime('%Y-%m-%dT%X.%f%z')
    gt_time = gt_date.strftime('%Y-%m-%dT%X.%f%z')
    s = Search(using=collection, index=index, doc_type="news")\
        .filter("range", published_date={"lte": lte_time, "gt": gt_time})\
        .filter("or", [F("term", stanford=0), F("missing", field="stanford")])

    page = s[0:page_size].execute()
    total = page.hits.total
    posts = page.hits
    # Slices are half-open, so each page must start exactly where the last
    # one ended. Starting at ``offset + 1`` would silently drop the hit at
    # offset ``page_size``.
    for start in range(page_size, total, page_size):
        page = s[start:start + page_size].execute()
        posts.extend(page.hits)
    logger.info('Returning {} total stories.'.format(total))
    return posts
def _parse_config(cparser):
    """
    Function to pull the pipeline settings out of a loaded config parser.

    Parameters
    ----------

    cparser: ConfigParser.
                Parser whose config file(s) have already been read.

    Returns
    -------

    settings: Tuple.
                ``(stanford_dir, log_dir, auth_db, auth_user, auth_pass,
                db_host)``. ``log_dir`` is ``''`` when there is no
                ``Logging`` section. When there is no ``Auth`` section the
                three auth values are ``''`` and ``db_host`` comes from the
                ``MONGO_HOST`` environment variable (``None`` if unset).

    Raises
    ------

    Exception
                Any error raised while reading the options is reported on
                stdout and then re-raised unchanged.
    """
    try:
        stanford_dir = cparser.get('StanfordNLP', 'stanford_dir')
        sections = cparser.sections()

        if 'Logging' in sections:
            log_dir = cparser.get('Logging', 'log_file')
        else:
            log_dir = ''

        if 'Auth' in sections:
            auth_db = cparser.get('Auth', 'auth_db')
            auth_user = cparser.get('Auth', 'auth_user')
            auth_pass = cparser.get('Auth', 'auth_pass')
            db_host = cparser.get('Auth', 'db_host')
        else:
            auth_db = ''
            auth_user = ''
            auth_pass = ''
            db_host = os.getenv('MONGO_HOST')

        return stanford_dir, log_dir, auth_db, auth_user, auth_pass, db_host
    except Exception as e:
        # ``except ... as`` and a single-argument ``print(...)`` behave the
        # same on Python 2.6+ and Python 3, unlike the 2-only spellings.
        print('There was an error parsing the config file. {}'.format(e))
        raise
def parse_config():
    """
    Function to locate and parse the config file.

    A ``config.ini`` matched by ``glob`` relative to the current working
    directory is used when present; otherwise ``default_config.ini`` from
    the directory containing this module is read.

    Returns
    -------

    settings: Tuple.
                Whatever ``_parse_config`` returns for the loaded parser.
    """
    cparser = ConfigParser()
    config_source = glob.glob('config.ini')
    if not config_source:
        # Nothing local: fall back to the defaults shipped beside this file.
        module_dir = os.path.abspath(os.path.dirname(__file__))
        config_source = os.path.join(module_dir, 'default_config.ini')
    cparser.read(config_source)
    return _parse_config(cparser)
def run(run_date, num_days, elasticsearch, index):
    """
    Function to run one pass of the pipeline: read the config, attach a file
    handler to the ``stanford`` logger, resolve the run date, connect to the
    story store, query it, and pass the results to ``parser.stanford_parse``.

    Parameters
    ----------

    run_date: String.
                Date in ``YYYYMMDD`` format. A falsy value means the
                current UTC time is used.

    num_days: Int.
                Forwarded to ``query_date``.

    elasticsearch: Boolean.
                Forwarded to ``make_conn``, ``query_date`` and
                ``parser.stanford_parse``.

    index: String.
                Forwarded to ``query_date`` and ``parser.stanford_parse``.

    Raises
    ------

    SystemExit
                When ``run_date`` is given but does not match ``%Y%m%d``.
    """
    stanford_dir, log_dir, db_auth, db_user, db_pass, db_host = parse_config()

    # Log to the configured file, or to ``stanford.log`` if none is set.
    logger = logging.getLogger('stanford')
    logger.setLevel(logging.INFO)
    log_path = log_dir if log_dir else 'stanford.log'
    file_handler = logging.FileHandler(log_path, 'a')
    file_handler.setFormatter(
        logging.Formatter('%(levelname)s %(asctime)s: %(message)s'))
    logger.addHandler(file_handler)
    logger.info('Running.')

    if run_date:
        try:
            run_date = datetime.datetime.strptime(run_date, '%Y%m%d')
        except ValueError:
            print('Bad run date')
            raise SystemExit
    else:
        run_date = datetime.datetime.utcnow()

    coll = make_conn(db_auth, db_user, db_pass, db_host, elasticsearch)
    stories = query_date(coll, run_date, num_days, elasticsearch, index)
    parser.stanford_parse(coll, stories, stanford_dir, elasticsearch, index)
if __name__ == '__main__':
    # Script entry point: read the command line options and hand them to
    # ``run`` in the order it expects.
    cli = argparse.ArgumentParser(description='Grab run_date.')
    cli.add_argument(
        '--run_date',
        type=str,
        default='',
        help='enter date in YYYYMMDD format',
    )
    cli.add_argument(
        '--num_days',
        type=int,
        default=1,
        help='number of days before run_date to query',
    )
    cli.add_argument(
        '--es',
        dest='elasticsearch',
        action='store_true',
        help='Use Elasticsearch on localhost',
    )
    cli.set_defaults(elasticsearch=False)
    cli.add_argument(
        '--index',
        type=str,
        default='stories-index',
        help='the elasticsearch index containing the stories',
    )

    options = cli.parse_args()
    run(options.run_date, options.num_days, options.elasticsearch,
        options.index)