Update pandas to polars for feature processing
xpai committed Apr 16, 2024
1 parent 4646e25 commit 12c49f4
Showing 6 changed files with 70 additions and 79 deletions.
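
For orientation, the change replaces eager pandas reads with lazy polars scans: data is described as a query on a LazyFrame and only materialized with collect(). A minimal sketch of that pattern, assuming a hypothetical CSV with "category" and "price" columns (not taken from this repository):

import polars as pl

# Lazy scan: nothing is read from disk until .collect() is called,
# so later column expressions can be pushed down into the scan.
ldf = pl.scan_csv("train.csv", separator=",", low_memory=False)

# Per-column preprocessing expressed as polars expressions.
ldf = ldf.with_columns(
    pl.col("category").fill_null(""),                # string feature: fill with ""
    pl.col("price").fill_null(0).cast(pl.Float32),   # numeric feature: fill with 0, cast
)

# Materialize only when the data is actually needed.
df = ldf.collect()
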
8 changes: 5 additions & 3 deletions fuxictr/preprocess/build_dataset.py
@@ -21,6 +21,7 @@
import numpy as np
import gc
import multiprocessing as mp
import polars as pl


def split_train_test(train_ddf=None, valid_ddf=None, test_ddf=None, valid_size=0,
@@ -64,8 +65,9 @@ def transform(feature_encoder, ddf, filename, preprocess=False, block_size=0):
if block_size > 0:
pool = mp.Pool(mp.cpu_count() // 2)
block_id = 0
for idx in range(0, len(ddf), block_size):
df_block = ddf[idx: (idx + block_size)]
num_rows = ddf.select(pl.count()).collect().item()
for idx in range(0, num_rows, block_size):
df_block = ddf.slice(idx, block_size)
pool.apply_async(transform_block, args=(feature_encoder,
df_block,
'{}/part_{:05d}.npz'.format(filename, block_id),
@@ -96,7 +98,7 @@ def build_dataset(feature_encoder, train_data=None, valid_data=None, test_data=N
valid_ddf = feature_encoder.read_csv(valid_data, **kwargs)
test_ddf = feature_encoder.read_csv(test_data, **kwargs)
train_ddf, valid_ddf, test_ddf = split_train_test(train_ddf, valid_ddf, test_ddf,
valid_size, test_size, split_type)
valid_size, test_size, split_type)

# fit and transform train_ddf
train_ddf = feature_encoder.preprocess(train_ddf)
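
The block-wise transform above now sizes its loop from a collected count and takes each block with slice. A small self-contained sketch of that pattern with made-up data (the commit calls pl.count(), which newer polars releases rename to pl.len()):

import polars as pl

ldf = pl.LazyFrame({"x": list(range(10))})
block_size = 4

# Total row count of the lazy frame.
num_rows = ldf.select(pl.len()).collect().item()

for idx in range(0, num_rows, block_size):
    # slice(offset, length) stays lazy; collect materializes just this block.
    block = ldf.slice(idx, block_size).collect()
    print(block.height)  # 4, 4, 2
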
93 changes: 42 additions & 51 deletions fuxictr/preprocess/feature_processor.py
@@ -18,12 +18,14 @@
import numpy as np
from collections import Counter, OrderedDict
import pandas as pd
import polars as pl
import pickle
import os
import logging
import json
import re
import shutil
import glob
from pathlib import Path
import sklearn.preprocessing as sklearn_preprocess
from fuxictr.features import FeatureMap
@@ -65,11 +67,13 @@ def _complete_feature_cols(self, feature_cols):
full_feature_cols.append(col)
return full_feature_cols

def read_csv(self, data_path, sep=",", nrows=None, **kwargs):
def read_csv(self, data_path, sep=",", n_rows=None, **kwargs):
logging.info("Reading file: " + data_path)
usecols_fn = lambda x: x in self.dtype_dict
ddf = pd.read_csv(data_path, sep=sep, usecols=usecols_fn,
dtype=object, memory_map=True, nrows=nrows)
file_names = sorted(glob.glob(data_path))
# Requires python >= 3.8 to use polars to scan multiple csv files
file_names = file_names[0]
ddf = pl.scan_csv(source=file_names, separator=sep, dtypes=self.dtype_dict,
low_memory=False, n_rows=n_rows)
return ddf

def preprocess(self, ddf):
@@ -78,50 +82,38 @@ def preprocess(self, ddf):
for col in all_cols:
name = col["name"]
if name in ddf.columns:
if ddf[name].isnull().values.any():
ddf[name] = self._fill_na_(col, ddf[name])
ddf[name] = ddf[name].astype(self.dtype_dict[name])
fill_na = "" if col["dtype"] in ["str", str] else 0
fill_na = col.get("fill_na", fill_na)
ddf = ddf.with_columns(pl.col(name).fill_null(fill_na))
if col.get("preprocess"):
preprocess_splits = re.split(r"\(|\)", col["preprocess"])
preprocess_fn = getattr(self, preprocess_splits[0])
if len(preprocess_splits) > 1:
ddf[name] = preprocess_fn(ddf, preprocess_splits[1])
else:
ddf[name] = preprocess_fn(ddf, name)
ddf[name] = ddf[name].astype(self.dtype_dict[name])
preprocess_args = re.split(r"\(|\)", col["preprocess"])
preprocess_fn = getattr(self, preprocess_args[0])
ddf = preprocess_fn(ddf, name, *preprocess_args[1:-1])
ddf = ddf.with_columns(pl.col(name).cast(self.dtype_dict[name]))
active_cols = [col["name"] for col in all_cols if col.get("active") != False]
ddf = ddf.loc[:, active_cols]
ddf = ddf.select(active_cols)
return ddf

def _fill_na_(self, col, series):
na_value = col.get("fill_na")
if na_value is not None:
return series.fillna(na_value)
elif col["dtype"] in ["str", str]:
return series.fillna("")
else:
raise RuntimeError("Feature column={} requires to assign fill_na value!".format(col["name"]))

def fit(self, train_ddf, min_categr_count=1, num_buckets=10, **kwargs):
logging.info("Fit feature processor...")
for col in self.feature_cols:
name = col["name"]
if col["active"]:
logging.info("Processing column: {}".format(col))
col_series = train_ddf.select(name).collect().to_series()
if col["type"] == "meta": # e.g. group_id
self.fit_meta_col(col, train_ddf[name].values)
self.fit_meta_col(col)
elif col["type"] == "numeric":
self.fit_numeric_col(col, train_ddf[name].values)
self.fit_numeric_col(col, col_series)
elif col["type"] == "categorical":

self.fit_categorical_col(col, train_ddf[name].values,
self.fit_categorical_col(col, col_series,
min_categr_count=min_categr_count,
num_buckets=num_buckets)
elif col["type"] == "sequence":
self.fit_sequence_col(col, train_ddf[name].values,
self.fit_sequence_col(col, col_series,
min_categr_count=min_categr_count)
else:
raise NotImplementedError("feature_col={}".format(feature_col))
raise NotImplementedError("feature type={}".format(col["type"]))

# Expand vocab from pretrained_emb
os.makedirs(self.data_dir, exist_ok=True)
@@ -166,17 +158,16 @@ def fit(self, train_ddf, min_categr_count=1, num_buckets=10, **kwargs):
self.feature_map.save(self.json_file)
logging.info("Set feature processor done.")

def fit_meta_col(self, col, col_values):
def fit_meta_col(self, col):
name = col["name"]
feature_type = col["type"]
self.feature_map.features[name] = {"type": feature_type}
# assert col.get("remap") == False, "Meta feature currently only supports `remap=False`, \
# since it needs to map train and validation sets together."
if col.get("remap", True):
# No need to fit, update vocab in encode_meta()
tokenizer = Tokenizer(min_freq=1, remap=True)
self.processor_dict[name + "::tokenizer"] = tokenizer

def fit_numeric_col(self, col, col_values):
def fit_numeric_col(self, col, col_series):
name = col["name"]
feature_type = col["type"]
feature_source = col.get("source", "")
@@ -186,10 +177,10 @@ def fit_numeric_col(self, col, col_values):
self.feature_map.features[name]["feature_encoder"] = col["feature_encoder"]
if "normalizer" in col:
normalizer = Normalizer(col["normalizer"])
normalizer.fit(col_values)
normalizer.fit(col_series.drop_nans().to_numpy())
self.processor_dict[name + "::normalizer"] = normalizer

def fit_categorical_col(self, col, col_values, min_categr_count=1, num_buckets=10):
def fit_categorical_col(self, col, col_series, min_categr_count=1, num_buckets=10):
name = col["name"]
feature_type = col["type"]
feature_source = col.get("source", "")
@@ -206,7 +197,7 @@ def fit_categorical_col(self, col, col_values, min_categr_count=1, num_buckets=1
tokenizer = Tokenizer(min_freq=min_categr_count,
na_value=col.get("fill_na", ""),
remap=col.get("remap", True))
tokenizer.fit_on_texts(col_values)
tokenizer.fit_on_texts(col_series)
if "share_embedding" in col:
self.feature_map.features[name]["share_embedding"] = col["share_embedding"]
tknzr_name = col["share_embedding"] + "::tokenizer"
@@ -225,20 +216,20 @@ def fit_categorical_col(self, col, col_values, min_categr_count=1, num_buckets=1
if category_processor == "quantile_bucket": # transform numeric value to bucket
num_buckets = col.get("num_buckets", num_buckets)
qtf = sklearn_preprocess.QuantileTransformer(n_quantiles=num_buckets + 1)
qtf.fit(col_values)
qtf.fit(col_series.to_numpy())
boundaries = qtf.quantiles_[1:-1]
self.feature_map.features[name]["vocab_size"] = num_buckets
self.processor_dict[name + "::boundaries"] = boundaries
elif category_processor == "hash_bucket":
num_buckets = col.get("num_buckets", num_buckets)
uniques = Counter(col_values)
uniques = Counter(col_series.to_numpy())
num_buckets = min(num_buckets, len(uniques))
self.feature_map.features[name]["vocab_size"] = num_buckets
self.processor_dict[name + "::num_buckets"] = num_buckets
else:
raise NotImplementedError("category_processor={} not supported.".format(category_processor))

def fit_sequence_col(self, col, col_values, min_categr_count=1):
def fit_sequence_col(self, col, col_series, min_categr_count=1):
name = col["name"]
feature_type = col["type"]
feature_source = col.get("source", "")
@@ -259,7 +250,7 @@ def fit_sequence_col(self, col, col_values, min_categr_count=1):
tokenizer = Tokenizer(min_freq=min_categr_count, splitter=splitter,
na_value=na_value, max_len=max_len, padding=padding,
remap=col.get("remap", True))
tokenizer.fit_on_texts(col_values)
tokenizer.fit_on_texts(col_series)
if "share_embedding" in col:
self.feature_map.features[name]["share_embedding"] = col["share_embedding"]
tknzr_name = col["share_embedding"] + "::tokenizer"
@@ -280,34 +271,34 @@ def transform(self, ddf):
for feature, feature_spec in self.feature_map.features.items():
if feature in ddf.columns:
feature_type = feature_spec["type"]
col_values = ddf.loc[:, feature].values
col_series = ddf.select(feature).collect().to_series()
if feature_type == "meta":
if feature + "::tokenizer" in self.processor_dict:
tokenizer = self.processor_dict[feature + "::tokenizer"]
data_dict[feature] = tokenizer.encode_meta(col_values)
data_dict[feature] = tokenizer.encode_meta(col_series)
# Update vocab in tokenizer
self.processor_dict[feature + "::tokenizer"] = tokenizer
else:
data_dict[feature] = col_values.astype(self.dtype_dict[feature])
data_dict[feature] = col_series.to_numpy()
elif feature_type == "numeric":
col_values = col_values.astype(float)
col_values = col_series.cast(pl.Float32).to_numpy()
normalizer = self.processor_dict.get(feature + "::normalizer")
if normalizer:
col_values = normalizer.transform(col_values)
data_dict[feature] = col_values
elif feature_type == "categorical":
category_processor = feature_spec.get("category_processor")
if category_processor is None:
data_dict[feature] = self.processor_dict.get(feature + "::tokenizer").encode_category(col_values)
data_dict[feature] = self.processor_dict.get(feature + "::tokenizer").encode_category(col_series)
elif category_processor == "numeric_bucket":
raise NotImplementedError
elif category_processor == "hash_bucket":
raise NotImplementedError
elif feature_type == "sequence":
data_dict[feature] = self.processor_dict.get(feature + "::tokenizer").encode_sequence(col_values)
data_dict[feature] = self.processor_dict.get(feature + "::tokenizer").encode_sequence(col_series)
for label in self.feature_map.labels:
if label in ddf.columns:
data_dict[label] = ddf.loc[:, label].values
data_dict[label] = ddf.select(label).collect().to_series().to_numpy()
return data_dict

def load_pickle(self, pickle_file=None):
@@ -335,6 +326,6 @@ def save_vocab(self, vocab_file):
with open(vocab_file, "w") as fd:
fd.write(json.dumps(vocab, indent=4))

def copy_from(self, ddf, src_name):
return ddf[src_name]
def copy_from(self, ddf, name, src_name):
ddf = ddf.with_columns(pl.col(src_name).alias(name))
return ddf
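
A short sketch of the column-access idioms this file now relies on: pulling one column out of a LazyFrame as a Series for fitting, filling and casting it as preprocess() does, and duplicating a column under a new name as copy_from() does. The frame and column names below are made up:

import polars as pl

ldf = pl.LazyFrame({"user_id": ["a", "b", None, "a"], "label": [1, 0, 1, 0]})

# Counterpart of the old train_ddf[name].values:
# select one column, materialize it, and turn it into a Series.
col_series = ldf.select("user_id").collect().to_series()

# Fill nulls and cast, as preprocess() does for each feature column.
ldf = ldf.with_columns(pl.col("user_id").fill_null("").cast(pl.Utf8))

# copy_from-style helper: alias an existing column under a new name.
ldf = ldf.with_columns(pl.col("user_id").alias("user_id_copy"))

print(ldf.collect())
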
3 changes: 1 addition & 2 deletions fuxictr/preprocess/normalizer.py
@@ -33,8 +33,7 @@ def __init__(self, normalizer):

def fit(self, X):
if not self.callable:
null_index = np.isnan(X)
self.normalizer.fit(X[~null_index].reshape(-1, 1))
self.normalizer.fit(X.reshape(-1, 1))

def transform(self, X):
if self.callable:
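
The NaN masking dropped from Normalizer.fit above is now done by the caller, which passes col_series.drop_nans().to_numpy() in fit_numeric_col. A rough sketch of that division of labor with made-up values and a stand-in sklearn scaler (note that polars distinguishes NaN from null; drop_nans removes only the former):

import polars as pl
from sklearn.preprocessing import StandardScaler

col_series = pl.Series("price", [1.0, 2.0, float("nan"), 4.0], dtype=pl.Float32)

# The caller strips NaNs before handing a dense array to the normalizer.
values = col_series.drop_nans().to_numpy()   # [1., 2., 4.]

scaler = StandardScaler()
scaler.fit(values.reshape(-1, 1))  # fit() itself no longer needs a NaN mask
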
41 changes: 20 additions & 21 deletions fuxictr/preprocess/tokenizer.py
@@ -37,12 +37,12 @@ def __init__(self, max_features=None, na_value="", min_freq=1, splitter=None, re
self.num_workers = num_workers
self.remap = remap

def fit_on_texts(self, texts):
def fit_on_texts(self, series):
word_counts = Counter()
if self._splitter is not None: # for sequence
max_len = 0
with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
chunks = np.array_split(texts, self.num_workers)
chunks = np.array_split(series.to_numpy(), self.num_workers)
tasks = [executor.submit(count_tokens, chunk, self._splitter) for chunk in chunks]
for future in tqdm(as_completed(tasks), total=len(tasks)):
block_word_counts, block_max_len = future.result()
@@ -51,7 +51,7 @@ def fit_on_texts(self, texts):
if self.max_len == 0: # if argument max_len not given
self.max_len = max_len
else:
word_counts = Counter(list(texts))
word_counts = dict(series.value_counts().iter_rows())
self.build_vocab(word_counts)

def build_vocab(self, word_counts):
Expand Down Expand Up @@ -101,30 +101,29 @@ def update_vocab(self, word_list):
if new_words > 0:
self.vocab["__OOV__"] = self.vocab_size()

def encode_meta(self, values):
word_counts = Counter(list(values))
def encode_meta(self, series):
word_counts = dict(series.value_counts().iter_rows())
if len(self.vocab) == 0:
self.build_vocab(word_counts)
else: # for considering meta data in test data
self.update_vocab(word_counts.keys())
meta_values = [self.vocab.get(x, self.vocab["__OOV__"]) for x in values]
return np.array(meta_values)
series = series.apply(lambda x: self.vocab.get(x, self.vocab["__OOV__"]))
return series.to_numpy()

def encode_category(self, categories):
category_indices = [self.vocab.get(x, self.vocab["__OOV__"]) for x in categories]
return np.array(category_indices)
def encode_category(self, series):
series = series.apply(lambda x: self.vocab.get(x, self.vocab["__OOV__"]))
return series.to_numpy()

def encode_sequence(self, texts):
sequence_list = []
for text in texts:
if pd.isnull(text) or text == '':
sequence_list.append([])
else:
sequence_list.append([self.vocab.get(x, self.vocab["__OOV__"]) if x != self._na_value \
else self.vocab["__PAD__"] for x in text.split(self._splitter)])
sequence_list = pad_sequences(sequence_list, maxlen=self.max_len, value=self.vocab["__PAD__"],
padding=self.padding, truncating=self.padding)
return np.array(sequence_list)
def encode_sequence(self, series):
series = series.apply(
lambda text: [self.vocab.get(x, self.vocab["__OOV__"]) if x != self._na_value \
else self.vocab["__PAD__"] for x in text.split(self._splitter)]
).apply(
lambda seq: pad_sequences([seq], maxlen=self.max_len,
value=self.vocab["__PAD__"],
padding=self.padding, truncating=self.padding)[0]
)
return series.to_numpy()

def load_pretrained_vocab(self, feature_dtype, pretrain_path, expand_vocab=True):
if pretrain_path.endswith(".h5"):
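
A sketch of the counting and lookup patterns the tokenizer switches to: value_counts() yields a two-column frame whose iter_rows() produces (value, count) pairs, and per-element vocab lookup runs element-wise over the Series (the commit uses Series.apply, which newer polars renames to map_elements). The vocab below is invented:

import polars as pl

series = pl.Series("category", ["a", "b", "a", "c", "a"])

# (value, count) pairs -> dict, replacing Counter(list(values)).
word_counts = dict(series.value_counts().iter_rows())
print(word_counts)            # e.g. {'a': 3, 'b': 1, 'c': 1}

# Per-element vocab lookup with an __OOV__ fallback.
vocab = {"a": 1, "b": 2, "__OOV__": 0}
encoded = series.map_elements(
    lambda x: vocab.get(x, vocab["__OOV__"]), return_dtype=pl.Int64
).to_numpy()
print(encoded)                # [1 2 1 0 1]
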
2 changes: 1 addition & 1 deletion fuxictr/version.py
@@ -1 +1 @@
__version__="2.2.1"
__version__="2.2.2"
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@

setuptools.setup(
name="fuxictr",
version="2.2.1",
version="2.2.2",
author="RECZOO",
author_email="[email protected]",
description="A configurable, tunable, and reproducible library for CTR prediction",
