-
Notifications
You must be signed in to change notification settings - Fork 0
/
splittimestamp.py
160 lines (122 loc) · 4.92 KB
/
splittimestamp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import datetime
import os
import time
from typing import Any, Callable, List, Optional, Tuple
import pyarrow as pa
import pyarrow.compute
from cjwmodule.arrow.types import ArrowRenderResult
def _extract_struct_times_from_array(
    array: pa.Array,
) -> List[Optional[time.struct_time]]:
    """Convert one timestamp array to ``time.struct_time`` values in local time.

    The array's values are reinterpreted as int64 nanoseconds since the Unix
    epoch (this assumes a ``timestamp[ns]`` array -- TODO confirm callers never
    pass another unit), floored to whole seconds, and passed to
    ``time.localtime``, which applies the process's current ``TZ`` setting.

    Nulls map to ``None``.
    """
    nanoseconds = array.view(pa.int64()).to_pylist()
    # Use Python floor division rather than Arrow's integer ``divide``, which
    # truncates toward zero. For a pre-1970 instant such as -1.5 s,
    # truncation yields -1 s and reports the wrong second. At
    # 1969-12-31 23:59:59.5 UTC it even reports the wrong day and year.
    # Flooring yields -2 s, the second the instant actually falls in.
    return [
        None if ns is None else time.localtime(ns // 1_000_000_000)
        for ns in nanoseconds
    ]
def _extract_struct_times_from_chunked_array(
    input_chunked_array: pa.ChunkedArray,
) -> List[List[Optional[time.struct_time]]]:
    """Convert every chunk of *input_chunked_array* to local struct_times.

    The chunk structure is preserved: the result holds one inner list per
    chunk, in chunk order, so output columns can be rebuilt chunk-for-chunk.
    """
    return list(map(_extract_struct_times_from_array, input_chunked_array.chunks))
def _build_date_builder(name: str, part: str):
    """Return ``(field, fn)`` for a date32 output column named *name*.

    *fn* maps a ``time.struct_time`` to the first day of the period that
    *part* selects:

    * ``"date"``: the day itself (unit ``"day"``).
    * ``"dateweek"``: the Monday of that week (unit ``"week"``). The offset is
      ``tm_wday``, where Monday is 0.
    * ``"datemonth"``: the 1st of the month (unit ``"month"``).
    * ``"datequarter"``: the 1st of the quarter's first month (unit
      ``"quarter"``).
    * Anything else: January 1st of the year (unit ``"year"``).

    The unit is stored in the field's ``"unit"`` metadata.
    """

    def to_day(st: time.struct_time) -> datetime.date:
        return datetime.date(st.tm_year, st.tm_mon, st.tm_mday)

    def to_week(st: time.struct_time) -> datetime.date:
        monday_ordinal = to_day(st).toordinal() - st.tm_wday
        return datetime.date.fromordinal(monday_ordinal)

    def to_month(st: time.struct_time) -> datetime.date:
        return datetime.date(st.tm_year, st.tm_mon, 1)

    def to_quarter(st: time.struct_time) -> datetime.date:
        # Months 1-3 -> 1, 4-6 -> 4, 7-9 -> 7, 10-12 -> 10.
        quarter_start_month = 3 * ((st.tm_mon - 1) // 3) + 1
        return datetime.date(st.tm_year, quarter_start_month, 1)

    def to_year(st: time.struct_time) -> datetime.date:
        return datetime.date(st.tm_year, 1, 1)

    unit, fn = {
        "date": ("day", to_day),
        "dateweek": ("week", to_week),
        "datemonth": ("month", to_month),
        "datequarter": ("quarter", to_quarter),
    }.get(part, ("year", to_year))
    return pa.field(name, pa.date32(), metadata={"unit": unit}), fn
def _build_string_builder(name: str, part: str):
    """Return ``(field, fn)`` for a utf8 clock-time output column named *name*.

    ``"time_minutes"`` formats as ``HH:MM``. Any other *part* formats as
    ``HH:MM:SS``. The templates index the unpacked struct_time positionally:
    3 is the hour, 4 the minute and 5 the second.
    """
    template = (
        "{3:02d}:{4:02d}" if part == "time_minutes" else "{3:02d}:{4:02d}:{5:02d}"
    )

    def fn(st: time.struct_time) -> str:
        return template.format(*st)

    return pa.field(name, pa.utf8()), fn
def _build_number_builder(name: str, part: str):
    """Return ``(field, fn)`` for an integer output column named *name*.

    *part* is one of ``"year"``, ``"month"``, ``"day"``, ``"hour"``,
    ``"minute"`` or ``"second"``. Any other value raises ``KeyError``.

    *fn* reads the matching struct_time position. The field is int16 for
    the year and int8 otherwise, and carries ``{"format": "{:d}"}`` metadata.
    """
    layout = {
        "year": (0, pa.int16()),
        "month": (1, pa.int8()),
        "day": (2, pa.int8()),
        "hour": (3, pa.int8()),
        "minute": (4, pa.int8()),
        "second": (5, pa.int8()),
    }
    position, arrow_type = layout[part]

    def fn(st: time.struct_time) -> int:
        return st[position]

    return pa.field(name, arrow_type, metadata={"format": "{:d}"}), fn
def _build_output_array(
    field: pa.Field,
    struct_times: List[Optional[time.struct_time]],
    fn: Callable[[time.struct_time], Any],
) -> pa.Array:
    """Map *fn* over one chunk of struct_times and pack the results as *field*'s type.

    ``None`` entries (null timestamps) stay ``None`` and become Arrow nulls.
    """
    converted = [fn(st) if st is not None else None for st in struct_times]
    return pa.array(converted, type=field.type)
def _build_output_column(
    name: str, struct_times: List[List[Optional[time.struct_time]]], part: str
) -> Tuple[pa.Field, pa.ChunkedArray]:
    """Build the field and chunked data for one output column.

    *part* selects the builder:

    * Date-like parts go to ``_build_date_builder``.
    * ``time_*`` parts go to ``_build_string_builder``.
    * Anything else goes to ``_build_number_builder``.

    *struct_times* holds one list per input chunk. The output keeps that
    same chunking.
    """
    date_parts = {"date", "dateweek", "datemonth", "datequarter", "dateyear"}
    time_parts = {"time_minutes", "time_seconds"}
    if part in date_parts:
        make_builder = _build_date_builder
    elif part in time_parts:
        make_builder = _build_string_builder
    else:
        make_builder = _build_number_builder
    field, fn = make_builder(name, part)

    chunks = [
        _build_output_array(field, chunk_times, fn) for chunk_times in struct_times
    ]
    return field, pa.chunked_array(chunks, field.type)
def _extract_struct_times_in_timezone(
    input_chunked_array: pa.ChunkedArray, timezone: str
) -> List[List[Optional[time.struct_time]]]:
    """Extract per-chunk ``time.struct_time`` lists using *timezone* as ``TZ``.

    ``time.localtime`` reads the process-wide ``TZ`` setting. This helper
    sets ``TZ`` and calls ``time.tzset()`` only for the duration of the
    extraction. Afterwards it restores the previous ``TZ`` (or unsets it), so
    the change does not outlive this render.
    """
    previous_tz = os.environ.get("TZ")
    os.environ["TZ"] = timezone
    time.tzset()
    try:
        return _extract_struct_times_from_chunked_array(input_chunked_array)
    finally:
        if previous_tz is None:
            os.environ.pop("TZ", None)
        else:
            os.environ["TZ"] = previous_tz
        time.tzset()


def render_arrow_v1(table, params, **kwargs):
    """Split the timestamp column ``params["colname"]`` into per-part columns.

    Each entry of ``params["outputs"]`` names an output column
    (``"outcolname"``) and the timestamp part it should hold (``"part"``).

    The result table is built as follows:

    * Start from the input table.
    * Remove ``params["colname"]``.
    * Remove any existing column whose name collides with an output.
    * Insert the new output columns where ``params["colname"]`` used to be.

    Timestamps are converted with ``time.localtime`` while ``TZ`` is set to
    ``params["timezone"]``.

    The input table is returned unchanged (wrapped in ``ArrowRenderResult``)
    when no input column is selected or no output has a non-blank name.
    """
    colname = params["colname"]
    if not colname:
        return ArrowRenderResult(table)

    # Unique-ize output columns and skip blank (unconfigured) names, which
    # would otherwise become a column literally named "".
    # Dicts iterate in insertion order: a repeated name keeps the position of
    # its first occurrence but takes the part of its last.
    unique_out_columns = {
        o["outcolname"]: o["part"] for o in params["outputs"] if o["outcolname"]
    }
    if not unique_out_columns:
        return ArrowRenderResult(table)
    output_colnames = frozenset(unique_out_columns)

    # Extract before dropping anything: an output may reuse colname's name.
    struct_times = _extract_struct_times_in_timezone(
        table[colname], params["timezone"]
    )

    # The new columns start where colname was. Each replaced column that
    # sits to the left of colname shifts that slot one position left.
    input_index = table.schema.get_field_index(colname)
    insert_index = input_index - sum(
        1
        for name in output_colnames
        if 0 <= table.schema.get_field_index(name) < input_index
    )

    # Drop the input column plus every column we're about to replace.
    table = table.drop(
        [
            colname,
            *(
                name
                for name in table.column_names
                if name in output_colnames and name != colname
            ),
        ]
    )

    for offset, (name, part) in enumerate(unique_out_columns.items()):
        field, data = _build_output_column(name, struct_times, part)
        table = table.add_column(insert_index + offset, field, data)
    return ArrowRenderResult(table)