import torch
from torch.utils.data import DataLoader

from utils import (
    jsonlines_load,
    get_logger,
    torch_cached_property,
    cached_property,
    SubclassRegistry,
)
from entities import Entities


class TensorDictDataset():
    """Like a TensorDataset, but instead of a tuple of tensors,
    __getitem__ returns a dict of tensors, which makes code using
    this dataset more readable.
    """

    def __init__(self, tensor_dict):
        self.tensors = tensor_dict

    def __getitem__(self, index):
        return {k: tensor[index] for k, tensor in self.tensors.items()}

    def __len__(self):
        return len(next(iter(self.tensors.values())))
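
    # Illustrative usage (sketch only; the keys and shapes below are made up):
    #     ds = TensorDictDataset({
    #         'input_ids': torch.zeros(4, 8, dtype=torch.long),
    #         'entity_ids': torch.arange(4)})
    #     ds[1]    # -> {'input_ids': tensor of shape (8,), 'entity_ids': tensor(1)}
    #     len(ds)  # -> 4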


class Data(SubclassRegistry):
    """Base class for data.
    """

    def __init__(self, conf):
        self.log = get_logger().info
        self.conf = conf
        self.max_seq_len = conf.max_seq_len

    @cached_property
    def entities(self):
        return Entities(self.conf)

    @property
    def conf_str(self):
        """A string that uniquely describes the configuration of this data. Used
        for automatic caching.
        """
        fields = ['top_n', 'max_seq_len']
        if self.conf.max_train_inst:
            fields.append('max_train_inst')
        if self.conf.max_eval_inst:
            fields.append('max_eval_inst')
        if self.conf.max_test_inst:
            fields.append('max_test_inst')
        if self.conf.paraphrase_id is not None:
            fields.append('paraphrase_id')
        return '.'.join([
            field + str(getattr(self.conf, field)) for field in fields])
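
    # For example (hypothetical config values): a conf with top_n=1000000,
    # max_seq_len=64 and no instance limits yields the conf_str
    # 'top_n1000000.max_seq_len64', which becomes part of the cache filename.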

    @cached_property
    def raw(self):
        """Raw data as read from a file on disk, which has not been
        tensorized yet.
        """
        return self.load_raw_data()

    def get_max_inst(self, split_name):
        """The maximum number of instances to be included in the
        specified split.
        """
        test_name = 'test' if self.conf.max_test_inst else 'eval'
        split_name = {
            'train': 'train',
            'dev': 'eval',
            'test': test_name}[split_name]
        return getattr(self.conf, f'max_{split_name}_inst')
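
    # That is: 'train' -> conf.max_train_inst, 'dev' -> conf.max_eval_inst,
    # and 'test' -> conf.max_test_inst if it is set, else conf.max_eval_inst.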

    def load_raw_data(self):
        """Load raw data for all splits from disk."""
        return {
            split_name:
            self.load_raw_split(split_name)[:self.get_max_inst(split_name)]
            for split_name in self.split_names}

    @cached_property
    def train_sampler(self):
        """The sampler for the training split. Returns a DistributedSampler
        if we're doing distributed training, and None otherwise, in which case
        the DataLoader falls back to PyTorch's default sampler.
        """
        if self.conf.distributed:
            return torch.utils.data.distributed.DistributedSampler(
                self.train,
                num_replicas=torch.distributed.get_world_size(),
                rank=self.conf.local_rank,
                shuffle=True)
        return None

    def has_train_data(self):
        """Used to check if this is a test-only dataset.
        """
        return len(next(iter(self.tensors['train'].values()))) > 0

    @cached_property
    def tokenizer(self):
        from transformers import AutoTokenizer
        return AutoTokenizer.from_pretrained(self.conf.tokenizer)

    @torch_cached_property(
        fname_tpl='tensors.{conf_str}.pth', map_location='cpu')
    def tensors(self):
        """The data in tensorized form, ready to be fed directly to the model.
        This method is backed by a file cache, that is, the raw data will be
        tensorized once (possibly very slowly) and cached. Subsequent runs will
        then load the tensors from this cache, which is usually much faster.
        """
        return self.tensorize()

    def tensorize(self):
        """Turn raw data from each split into PyTorch tensors.
        """
        return {
            split_name: self.tensorize_split(split)
            for split_name, split in self.raw.items()}

    def log_size(self):
        """Log size of each split.
        """
        for split_name in self.split_names:
            split = getattr(self, split_name)
            if split is not None:
                msg = f'{len(split)} {split_name} instances'
                loader_name = split_name + '_loader'
                loader = getattr(self, loader_name)
                if loader is not None:
                    msg += f' | {len(loader)} batches'
                self.log(msg)


class EvalOnTrain():
    """A dataset without a test set.
    """

    @property
    def split_names(self):
        return ['train', 'dev']

    @cached_property
    def train(self):
        return TensorDictDataset(self.tensors['train'])

    @property
    def dev(self):
        return TensorDictDataset(self.tensors['dev'])

    @property
    def train_loader(self):
        return DataLoader(
            self.train,
            batch_size=self.conf.batch_size,
            pin_memory=False,
            num_workers=0,
            sampler=self.train_sampler)

    @property
    def dev_loader(self):
        return self.eval_loader(self.dev)

    def eval_loader(self, tensorized_data):
        return DataLoader(
            tensorized_data,
            self.conf.eval_batch_size,
            shuffle=False,
        )


class TrainDevTest():
    """A dataset with typical train/dev/test splits.
    """

    @property
    def split_names(self):
        return ['train', 'dev', 'test']

    @property
    def train(self):
        return TensorDictDataset(self.tensors['train'])

    @property
    def dev(self):
        return TensorDictDataset(self.tensors['dev'])

    @property
    def test(self):
        return TensorDictDataset(self.tensors['test'])

    @property
    def train_loader(self):
        return DataLoader(
            self.train,
            self.conf.batch_size,
            sampler=self.train_sampler)

    @property
    def dev_loader(self):
        return self.eval_loader(self.dev)

    @property
    def test_loader(self):
        return self.eval_loader(self.test)

    def eval_loader(self, tensorized_data):
        return DataLoader(
            tensorized_data,
            self.conf.eval_batch_size,
            shuffle=False)


class RelationStatements(Data):
    """A dataset containing relation statements such as "Barack Obama
    was born in Hawaii". The object entity ("Hawaii") will be masked
    to create training instances like "Barack Obama was born in [MASK]".
    """

    is_pretokenized = False

    def tensorize_split(self, raw_data):
        """Tokenize and encode relation statements, and also add tensors
        containing entity ids and their positions in the statements.
        """
        instances = self.raw_data_to_instances(raw_data)
        if not instances['contexts']:
            return {'dummy': []}
        tensors = self.tokenizer.batch_encode_plus(
            instances['contexts'],
            max_length=self.conf.max_seq_len,
            padding='max_length',
            return_tensors='pt',
            is_split_into_words=self.is_pretokenized)
        # the object returned by batch_encode_plus cannot be serialized
        # with torch.save, so we convert it into a dict, which can
        tensors = dict(tensors)
        tensors['entity_ids'] = instances['entity_ids']
        entity_mask = tensors['input_ids'] == self.tokenizer.mask_token_id
        if entity_mask.sum() != len(entity_mask):
            # In a few very rare cases, the first entity's name is so long that
            # the second entity is truncated. Arbitrarily turn the second-to-last
            # token into an entity mention in these cases to ensure that each
            # truncated statement also has exactly one object entity.
            missing = entity_mask.sum(1) != 1
            entity_mask[missing, -2] = 1
        tensors['entity_mask'] = entity_mask
        return tensors
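
    # Rough shape of the result (assuming a BERT-style tokenizer with a [MASK]
    # token): for N statements, 'input_ids' and 'attention_mask' have shape
    # (N, max_seq_len), 'entity_ids' has length N, and 'entity_mask' is a
    # boolean (N, max_seq_len) tensor with exactly one True per row, marking
    # the masked object entity.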

    def untensorize(self, tensors, pred):
        """The reverse of tensorize: Decode tensors into text.
        """
        attn_masks = tensors['attention_mask'].bool()
        for input_ids, attn_mask, entity_target_id, entity_pred_id in zip(
                tensors['input_ids'], attn_masks, tensors['entity_ids'], pred):
            context = self.tokenizer.decode(input_ids.masked_select(attn_mask))
            context = context.replace(
                self.tokenizer.mask_token, ' ' + self.tokenizer.mask_token)
            entity_target = self.entities.labels_en[entity_target_id]
            entity_pred = self.entities.labels_en[entity_pred_id]
            yield {
                'context': context,
                'entity_target': entity_target,
                'entity_pred': entity_pred}

    def to_context_target_pred(self, batch, output):
        """Convenience function similar to untensorize.
        """
        pred = output['entity_pred'][:, 0]
        instances = self.untensorize(batch, pred)
        for i in instances:
            yield i['context'], i['entity_target'], i['entity_pred']

    def raw_data_to_instances(self, raw_data):
        raise NotImplementedError()

    def mask_entity(self, sent_tokens, entity_start_idx, entity_end_idx):
        return (
            sent_tokens[:entity_start_idx] +
            [self.tokenizer.mask_token] +
            sent_tokens[entity_end_idx:])
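
    # For example (assuming a mask token of '[MASK]'):
    #     mask_entity(['Barack', 'Obama', 'was', 'born', 'in', 'Hawaii'],
    #                 entity_start_idx=5, entity_end_idx=6)
    #     -> ['Barack', 'Obama', 'was', 'born', 'in', '[MASK]']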


class WikidataRelationStatements_Base(RelationStatements):
    """Base class for statements derived from Wikidata relations. The only
    purpose of this class is to encapsulate two methods for reading Wikidata
    files.
    """

    target_arg = 'o'  # use relation objects as targets

    def load_raw_split(self, split_name):
        fname = self.filename_for_split(split_name)
        split_file = self.conf.datadir / self.dir_name / fname
        if split_name in {'dev', 'test'}:
            if split_name == 'test' and self.conf.max_test_inst:
                n_inst = self.conf.max_test_inst
            else:
                n_inst = self.conf.max_eval_inst
        else:
            n_inst = self.conf.n_facts
        instances_raw = list(jsonlines_load(split_file, max=n_inst))
        assert n_inst == len(instances_raw), len(instances_raw)
        return instances_raw

    def raw_data_to_instances(self, raw_data):
        start_idx_key = self.target_arg + '_start'
        end_idx_key = self.target_arg + '_end'
        target_id_key = self.target_arg + 'id'
        contexts = [
            ' '.join(self.mask_entity(
                inst['sent'],
                entity_start_idx=inst[start_idx_key],
                entity_end_idx=inst[end_idx_key]))
            for inst in raw_data]
        target_ids_raw = [inst[target_id_key] for inst in raw_data]
        target_ids = self.entities.id_enc.transform(target_ids_raw)
        return {
            'contexts': contexts,
            'entity_ids': target_ids}


class WikidataRelationStatements(WikidataRelationStatements_Base, EvalOnTrain):
    """Statements derived from Wikidata relations, involving the top n entities
    in Wikidata, where n = 1 million or n = 6 million.
    """

    @property
    def dir_name(self):
        return 'wikidata_relation_statements'

    def filename_for_split(self, split_name):
        return f'relation_statements.en.top{self.conf.top_n}.jl'


class WikidataParaphrases(WikidataRelationStatements_Base, TrainDevTest):
    """Statements derived from Wikidata relations, with predefined splits for
    paraphrases and finetuning.
    """

    @property
    def dir_name(self):
        return 'wikidata_paraphrases'

    def filename_for_split(self, split_name):
        i = self.conf.paraphrase_id
        if self.conf.paraphrase_mode == 'train':
            split_name = ''
        else:
            assert self.conf.paraphrase_mode == 'finetune'
            if split_name in {'train', 'dev'}:
                split_name = 'finetune.'
            else:
                assert split_name == 'test'
                split_name = 'eval.'
        rel = self.relation_name
        return f'wd-paraphrase.{rel}.{i}{split_name}.top{self.conf.top_n}.jl'


class WikidataParaphrases_born_in(WikidataParaphrases):
    relation_name = 'born_in'
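
# Further relation-specific paraphrase datasets can be added by subclassing
# WikidataParaphrases with a different relation_name, e.g. (hypothetical
# relation, not part of the original module):
#     class WikidataParaphrases_capital_of(WikidataParaphrases):
#         relation_name = 'capital_of'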