Skip to content

Commit

Permalink
Provision of updated ordered merge functionality with full merge functionality (#188)
Browse files Browse the repository at this point in the history

* Initial commit for left merge mapping with correct behaviour

* All fast ordered merges now fast and correct

* ordered fast merges. indexed string field streaming map

* Fixed streaming merge for non-indexed fields

* Work towards fix for pathological (large) index gaps resulting in an entire index fetch

* Fix for streaming merges to subdivide the mapping chunk if the index range is too large

* Further work on ordered merges

* Fix for ordered streaming merges of fixed strings

* Temporary commit of benchmarking scripts; this isn't their home

* Fix for failing dataframe tests; caused by an indent issue in the mapping code

* Added test for ops.next_map_subchunk and some comments on the function for maintainers
  • Loading branch information
atbenmurray authored Jun 10, 2021
1 parent ed877c5 commit ae92652
Show file tree
Hide file tree
Showing 10 changed files with 2,393 additions and 136 deletions.
212 changes: 203 additions & 9 deletions exetera/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,12 @@ def merge(left: DataFrame,
right_fields: Optional[Sequence[str]] = None,
left_suffix: str = '_l',
right_suffix: str = '_r',
how='left'):
how='left',
hint_left_keys_ordered: Optional[bool] = None,
hint_left_keys_unique: Optional[bool] = None,
hint_right_keys_ordered: Optional[bool] = None,
hint_right_keys_unique: Optional[bool] = None,
chunk_size=1 << 20):
"""
Merge 'left' and 'right' DataFrames into a destination dataset. The merge is a database-style
join operation, in any of the following modes ("left", "right", "inner", "outer"). This
Expand Down Expand Up @@ -550,11 +555,76 @@ def merge(left: DataFrame,
left_len = list(left_lens)[0]
right_len = list(right_lens)[0]

# TODO: tweak this to be consistent with the streaming code
if left_len < (2 << 30) and right_len < (2 << 30):
index_dtype = np.int32
else:
index_dtype = np.int64

left_fields_to_map = left.keys() if left_fields is None else left_fields
right_fields_to_map = right.keys() if right_fields is None else right_fields

# TODO: check for ordering for multi-key-fields (is_ordered doesn't support it yet)
if hint_left_keys_ordered is None:
left_keys_ordered = False
else:
left_keys_ordered = hint_left_keys_ordered

if hint_right_keys_ordered is None:
right_keys_ordered = False
else:
right_keys_ordered = hint_right_keys_ordered

if hint_left_keys_unique is None:
left_keys_unique = False
else:
left_keys_unique = hint_left_keys_unique

if hint_right_keys_unique is None:
right_keys_unique = False
else:
right_keys_unique = hint_right_keys_unique

ordered = False
if left_keys_ordered and right_keys_ordered and \
len(left_on_fields) == 1 and len(right_on_fields) == 1 and \
how in ('left', 'right', 'inner'):
ordered = True

if ordered:
_ordered_merge(left, right, dest,
left_on_fields, right_on_fields,
left_fields_to_map, right_fields_to_map,
left_len, right_len,
index_dtype,
left_suffix, right_suffix,
how,
left_keys_unique,
right_keys_unique,
chunk_size)
else:
_unordered_merge(left, right, dest,
left_on_fields, right_on_fields,
left_fields_to_map, right_fields_to_map,
left_len, right_len,
index_dtype,
left_suffix, right_suffix,
how)


def _unordered_merge(left: DataFrame,
right: DataFrame,
dest: DataFrame,
left_on_fields,
right_on_fields,
left_fields_to_map,
right_fields_to_map,
left_len,
right_len,
index_dtype,
left_suffix,
right_suffix,
how):
left_df_dict = {}
right_df_dict = {}
left_on_keys = []
Expand Down Expand Up @@ -582,6 +652,7 @@ def merge(left: DataFrame,
l_df = pd.DataFrame(left_df_dict)
r_df = pd.DataFrame(right_df_dict)

# TODO: more efficient unordered merges using dict and numba
df = pd.merge(left=l_df, right=r_df, left_on=l_key, right_on=r_key, how=how)

l_to_d_map = df['l_i'].to_numpy(dtype=np.int32)
Expand All @@ -590,12 +661,10 @@ def merge(left: DataFrame,
r_to_d_filt = np.logical_not(df['r_i'].isnull()).to_numpy()

# perform the mapping
left_fields_ = left.keys() if left_fields is None else left_fields
right_fields_ = right.keys() if right_fields is None else right_fields

for f in left_fields_:
for f in left_fields_to_map:
dest_f = f
if f in right_fields_:
if f in right_fields_to_map:
dest_f += left_suffix
l = left[f]
d = l.create_like(dest, dest_f)
Expand All @@ -606,13 +675,14 @@ def merge(left: DataFrame,
else:
v = ops.safe_map_values(l.data[:], l_to_d_map, l_to_d_filt)
d.data.write(v)
if np.all(l_to_d_filt) == False:

if not np.all(l_to_d_filt):
d = dest.create_numeric('valid'+left_suffix, 'bool')
d.data.write(l_to_d_filt)

for f in right_fields_:
for f in right_fields_to_map:
dest_f = f
if f in left_fields_:
if f in left_fields_to_map:
dest_f += right_suffix
r = right[f]
d = r.create_like(dest, dest_f)
Expand All @@ -623,6 +693,130 @@ def merge(left: DataFrame,
else:
v = ops.safe_map_values(r.data[:], r_to_d_map, r_to_d_filt)
d.data.write(v)
if np.all(r_to_d_filt) == False:

if not np.all(r_to_d_filt):
d = dest.create_numeric('valid'+right_suffix, 'bool')
d.data.write(r_to_d_filt)


def _ordered_merge(left: DataFrame,
                   right: DataFrame,
                   dest: DataFrame,
                   left_on_fields,
                   right_on_fields,
                   left_fields_to_map,
                   right_fields_to_map,
                   left_len,
                   right_len,
                   index_dtype,
                   left_suffix,
                   right_suffix,
                   how,
                   left_keys_unique,
                   right_keys_unique,
                   chunk_size=1 << 20):
    """
    Perform a streaming merge of 'left' and 'right' into 'dest' for the case
    where both key fields are known to be ordered.

    Row index maps ('_left_map' / '_right_map') from source rows to destination
    rows are generated via the ops.generate_ordered_map_* family, after which
    each requested field is mapped into 'dest' in chunks. Where a side maps 1:1
    onto the destination (its map field is absent), its fields are chunk-copied
    instead.

    :param left: the left DataFrame of the merge
    :param right: the right DataFrame of the merge
    :param dest: the DataFrame into which the merged fields are written
    :param left_on_fields: the key field(s) for 'left'; the ordered path only
        supports single-key merges, so element 0 is used
    :param right_on_fields: the key field(s) for 'right'; element 0 is used
    :param left_fields_to_map: names of the 'left' fields to copy into 'dest'
    :param right_fields_to_map: names of the 'right' fields to copy into 'dest'
    :param left_len: length of the left key field
    :param right_len: length of the right key field
    :param index_dtype: unused here; retained for signature parity with
        _unordered_merge (map dtypes are derived from left_len/right_len below)
    :param left_suffix: suffix appended to left field names that clash in 'dest'
    :param right_suffix: suffix appended to right field names that clash in 'dest'
    :param how: one of 'left', 'right' or 'inner'
    :param left_keys_unique: whether the left key values are known to be unique
    :param right_keys_unique: whether the right key values are known to be unique
    :param chunk_size: number of elements handled per streaming chunk
    :raise ValueError: if 'how' is not one of the supported modes
    """
    supported = ('left', 'right', 'inner')
    if how not in supported:
        raise ValueError("Unsupported mode for 'how'; must be one of "
                         "{} but is {}".format(supported, how))

    if left_keys_unique or right_keys_unique:
        # With at least one unique key side, a narrower map dtype may suffice.
        npdtype = ops.get_map_datatype_based_on_lengths(left_len, right_len)
        # Fix: was "np.int64" (the numpy type); create_numeric is passed the
        # string form ('int64') everywhere else in this function.
        strdtype = 'int32' if npdtype == np.int32 else 'int64'
        invalid = ops.INVALID_INDEX_32 if npdtype == np.int32 else ops.INVALID_INDEX_64
    else:
        npdtype = np.int64
        strdtype = 'int64'
        invalid = ops.INVALID_INDEX_64

    if how in ('left', 'right'):
        # Treat left/right merges symmetrically: 'a' is the side whose rows
        # are all kept; 'b' is the side being mapped onto it.
        if how == 'left':
            a_on, b_on = left_on_fields, right_on_fields
            a_unique, b_unique = left_keys_unique, right_keys_unique
        else:
            a_on, b_on = right_on_fields, left_on_fields
            a_unique, b_unique = right_keys_unique, left_keys_unique

        if a_unique:
            if b_unique:
                # Both unique: 'a' rows map 1:1 to dest, so no '_a_map' is
                # needed; its absence is detected during the mapping phase.
                b_result = dest.create_numeric('_b_map', strdtype)
                ops.generate_ordered_map_to_left_both_unique_streamed(
                    a_on[0], b_on[0], b_result, invalid, rdtype=npdtype)
            else:
                a_result = dest.create_numeric('_a_map', strdtype)
                b_result = dest.create_numeric('_b_map', strdtype)
                ops.generate_ordered_map_to_left_left_unique_streamed(
                    a_on[0], b_on[0], a_result, b_result, invalid, rdtype=npdtype)
        else:
            # Fix: test b_unique rather than right_keys_unique. For how='right'
            # the 'b' side is the *left* dataframe, so testing right_keys_unique
            # here selected the wrong map generator.
            if b_unique:
                b_result = dest.create_numeric('_b_map', strdtype)
                ops.generate_ordered_map_to_left_right_unique_streamed(
                    a_on[0], b_on[0], b_result, invalid, rdtype=npdtype)
            else:
                a_result = dest.create_numeric('_a_map', strdtype)
                b_result = dest.create_numeric('_b_map', strdtype)
                ops.generate_ordered_map_to_left_streamed(
                    a_on[0], b_on[0], a_result, b_result, invalid, rdtype=npdtype)

        # Rename the symmetric map fields back to their left/right identities.
        # '_a_map' is absent in the both-unique case, so guard the renames.
        if how == 'right':
            if '_a_map' in dest:
                dest.rename('_a_map', '_right_map')
            dest.rename('_b_map', '_left_map')
        else:
            if '_a_map' in dest:
                dest.rename('_a_map', '_left_map')
            dest.rename('_b_map', '_right_map')
    else:
        # Inner merges always produce maps for both sides.
        left_result = dest.create_numeric('_left_map', strdtype)
        right_result = dest.create_numeric('_right_map', strdtype)
        if left_keys_unique:
            if right_keys_unique:
                ops.generate_ordered_map_to_inner_both_unique_streamed(
                    left_on_fields[0], right_on_fields[0], left_result, right_result,
                    rdtype=npdtype)
            else:
                # NOTE(review): the '*_right_unique_*' generator is selected when
                # the *left* keys are unique (and vice versa below); confirm
                # against ops that this naming crossover is intentional.
                ops.generate_ordered_map_to_inner_right_unique_streamed(
                    left_on_fields[0], right_on_fields[0], left_result, right_result,
                    rdtype=npdtype)
        else:
            if right_keys_unique:
                ops.generate_ordered_map_to_inner_left_unique_streamed(
                    left_on_fields[0], right_on_fields[0], left_result, right_result,
                    rdtype=npdtype)
            else:
                ops.generate_ordered_map_to_inner_streamed(
                    left_on_fields[0], right_on_fields[0], left_result, right_result,
                    rdtype=npdtype)

    # perform the mappings
    # ====================

    # A missing map field means that side's rows appear unchanged (1:1) in the
    # destination, so its fields can be chunk-copied rather than mapped.
    left_map = dest['_left_map'] if '_left_map' in dest else None
    # Fix: '_right_map' can also be absent (how='right' with both keys unique),
    # which previously raised here; mirror the left-side handling.
    right_map = dest['_right_map'] if '_right_map' in dest else None

    if left_map is None:
        for k in left_fields_to_map:
            dest_k = k
            if k in dest:
                dest_k += left_suffix
            dest_f = left[k].create_like(dest, dest_k)
            ops.chunked_copy(left[k], dest_f, chunk_size)
    else:
        for k in left_fields_to_map:
            dest_k = k
            if k in dest:
                dest_k += left_suffix
            dest_f = left[k].create_like(dest, dest_k)
            if left[k].indexed:
                # NOTE(review): unlike the right-hand mapping below, no
                # 'invalid' sentinel is passed here; confirm the ops defaults
                # handle invalid left-map entries (possible for right joins).
                ops.ordered_map_valid_indexed_stream(left[k], left_map, dest_f)
            else:
                ops.ordered_map_valid_stream(left[k], left_map, dest_f)

    if right_map is None:
        for k in right_fields_to_map:
            dest_k = k
            if k in dest:
                dest_k += right_suffix
            dest_f = right[k].create_like(dest, dest_k)
            ops.chunked_copy(right[k], dest_f, chunk_size)
    else:
        for k in right_fields_to_map:
            dest_k = k
            if k in dest:
                dest_k += right_suffix
            dest_f = right[k].create_like(dest, dest_k)
            if right[k].indexed:
                ops.ordered_map_valid_indexed_stream(right[k], right_map, dest_f, invalid)
            else:
                ops.ordered_map_valid_stream(right[k], right_map, dest_f, invalid)
6 changes: 5 additions & 1 deletion exetera/core/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ def dtype(self):

def __getitem__(self, item):
    """
    Return the requested entries from the backing dataset.

    When the field has no backing dataset, an empty uint8 array is returned
    instead of raising, so callers can treat an unset field as zero-length.
    """
    # Fix: removed the commented-out 'raise ValueError(...)' left behind when
    # the empty-field behavior was changed to return an empty array.
    if self._dataset is None:
        return np.zeros(0, dtype=np.uint8)
    return self._dataset[item]

def __setitem__(self, key, value):
Expand Down Expand Up @@ -379,6 +380,8 @@ def __getitem__(self, item):
# TODO: validate slice

index = self._indices[start:stop+1]
if len(index) == 0:
return []
bytestr = self._values[index[0]:index[-1]]
results = [None] * (len(index) - 1)
startindex = start
Expand Down Expand Up @@ -549,6 +552,7 @@ def apply_spans_min(self, spans_to_apply, target=None, in_place=False):
def apply_spans_max(self, spans_to_apply, target=None, in_place=False):
    # Thin delegation: the span-wise max reduction is implemented centrally in
    # FieldDataOps.apply_spans_max; see that function for the actual semantics
    # of 'target' and 'in_place'.
    return FieldDataOps.apply_spans_max(self, spans_to_apply, target, in_place)


class FixedStringMemField(MemoryField):
def __init__(self, session, length):
super().__init__(session)
Expand Down
Loading

0 comments on commit ae92652

Please sign in to comment.