Skip to content

Commit

Permalink
Update customized preprocess with polars
Browse files Browse the repository at this point in the history
  • Loading branch information
xpai committed Jun 9, 2024
1 parent b0bdfa1 commit 8c65924
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 198 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
[Doing] Add support for saving pb file, exporting embeddings
[Doing] Add support of multi-gpu training

**FuxiCTR v2.3.1, 2024-06-09**
+ [Fix] Fix customized preprocessors based on polars and update demos
+ [Doc] Add copyrights
+ [Feature] Add `embedding_dim` setting to numeric features

**FuxiCTR v2.3.0, 2024-04-20**
+ [Refactor] Support reading CSV and Parquet files as inputs
+ [Feature] Add dataloader for parquet
Expand Down
202 changes: 101 additions & 101 deletions data/tiny_csv/test_sample.csv

Large diffs are not rendered by default.

26 changes: 10 additions & 16 deletions demo/config/example7_config/dataset_config.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
### Tiny data for demo only
tiny_example7:
data_root: ../data/
data_format: csv
train_data: ../data/tiny_csv/custom_preprocess_train_sample.csv
valid_data: ../data/tiny_csv/custom_preprocess_valid_sample.csv
test_data: ../data/tiny_csv/custom_preprocess_test_sample.csv
data_format: csv
train_data: ../data/tiny_csv/train_sample.csv
valid_data: ../data/tiny_csv/valid_sample.csv
test_data: ../data/tiny_csv/test_sample.csv
min_categr_count: 1
feature_cols:
- active: true
dtype: str
name: [msno, song_id, source_system_tab, source_screen_name, source_type,
city, gender, registered_via, language]
type: categorical
- {active: true, dtype: str, encoder: MaskedSumPooling, max_len: 3, name: genre_ids,
type: sequence}
- {active: true, dtype: str, encoder: MaskedSumPooling, max_len: 3, name: artist_name,
type: sequence}
- {active: true, dtype: str, name: isrc, preprocess: extract_country_code, type: categorical}
- {active: true, dtype: str, name: bd, preprocess: bucketize_age, type: categorical}
label_col: {dtype: float, name: label}
[{name: ["userid","adgroup_id","pid","cate_id","campaign_id","customer","brand","cms_segid",
"cms_group_id","final_gender_code","age_level","pvalue_level","shopping_level","occupation"],
active: True, dtype: str, type: categorical},
{name: "weekday", active: True, dtype: str, type: categorical, preprocess: convert_weekday},
{name: "hour", active: True, dtype: str, type: categorical, preprocess: convert_hour}]
label_col: {name: clk, dtype: float}
16 changes: 16 additions & 0 deletions demo/example1_build_dataset_to_parquet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# =========================================================================
# Copyright (C) 2024. The FuxiCTR Library. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

import sys
sys.path.append('../')
import os
Expand Down
16 changes: 16 additions & 0 deletions demo/example2_DeepFM_with_parquet_input.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# =========================================================================
# Copyright (C) 2024. The FuxiCTR Library. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

import sys
sys.path.append('../')
import os
Expand Down
16 changes: 16 additions & 0 deletions demo/example3_DeepFM_with_npz_input.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# =========================================================================
# Copyright (C) 2024. The FuxiCTR Library. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

import sys
sys.path.append('../')
import os
Expand Down
16 changes: 16 additions & 0 deletions demo/example4_DeepFM_with_csv_input.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# =========================================================================
# Copyright (C) 2024. The FuxiCTR Library. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

import sys
sys.path.append('../')
import os
Expand Down
16 changes: 16 additions & 0 deletions demo/example5_DeepFM_with_pretrained_emb.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# =========================================================================
# Copyright (C) 2024. The FuxiCTR Library. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

import sys
sys.path.append('../')
import os
Expand Down
16 changes: 16 additions & 0 deletions demo/example6_DIN_with_sequence_feature.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# =========================================================================
# Copyright (C) 2024. The FuxiCTR Library. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

import sys
sys.path.append('../')
import os
Expand Down
Original file line number Diff line number Diff line change
@@ -1,57 +1,62 @@
# =========================================================================
# Copyright (C) 2024. The FuxiCTR Library. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

import sys
sys.path.append('../')
import os
import logging
from datetime import datetime
from fuxictr import datasets
from fuxictr.utils import load_config, set_logger, print_to_json
from fuxictr.features import FeatureMap
from fuxictr.pytorch.torch_utils import seed_everything
from fuxictr.pytorch.dataloaders import RankDataLoader
from fuxictr.preprocess import FeatureProcessor, build_dataset
from model_zoo import DeepFM
import pandas as pd
import polars as pl
from datetime import datetime


class CustomFeatureProcessor(FeatureProcessor):
'''
In the config/dataset_config.yaml file, the 'extract_country_code' and 'bucketize_age' processors are utilized.
Therefore, it is necessary to create a new class that inherits from the 'FeatureProcessor' class and implement
the respective functions for these two processors.
Each Process Function should accept two arguments: df and col_name.
'''
def extract_country_code(self, df, col_name):
return df[col_name].apply(lambda isrc: isrc[0:2] if not pd.isnull(isrc) else "")
class CustomizedFeatureProcessor(FeatureProcessor):
"""
This is a demo for implementing customized feature processing functions.
def bucketize_age(self, df, col_name):
def _bucketize(age):
if pd.isnull(age):
return ""
else:
age = float(age)
if age < 1 or age > 95:
return ""
elif age <= 10:
return "1"
elif age <=20:
return "2"
elif age <=30:
return "3"
elif age <=40:
return "4"
elif age <=50:
return "5"
elif age <=60:
return "6"
else:
return "7"
return df[col_name].apply(_bucketize)

In the config/example7_config/dataset_config.yaml file, the 'convert_weekday' and 'convert_hour'
processors are called. Hence, it is necessary to implement the two functions by inheriting from
'fuxictr.preprocess.FeatureProcessor'. Some concrete examples can be found in 'fuxictr.datasets'.
Each processor function ONLY accepts one argument: col_name, and returns an expression based on
polars. We use polars instead of pandas for speedup.
"""

def convert_weekday(self, col_name=None):
    """Build a polars expression that turns "time_stamp" strings into a weekday number.

    The source column is fixed to "time_stamp"; ``col_name`` is accepted but
    not read. Each value is parsed with the format "%Y-%m-%d %H:%M:%S" and
    converted to the integer produced by ``strftime('%w')``.

    Returns:
        A polars expression applying the conversion to every value.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    return pl.col("time_stamp").apply(
        lambda raw: int(datetime.strptime(raw, stamp_format).strftime('%w'))
    )

def convert_hour(self, col_name=None):
    """Build a polars expression that extracts the hour from "time_stamp" strings.

    The source column is fixed to "time_stamp"; ``col_name`` is accepted but
    not read. Each value is parsed with the format "%Y-%m-%d %H:%M:%S" and
    the hour field of the parsed datetime is returned as an int.

    Returns:
        A polars expression applying the conversion to every value.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"

    def _hour_of(raw):
        parsed = datetime.strptime(raw, stamp_format)
        return int(parsed.hour)

    return pl.col("time_stamp").apply(_hour_of)

if __name__ == '__main__':
# Load params from config files
config_dir = './config/example7_config'
experiment_id = 'DeepFM_test_csv' # corresponds to input `data/tiny_npz`
experiment_id = 'DeepFM_test_csv'
params = load_config(config_dir, experiment_id)

# set up logger and random seed
Expand All @@ -60,19 +65,12 @@ def _bucketize(age):
seed_everything(seed=params['seed'])

# Set feature_encoder that defines how to preprocess data
use_custom_processor = True
if use_custom_processor:
feature_encoder = CustomFeatureProcessor(feature_cols=params["feature_cols"],
feature_encoder = CustomizedFeatureProcessor(feature_cols=params["feature_cols"],
label_col=params["label_col"],
dataset_id=params["dataset_id"],
data_root=params["data_root"])
else:
feature_encoder = FeatureProcessor(feature_cols=params["feature_cols"],
label_col=params["label_col"],
dataset_id=params["dataset_id"],
data_root=params["data_root"])

# Build dataset from csv to npz, and remap data paths to npz files
# Build dataset
params["train_data"], params["valid_data"], params["test_data"] = \
build_dataset(feature_encoder,
train_data=params["train_data"],
Expand Down
17 changes: 9 additions & 8 deletions fuxictr/datasets/avazu.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@
# =========================================================================


from fuxictr.preprocess import FeatureProcessor as BaseFeatureProcessor
from fuxictr.preprocess import FeatureProcessor
from datetime import datetime, date
import polars as pl


class FeatureProcessor(BaseFeatureProcessor):
def convert_weekday(self, df, col_name):
class CustomizedFeatureProcessor(FeatureProcessor):
def convert_weekday(self, col_name=None):
def _convert_weekday(timestamp):
dt = date(int('20' + timestamp[0:2]), int(timestamp[2:4]), int(timestamp[4:6]))
return int(dt.strftime('%w'))
return df['hour'].apply(_convert_weekday)
return pl.col("hour").apply(_convert_weekday)

def convert_weekend(self, df, col_name):
def convert_weekend(self, col_name=None):
def _convert_weekend(timestamp):
dt = date(int('20' + timestamp[0:2]), int(timestamp[2:4]), int(timestamp[4:6]))
return 1 if dt.strftime('%w') in ['6', '0'] else 0
return df['hour'].apply(_convert_weekend)
return pl.col("hour").apply(_convert_weekend)

def convert_hour(self, df, col_name):
return df['hour'].apply(lambda x: int(x[6:8]))
def convert_hour(self, col_name=None):
    """Build a polars expression that extracts an integer from the "hour" column.

    The source column is fixed to "hour"; ``col_name`` is accepted but not
    read. Characters 6-7 of each string are converted to int.

    Returns:
        A polars expression applying the conversion to every value.
    """
    def _hour_of(stamp):
        # NOTE(review): presumably the value is laid out as YYMMDDHH, so this
        # slice is the hour part — confirm against the raw data.
        hour_digits = stamp[6:8]
        return int(hour_digits)

    return pl.col("hour").apply(_hour_of)
9 changes: 5 additions & 4 deletions fuxictr/datasets/criteo.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@


import numpy as np
from fuxictr.preprocess import FeatureProcessor as BaseFeatureProcessor
from fuxictr.preprocess import FeatureProcessor
import polars as pl


class FeatureProcessor(BaseFeatureProcessor):
def convert_to_bucket(self, df, col_name):
class CustomizedFeatureProcessor(FeatureProcessor):
def convert_to_bucket(self, col_name=None):
def _convert_to_bucket(value):
if value > 2:
value = int(np.floor(np.log(value) ** 2))
else:
value = int(value)
return value
return df[col_name].map(_convert_to_bucket).astype(int)
return pl.col(col_name).apply(_convert_to_bucket).cast(pl.Int32)
15 changes: 8 additions & 7 deletions fuxictr/datasets/kkbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
# =========================================================================


import pandas as pd
from fuxictr.preprocess import FeatureProcessor as BaseFeatureProcessor
from fuxictr.preprocess import FeatureProcessor
import polars as pl

class FeatureProcessor(BaseFeatureProcessor):
def extract_country_code(self, df, col_name):
return df[col_name].apply(lambda isrc: isrc[0:2] if not pd.isnull(isrc) else "")

def bucketize_age(self, df, col_name):
class CustomizedFeatureProcessor(FeatureProcessor):
def extract_country_code(self, col_name=None):
    """Build a polars expression that keeps the first two characters of a column.

    Args:
        col_name: Name of the column to read (configured as ``isrc`` when this
            processor is referenced from a dataset config).

    Returns:
        A polars expression yielding the two-character prefix of each value,
        with missing values replaced by the empty string.
    """
    # The previous per-row lambda called `pl.is_null`, which is not a polars
    # module-level function, so it failed as soon as it was invoked. Native
    # string slicing needs no Python callback, and fill_null keeps the
    # "missing -> empty string" result of the older pandas implementation.
    # NOTE(review): assumes the column is loaded with a string dtype — confirm
    # against the feature's `dtype` in the dataset config.
    return pl.col(col_name).str.slice(0, 2).fill_null("")

def bucketize_age(self, col_name=None):
def _bucketize(age):
if pd.isnull(age):
return ""
Expand All @@ -45,4 +46,4 @@ def _bucketize(age):
return "6"
else:
return "7"
return df[col_name].apply(_bucketize)
return pl.col(col_name).apply(_bucketize)
Loading

0 comments on commit 8c65924

Please sign in to comment.