# jsonl.py (from a fork of huggingface/datatrove)
import json
from json import JSONDecodeError
from typing import Callable, Literal

from datatrove.io import DataFolderLike
from datatrove.pipeline.readers.base import BaseDiskReader
from datatrove.utils.logging import logger


class JsonlReader(BaseDiskReader):
"""Read data from JSONL files.
Will read each line as a separate document.
Args:
data_folder: the data folder to read from
compression: the compression to use (default: "infer")
limit: limit the number of JSON lines to read
skip: skip the first n rows
file_progress: show progress bar for files
doc_progress: show progress bar for documents
        adapter: function to adapt the data dict from the source to a Document.
            Takes as input: data: dict, path: str, id_in_file: int | str.
            Must return a dict with at least a "text" key (see the example below).
text_key: key to use for the text in the default adapter (default: "text"). Ignored if you provide your own `adapter`
id_key: key to use for the id in the default adapter (default: "id"). Ignored if you provide your own `adapter`
default_metadata: default metadata to add to all documents
recursive: if True, will read files recursively in subfolders (default: True)
glob_pattern: a glob pattern to filter files to read (default: None)
        shuffle_files: shuffle the files within the returned shard. Mostly used for data visualization
            purposes; do not use with dedup blocks
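
    Example:
        A minimal custom adapter following the signature documented above (a sketch; the
        "content" field is a hypothetical name in the source data, not required by this reader):

            def my_adapter(data: dict, path: str, id_in_file: int | str) -> dict:
                # map a hypothetical "content" field onto the required "text" key
                return {"text": data.get("content", ""), "id": f"{path}/{id_in_file}"}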
"""
name = "🐿 Jsonl"
def __init__(
self,
data_folder: DataFolderLike,
compression: Literal["infer", "gzip", "zstd"] | None = "infer",
limit: int = -1,
skip: int = 0,
file_progress: bool = False,
doc_progress: bool = False,
adapter: Callable = None,
text_key: str = "text",
id_key: str = "id",
default_metadata: dict = None,
recursive: bool = True,
glob_pattern: str | None = None,
shuffle_files: bool = False,
):
super().__init__(
data_folder,
limit,
skip,
file_progress,
doc_progress,
adapter,
text_key,
id_key,
default_metadata,
recursive,
glob_pattern,
shuffle_files,
)
        self.compression = compression

    def read_file(self, filepath: str):
        with self.data_folder.open(filepath, "rt", encoding="utf-8", compression=self.compression) as f:
try:
for li, line in enumerate(f):
with self.track_time():
try:
document = self.get_document_from_dict(json.loads(line), filepath, li)
if not document:
continue
except (EOFError, JSONDecodeError) as e:
logger.warning(f"Error when reading `{filepath}`: {e}")
continue
yield document
except UnicodeDecodeError as e:
logger.warning(f"File `{filepath}` may be corrupted: raised UnicodeDecodeError ({e})")