From cee08eecb06095953c9b1df708a4ef273383a041 Mon Sep 17 00:00:00 2001 From: GangBean Date: Wed, 19 Jun 2024 03:01:16 +0000 Subject: [PATCH 1/4] feat: implements trainer, dataset and refactor model #20 --- configs/train_config.yaml | 4 +- data/datasets/ngcf_data_pipeline.py | 1 + data/datasets/ngcf_dataset.py | 6 + models/ngcf.py | 44 ++++--- train.py | 8 +- trainers/ngcf_trainer.py | 179 ++++++++++++++++++++++++++++ 6 files changed, 223 insertions(+), 19 deletions(-) create mode 100644 data/datasets/ngcf_dataset.py create mode 100644 trainers/ngcf_trainer.py diff --git a/configs/train_config.yaml b/configs/train_config.yaml index a32c8c2..b8dc10c 100644 --- a/configs/train_config.yaml +++ b/configs/train_config.yaml @@ -19,7 +19,7 @@ epochs: 100 batch_size: 32 lr: 0.0001 optimizer: adam # adamw -loss_name: bce # bpr # pointwise # bce +loss_name: bpr # bpr # pointwise # bce patience: 5 top_n: 10 weight_decay: 0 #1e-5 @@ -43,4 +43,4 @@ model: embed_size: 64 NGCF: embed_size: 64 - num_orders: 3 + num_orders: 1 diff --git a/data/datasets/ngcf_data_pipeline.py b/data/datasets/ngcf_data_pipeline.py index 622f1a7..a75f324 100644 --- a/data/datasets/ngcf_data_pipeline.py +++ b/data/datasets/ngcf_data_pipeline.py @@ -40,6 +40,7 @@ def _set_laplacian_matrix(self, df): self.laplacian_matrix = torch.matmul(diagonal_degree_matrix, adjacency_matrix) adjacency_matrix = adjacency_matrix.cpu().detach() self.laplacian_matrix = torch.matmul(self.laplacian_matrix, diagonal_degree_matrix) + self.laplacian_matrix = self.laplacian_matrix.to(self.cfg.device) logger.info('done...') def preprocess(self) -> pd.DataFrame: diff --git a/data/datasets/ngcf_dataset.py b/data/datasets/ngcf_dataset.py new file mode 100644 index 0000000..ae6c449 --- /dev/null +++ b/data/datasets/ngcf_dataset.py @@ -0,0 +1,6 @@ +from .mf_dataset import MFDataset + +class NGCFDataset: + pass + +NGCFDataset = MFDataset diff --git a/models/ngcf.py b/models/ngcf.py index 7e9a6f8..5860bc4 100644 --- a/models/ngcf.py +++ b/models/ngcf.py @@ -2,12 +2,15 @@ import torch.nn as nn from models.base_model import BaseModel +from loguru import logger class NGCF(BaseModel): def __init__(self, cfg, num_users, num_items, laplacian_matrix): super().__init__() + self.cfg = cfg self.num_users = num_users + self.num_items = num_items self.laplacian_matrix = laplacian_matrix self.embedding = nn.Embedding( num_users+num_items, cfg.embed_size, dtype=torch.float32) @@ -22,29 +25,42 @@ def __init__(self, cfg, num_users, num_items, laplacian_matrix): def _init_weights(self): for child in self.children(): if isinstance(child, nn.Embedding): - nn.init.xavier_uniform_(child.weight)k + nn.init.xavier_uniform_(child.weight) def forward(self, user_id, item_id): user_embed_list, item_embed_list = [self.embedding(user_id),], [self.embedding(self.num_users+item_id)] - last_embed = self.embedding + last_embed = self.embedding.weight for w1, w2 in zip(self.W1, self.W2): - last_embed = embedding_propagation(last_embed, w1, w2) - user_embed_list.append(last_embed(user_id)) - item_embed_list.append(last_embed(self.num_users+item_id)) + last_embed: torch.Tensor = self.embedding_propagation(last_embed, w1, w2) + user_embed_list.append(last_embed[user_id]) + item_embed_list.append(last_embed[self.num_users + item_id]) user_embed = torch.concat(user_embed_list, dim=1) item_embed = torch.concat(item_embed_list, dim=1) - return torch.sum(user_emb * item_emb, dim=1) + return torch.sum(user_embed * item_embed, dim=1) - def embedding_propagation(self, last_embed, w1, w2): - identity_matrix = torch.eye(*self.laplacian_matrix.size()) - term1 = torch.matmul(self.laplacian_matrix + identity_matrix, last_embed) - term1 = w1(term1) + def embedding_propagation(self, last_embed: torch.Tensor, w1, w2): + identity_matrix = torch.eye(*self.laplacian_matrix.size()).to(self.cfg.device) + matrix = self.laplacian_matrix + identity_matrix - neighbor_embeddings = torch.matmul(self.laplacian_matrix, last_embed) - term2 = torch.mul(neighbor_embeddings, last_embed) - term2 = w2(term2) + # split calcuclation GPU memory shortage + chunk_size = 32 + embed_list = [] + for chunk_idx in range(0, self.num_users + self.num_items, chunk_size): + matrix_concat = matrix[chunk_idx : (chunk_idx + chunk_size)] + term1 = torch.matmul(matrix_concat, last_embed) + term1 = w1(term1) - return nn.functional.leaky_relu(term1 + term2) + laplacian_concat = self.laplacian_matrix[chunk_idx : (chunk_idx + chunk_size)] + neighbor_embeddings = torch.matmul(laplacian_concat, last_embed) + + last_embed_concat = last_embed[chunk_idx : (chunk_idx + chunk_size)] + term2 = torch.mul(neighbor_embeddings, last_embed_concat) + term2 = w2(term2) + embed_list.append(term1 + term2) + + embed_list = torch.concat(embed_list, dim=0) + + return nn.functional.leaky_relu(embed_list) diff --git a/train.py b/train.py index 8e25862..04f2a81 100644 --- a/train.py +++ b/train.py @@ -17,9 +17,11 @@ from data.datasets.ngcf_data_pipeline import NGCFDataPipeline from data.datasets.cdae_dataset import CDAEDataset from data.datasets.mf_dataset import MFDataset +from data.datasets.ngcf_dataset import NGCFDataset from trainers.cdae_trainer import CDAETrainer from trainers.dcn_trainer import DCNTrainer from trainers.mf_trainer import MFTrainer +from trainers.ngcf_trainer import NGCFTrainer from utils import set_seed @@ -91,7 +93,7 @@ def train(cfg, args):#train_dataset, valid_dataset, test_dataset, model_info): trainer.load_best_model() trainer.evaluate(args.test_eval_data, 'test') elif cfg.model_name in ('NGCF', ): - trainer = MGCFTrainer(cfg, args.model_info['num_items'], args.model_info['num_users'], + trainer = NGCFTrainer(cfg, args.model_info['num_items'], args.model_info['num_users'], args.data_pipeline.laplacian_matrix) trainer.run(train_dataloader, valid_dataloader, args.valid_eval_data) trainer.load_best_model() @@ -152,8 +154,8 @@ def main(cfg: OmegaConf): model_info['num_items'], model_info['num_users'] = data_pipeline.num_items, data_pipeline.num_users elif cfg.model_name == 'NGCF': train_data, valid_data, valid_eval_data, test_eval_data = data_pipeline.split(df) - train_dataset = MFDataset(train_data, num_items=data_pipeline.num_items) - valid_dataset = MFDataset(valid_data, num_items=data_pipeline.num_items) + train_dataset = NGCFDataset(train_data, num_items=data_pipeline.num_items) + valid_dataset = NGCFDataset(valid_data, num_items=data_pipeline.num_items) args.update({'valid_eval_data': valid_eval_data, 'test_eval_data': test_eval_data}) model_info['num_items'], model_info['num_users'] = data_pipeline.num_items, data_pipeline.num_users else: diff --git a/trainers/ngcf_trainer.py b/trainers/ngcf_trainer.py new file mode 100644 index 0000000..e7c9fc5 --- /dev/null +++ b/trainers/ngcf_trainer.py @@ -0,0 +1,179 @@ +import wandb + +import numpy as np +import pandas as pd +from tqdm import tqdm + +import torch +import torch.nn as nn +from torch import Tensor +from torch.utils.data import DataLoader +from torch.optim import Optimizer + +from loguru import logger +from omegaconf.dictconfig import DictConfig +import wandb + +from models.ngcf import NGCF +from .base_trainer import BaseTrainer +from metric import * +from loss import BPRLoss + +class NGCFTrainer(BaseTrainer): + def __init__(self, cfg: DictConfig, num_items: int, num_users: int, laplacian_matrix: torch.Tensor) -> None: + super().__init__(cfg) + logger.info(f'[DEVICE] device = {self.device}') + self.num_items = num_items + self.num_users = num_users + self.model = NGCF(self.cfg, num_users, num_items, laplacian_matrix).to(self.device) + self.optimizer: Optimizer = self._optimizer(self.cfg.optimizer, self.model, self.cfg.lr, self.cfg.weight_decay) + self.loss = self._loss() + + def _loss(self): + return BPRLoss() + + def run(self, train_dataloader: DataLoader, valid_dataloader: DataLoader, valid_eval_data: pd.DataFrame): + logger.info(f"[Trainer] run...") + + best_valid_loss: float = 1e+6 + best_valid_precision_at_k: float = .0 + best_valid_recall_at_k: float = .0 + best_valid_map_at_k: float = .0 + best_valid_ndcg_at_k: float = .0 + best_epoch: int = 0 + endurance: int = 0 + + # train + for epoch in range(self.cfg.epochs): + train_loss: float = self.train(train_dataloader) + valid_loss: float = self.validate(valid_dataloader) + (valid_precision_at_k, + valid_recall_at_k, + valid_map_at_k, + valid_ndcg_at_k) = self.evaluate(valid_eval_data, 'valid') + logger.info(f'''\n[Trainer] epoch: {epoch} > train loss: {train_loss:.4f} / + valid loss: {valid_loss:.4f} / + precision@K : {valid_precision_at_k:.4f} / + Recall@K: {valid_recall_at_k:.4f} / + MAP@K: {valid_map_at_k:.4f} / + NDCG@K: {valid_ndcg_at_k:.4f}''') + + # wandb logging + if self.cfg.wandb: + wandb.log({ + 'train_loss': train_loss, + 'valid_loss': valid_loss, + 'valid_Precision@K': valid_precision_at_k, + 'valid_Recall@K': valid_recall_at_k, + 'valid_MAP@K': valid_map_at_k, + 'valid_NDCG@K': valid_ndcg_at_k, + }) + + # update model + if self._is_surpass_best_metric( + current=(valid_loss, + valid_precision_at_k, + valid_recall_at_k, + valid_map_at_k, + valid_ndcg_at_k), + best=(best_valid_loss, + best_valid_precision_at_k, + best_valid_recall_at_k, + best_valid_map_at_k, + best_valid_ndcg_at_k)): + logger.info(f"[Trainer] update best model...") + best_valid_loss = valid_loss + best_valid_precision_at_k = valid_precision_at_k + best_valid_recall_at_k = valid_recall_at_k + best_valid_ndcg_at_k = valid_ndcg_at_k + best_valid_map_at_k = valid_map_at_k + best_epoch = epoch + endurance = 0 + + torch.save(self.model.state_dict(), f'{self.cfg.model_dir}/best_model.pt') + else: + endurance += 1 + if endurance > self.cfg.patience: + logger.info(f"[Trainer] ealry stopping...") + break + + + def train(self, train_dataloader: DataLoader) -> float: + self.model.train() + train_loss = 0 + for data in tqdm(train_dataloader): + user_id, pos_item, neg_item = data['user_id'].to(self.device), data['pos_item'].to(self.device), \ + data['neg_item'].to(self.device) + pos_pred = self.model(user_id, pos_item) + neg_pred = self.model(user_id, neg_item) + + self.optimizer.zero_grad() + loss = self.loss(pos_pred, neg_pred) + loss.backward() + self.optimizer.step() + + train_loss += loss.item() + + return train_loss + + def validate(self, valid_dataloader: DataLoader) -> tuple[float]: + self.model.eval() + valid_loss = 0 + actual, predicted = [], [] + for data in tqdm(valid_dataloader): + user_id, pos_item, neg_item = data['user_id'].to(self.device), data['pos_item'].to(self.device), \ + data['neg_item'].to(self.device) + pos_pred = self.model(user_id, pos_item) + neg_pred = self.model(user_id, neg_item) + + loss = self.loss(pos_pred, neg_pred) + + valid_loss += loss.item() + + return valid_loss + + def evaluate(self, eval_data: pd.DataFrame, mode='valid') -> tuple: + + self.model.eval() + actual, predicted = [], [] + item_input = torch.tensor([item_id for item_id in range(self.num_items)]).to(self.device) + for user_id, row in tqdm(eval_data.iterrows(), total=eval_data.shape[0]): + pred = self.model(torch.tensor([user_id,]*self.num_items).to(self.device), item_input) + batch_predicted = \ + self._generate_top_k_recommendation(pred, row['mask_items']) + actual.append(row['pos_items']) + predicted.append(batch_predicted) + + test_precision_at_k = precision_at_k(actual, predicted, self.cfg.top_n) + test_recall_at_k = recall_at_k(actual, predicted, self.cfg.top_n) + test_map_at_k = map_at_k(actual, predicted, self.cfg.top_n) + test_ndcg_at_k = ndcg_at_k(actual, predicted, self.cfg.top_n) + + if mode == 'test': + logger.info(f'''\n[Trainer] Test > + precision@{self.cfg.top_n} : {test_precision_at_k:.4f} / + Recall@{self.cfg.top_n}: {test_recall_at_k:.4f} / + MAP@{self.cfg.top_n}: {test_map_at_k:.4f} / + NDCG@{self.cfg.top_n}: {test_ndcg_at_k:.4f}''') + + return (test_precision_at_k, + test_recall_at_k, + test_map_at_k, + test_ndcg_at_k) + + def _generate_top_k_recommendation(self, pred: Tensor, mask_items) -> tuple[list]: + + # mask to train items + pred = pred.cpu().detach().numpy() + pred[mask_items] = -3.40282e+38 # finfo(float32) + + # find the largest topK item indexes by user + topn_index = np.argpartition(pred, -self.cfg.top_n)[-self.cfg.top_n:] + # take probs from predictions using above indexes + topn_prob = np.take_along_axis(pred, topn_index, axis=0) + # sort topK probs and find their indexes + sorted_indices = np.argsort(-topn_prob) + # apply sorted indexes to item indexes to get sorted topK item indexes by user + topn_index_sorted = np.take_along_axis(topn_index, sorted_indices, axis=0) + + return topn_index_sorted From 19b92404e1394512712f7f0632fbbe98d7506285 Mon Sep 17 00:00:00 2001 From: GangBean Date: Thu, 20 Jun 2024 00:18:20 +0000 Subject: [PATCH 2/4] refactor: split gpu calculation into chunk size #20 --- models/ngcf.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/models/ngcf.py b/models/ngcf.py index 5860bc4..e6b26e1 100644 --- a/models/ngcf.py +++ b/models/ngcf.py @@ -41,15 +41,15 @@ def forward(self, user_id, item_id): return torch.sum(user_embed * item_embed, dim=1) def embedding_propagation(self, last_embed: torch.Tensor, w1, w2): - identity_matrix = torch.eye(*self.laplacian_matrix.size()).to(self.cfg.device) - matrix = self.laplacian_matrix + identity_matrix + identity_matrix = torch.eye(*self.laplacian_matrix.size()) + matrix = self.laplacian_matrix.to('cpu') + identity_matrix # split calcuclation GPU memory shortage chunk_size = 32 embed_list = [] for chunk_idx in range(0, self.num_users + self.num_items, chunk_size): matrix_concat = matrix[chunk_idx : (chunk_idx + chunk_size)] - term1 = torch.matmul(matrix_concat, last_embed) + term1 = torch.matmul(matrix_concat.to(self.cfg.device), last_embed) term1 = w1(term1) laplacian_concat = self.laplacian_matrix[chunk_idx : (chunk_idx + chunk_size)] From 0bb5e484d859b41c6ce628e434be6da0e8d63f95 Mon Sep 17 00:00:00 2001 From: Judy Date: Sun, 23 Jun 2024 00:34:07 +0000 Subject: [PATCH 3/4] feat: split embedding propagation calculations across multiple GPUs #20 --- data/datasets/ngcf_data_pipeline.py | 6 ++- models/ngcf.py | 81 ++++++++++++++++++++++------- trainers/ngcf_trainer.py | 7 +-- 3 files changed, 69 insertions(+), 25 deletions(-) diff --git a/data/datasets/ngcf_data_pipeline.py b/data/datasets/ngcf_data_pipeline.py index a75f324..983b634 100644 --- a/data/datasets/ngcf_data_pipeline.py +++ b/data/datasets/ngcf_data_pipeline.py @@ -38,11 +38,13 @@ def _set_laplacian_matrix(self, df): diagonal_degree_matrix = torch.tensor(diagonal_degree_matrix).float().to('cuda') adjacency_matrix = torch.tensor(adjacency_matrix).float().to('cuda') self.laplacian_matrix = torch.matmul(diagonal_degree_matrix, adjacency_matrix) - adjacency_matrix = adjacency_matrix.cpu().detach() + del adjacency_matrix self.laplacian_matrix = torch.matmul(self.laplacian_matrix, diagonal_degree_matrix) - self.laplacian_matrix = self.laplacian_matrix.to(self.cfg.device) + self.laplacian_matrix = self.laplacian_matrix.to('cpu') logger.info('done...') + del diagonal_degree_matrix + def preprocess(self) -> pd.DataFrame: df = super().preprocess() self._set_laplacian_matrix(df) diff --git a/models/ngcf.py b/models/ngcf.py index e6b26e1..44b7901 100644 --- a/models/ngcf.py +++ b/models/ngcf.py @@ -6,12 +6,12 @@ class NGCF(BaseModel): - def __init__(self, cfg, num_users, num_items, laplacian_matrix): + def __init__(self, cfg, num_users, num_items): #, laplacian_matrix): super().__init__() self.cfg = cfg self.num_users = num_users self.num_items = num_items - self.laplacian_matrix = laplacian_matrix + # self.laplacian_matrix = laplacian_matrix self.embedding = nn.Embedding( num_users+num_items, cfg.embed_size, dtype=torch.float32) @@ -27,11 +27,11 @@ def _init_weights(self): if isinstance(child, nn.Embedding): nn.init.xavier_uniform_(child.weight) - def forward(self, user_id, item_id): + def forward(self, user_id, item_id, laplacian_matrix): user_embed_list, item_embed_list = [self.embedding(user_id),], [self.embedding(self.num_users+item_id)] last_embed = self.embedding.weight for w1, w2 in zip(self.W1, self.W2): - last_embed: torch.Tensor = self.embedding_propagation(last_embed, w1, w2) + last_embed: torch.Tensor = self.embedding_propagation(last_embed, w1, w2, laplacian_matrix) user_embed_list.append(last_embed[user_id]) item_embed_list.append(last_embed[self.num_users + item_id]) @@ -40,27 +40,68 @@ def forward(self, user_id, item_id): return torch.sum(user_embed * item_embed, dim=1) - def embedding_propagation(self, last_embed: torch.Tensor, w1, w2): + def _embedding_propagation(self, last_embed: torch.Tensor, w1, w2): identity_matrix = torch.eye(*self.laplacian_matrix.size()) - matrix = self.laplacian_matrix.to('cpu') + identity_matrix + matrix = (self.laplacian_matrix.to('cpu') + identity_matrix).to(self.cfg.device) # split calcuclation GPU memory shortage - chunk_size = 32 - embed_list = [] - for chunk_idx in range(0, self.num_users + self.num_items, chunk_size): - matrix_concat = matrix[chunk_idx : (chunk_idx + chunk_size)] - term1 = torch.matmul(matrix_concat.to(self.cfg.device), last_embed) - term1 = w1(term1) +# chunk_size = 32 +# embed_list = [] +# for chunk_idx in range(0, self.num_users + self.num_items, chunk_size): +# matrix_concat = matrix[chunk_idx : (chunk_idx + chunk_size)] +# term1 = torch.matmul(matrix_concat.to(self.cfg.device), last_embed) +# term1 = w1(term1) +# +# laplacian_concat = self.laplacian_matrix[chunk_idx : (chunk_idx + chunk_size)] +# neighbor_embeddings = torch.matmul(laplacian_concat, last_embed) +# +# last_embed_concat = last_embed[chunk_idx : (chunk_idx + chunk_size)] +# term2 = torch.mul(neighbor_embeddings, last_embed_concat) +# term2 = w2(term2) +# embed_list.append(term1 + term2) +# +# embed_list = torch.concat(embed_list, dim=0) - laplacian_concat = self.laplacian_matrix[chunk_idx : (chunk_idx + chunk_size)] - neighbor_embeddings = torch.matmul(laplacian_concat, last_embed) + term1 = torch.matmul(matrix, last_embed) + term1 = w1(term1) - last_embed_concat = last_embed[chunk_idx : (chunk_idx + chunk_size)] - term2 = torch.mul(neighbor_embeddings, last_embed_concat) - term2 = w2(term2) - embed_list.append(term1 + term2) + neighbor_embeddings = torch.matmul(self.laplacian_matrix, last_embed) - embed_list = torch.concat(embed_list, dim=0) + term2 = torch.mul(neighbor_embeddings, last_embed) + term2 = w2(term2) - return nn.functional.leaky_relu(embed_list) + # return nn.functional.leaky_relu(embed_list) + return nn.functional.leaky_relu(term1 + term2) + def embedding_propagation(self, last_embed: torch.Tensor, w1, w2, laplacian_matrix): + device0 = torch.device('cuda:0') + device1 = torch.device('cuda:1') + + # Split last_embed into two parts for each GPU + mid = last_embed.size(0) // 2 + + # Prepare identity matrix and laplacian matrix on each GPU + identity_matrix = torch.eye(last_embed.size(0)) + matrix = laplacian_matrix + identity_matrix + + # Compute term1 on GPU0 + term1_part0 = torch.matmul(matrix.to(device0), last_embed.to(device0)) + term1_part0 = w1(term1_part0) + + # Compute term2 on GPU1 + w2 = w2.to(device1) + neighbor_embeddings = torch.matmul(laplacian_matrix.to(device1), last_embed.to(device1)) + term2_part1 = torch.mul(neighbor_embeddings, last_embed.to(device1)) + term2_part1 = w2(term2_part1) + + # Transfer term2_part1 to GPU0 + term2_part1 = term2_part1.to(device0) + + # Combine term1 and term2 on GPU0 + combined_result = term1_part0 + term2_part1 + #combined_result = term1_part0 + + # Apply activation function + result = nn.functional.leaky_relu(combined_result) + + return result # Return result to device0 diff --git a/trainers/ngcf_trainer.py b/trainers/ngcf_trainer.py index e7c9fc5..cc27c59 100644 --- a/trainers/ngcf_trainer.py +++ b/trainers/ngcf_trainer.py @@ -25,9 +25,10 @@ def __init__(self, cfg: DictConfig, num_items: int, num_users: int, laplacian_ma logger.info(f'[DEVICE] device = {self.device}') self.num_items = num_items self.num_users = num_users - self.model = NGCF(self.cfg, num_users, num_items, laplacian_matrix).to(self.device) + self.model = NGCF(self.cfg, num_users, num_items).to(self.device) self.optimizer: Optimizer = self._optimizer(self.cfg.optimizer, self.model, self.cfg.lr, self.cfg.weight_decay) self.loss = self._loss() + self.laplacian_matrix = laplacian_matrix def _loss(self): return BPRLoss() @@ -104,8 +105,8 @@ def train(self, train_dataloader: DataLoader) -> float: for data in tqdm(train_dataloader): user_id, pos_item, neg_item = data['user_id'].to(self.device), data['pos_item'].to(self.device), \ data['neg_item'].to(self.device) - pos_pred = self.model(user_id, pos_item) - neg_pred = self.model(user_id, neg_item) + pos_pred = self.model(user_id, pos_item, self.laplacian_matrix) + neg_pred = self.model(user_id, neg_item, self.laplacian_matrix) self.optimizer.zero_grad() loss = self.loss(pos_pred, neg_pred) From e026903635fa66f3c72e69e5fb17389a0358340e Mon Sep 17 00:00:00 2001 From: Judy Date: Tue, 25 Jun 2024 02:21:52 +0000 Subject: [PATCH 4/4] refactor: replace dense calculation into sparse calculation #20 --- data/datasets/ngcf_data_pipeline.py | 18 +++---- models/ngcf.py | 83 +++++++++-------------------- trainers/ngcf_trainer.py | 14 ++--- 3 files changed, 40 insertions(+), 75 deletions(-) diff --git a/data/datasets/ngcf_data_pipeline.py b/data/datasets/ngcf_data_pipeline.py index 983b634..25e6fec 100644 --- a/data/datasets/ngcf_data_pipeline.py +++ b/data/datasets/ngcf_data_pipeline.py @@ -21,30 +21,28 @@ def _set_laplacian_matrix(self, df): # transform df to user-item interaction (R) logger.info('transform df to user-item interaction') user_item_interaction = df.pivot_table(index='user_id', columns=['business_id'], values=['rating']) - user_item_interaction = user_item_interaction.droplevel(0, 1) + user_item_interaction = user_item_interaction.droplevel(0, 1).fillna(0) # adjacency matrix logger.info('create adjacency matrix') - adjacency_matrix = np.zeros((self.num_items+self.num_users, self.num_items+self.num_users)) + adjacency_matrix = np.zeros((self.num_items+self.num_users, self.num_items+self.num_users), dtype=np.float32) adjacency_matrix[:self.num_users,self.num_users:] = user_item_interaction adjacency_matrix[self.num_users:,:self.num_users] = user_item_interaction.T # diagonal degree matrix (n+m) x (m+n) logger.info('create diagonal degree matrix') - diagonal_degree_matrix = np.diag(1/np.sqrt(adjacency_matrix.sum(axis=0))) + diagonal_degree_matrix = np.diag(1/np.sqrt(adjacency_matrix.sum(axis=0))).astype(np.float32) # set laplacian matrix logger.info('set laplacian matrix') - diagonal_degree_matrix = torch.tensor(diagonal_degree_matrix).float().to('cuda') - adjacency_matrix = torch.tensor(adjacency_matrix).float().to('cuda') - self.laplacian_matrix = torch.matmul(diagonal_degree_matrix, adjacency_matrix) + diagonal_degree_matrix = torch.from_numpy(diagonal_degree_matrix).to_sparse().to('cuda') + adjacency_matrix = torch.from_numpy(adjacency_matrix).to_sparse().to('cuda') + self.laplacian_matrix = torch.sparse.mm(diagonal_degree_matrix, adjacency_matrix) del adjacency_matrix - self.laplacian_matrix = torch.matmul(self.laplacian_matrix, diagonal_degree_matrix) - self.laplacian_matrix = self.laplacian_matrix.to('cpu') + self.laplacian_matrix = torch.sparse.mm(self.laplacian_matrix, diagonal_degree_matrix) + del diagonal_degree_matrix logger.info('done...') - del diagonal_degree_matrix - def preprocess(self) -> pd.DataFrame: df = super().preprocess() self._set_laplacian_matrix(df) diff --git a/models/ngcf.py b/models/ngcf.py index 44b7901..818de24 100644 --- a/models/ngcf.py +++ b/models/ngcf.py @@ -27,6 +27,23 @@ def _init_weights(self): if isinstance(child, nn.Embedding): nn.init.xavier_uniform_(child.weight) + def bpr_forward(self, user_id, pos_item_ids, neg_item_ids, laplacian_matrix): + user_embed_list, pos_item_embed_list, neg_item_embed_list = \ + [self.embedding(user_id),], [self.embedding(self.num_users+pos_item_ids)], [self.embedding(self.num_users+neg_item_ids)] + last_embed = self.embedding.weight + + for w1, w2 in zip(self.W1, self.W2): + last_embed: torch.Tensor = self.embedding_propagation(last_embed, w1, w2, laplacian_matrix) + user_embed_list.append(last_embed[user_id]) + pos_item_embed_list.append(last_embed[self.num_users + pos_item_ids]) + neg_item_embed_list.append(last_embed[self.num_users + neg_item_ids]) + + user_embed = torch.concat(user_embed_list, dim=1) + pos_item_embed = torch.concat(pos_item_embed_list, dim=1) + neg_item_embed = torch.concat(neg_item_embed_list, dim=1) + + return torch.sum(user_embed * pos_item_embed, dim=1), torch.sum(user_embed * neg_item_embed, dim=1) + def forward(self, user_id, item_id, laplacian_matrix): user_embed_list, item_embed_list = [self.embedding(user_id),], [self.embedding(self.num_users+item_id)] last_embed = self.embedding.weight @@ -40,68 +57,16 @@ def forward(self, user_id, item_id, laplacian_matrix): return torch.sum(user_embed * item_embed, dim=1) - def _embedding_propagation(self, last_embed: torch.Tensor, w1, w2): - identity_matrix = torch.eye(*self.laplacian_matrix.size()) - matrix = (self.laplacian_matrix.to('cpu') + identity_matrix).to(self.cfg.device) - - # split calcuclation GPU memory shortage -# chunk_size = 32 -# embed_list = [] -# for chunk_idx in range(0, self.num_users + self.num_items, chunk_size): -# matrix_concat = matrix[chunk_idx : (chunk_idx + chunk_size)] -# term1 = torch.matmul(matrix_concat.to(self.cfg.device), last_embed) -# term1 = w1(term1) -# -# laplacian_concat = self.laplacian_matrix[chunk_idx : (chunk_idx + chunk_size)] -# neighbor_embeddings = torch.matmul(laplacian_concat, last_embed) -# -# last_embed_concat = last_embed[chunk_idx : (chunk_idx + chunk_size)] -# term2 = torch.mul(neighbor_embeddings, last_embed_concat) -# term2 = w2(term2) -# embed_list.append(term1 + term2) -# -# embed_list = torch.concat(embed_list, dim=0) - - term1 = torch.matmul(matrix, last_embed) - term1 = w1(term1) - - neighbor_embeddings = torch.matmul(self.laplacian_matrix, last_embed) - - term2 = torch.mul(neighbor_embeddings, last_embed) - term2 = w2(term2) - - # return nn.functional.leaky_relu(embed_list) - return nn.functional.leaky_relu(term1 + term2) - def embedding_propagation(self, last_embed: torch.Tensor, w1, w2, laplacian_matrix): - device0 = torch.device('cuda:0') - device1 = torch.device('cuda:1') - - # Split last_embed into two parts for each GPU - mid = last_embed.size(0) // 2 - - # Prepare identity matrix and laplacian matrix on each GPU - identity_matrix = torch.eye(last_embed.size(0)) + identity_matrix = torch.eye(*laplacian_matrix.size(), dtype=torch.float32).to_sparse().to(self.cfg.device) matrix = laplacian_matrix + identity_matrix - # Compute term1 on GPU0 - term1_part0 = torch.matmul(matrix.to(device0), last_embed.to(device0)) - term1_part0 = w1(term1_part0) - - # Compute term2 on GPU1 - w2 = w2.to(device1) - neighbor_embeddings = torch.matmul(laplacian_matrix.to(device1), last_embed.to(device1)) - term2_part1 = torch.mul(neighbor_embeddings, last_embed.to(device1)) - term2_part1 = w2(term2_part1) - - # Transfer term2_part1 to GPU0 - term2_part1 = term2_part1.to(device0) + term1 = torch.sparse.mm(matrix, last_embed) + term1 = w1(term1) - # Combine term1 and term2 on GPU0 - combined_result = term1_part0 + term2_part1 - #combined_result = term1_part0 + neighbor_embeddings = torch.sparse.mm(laplacian_matrix, last_embed) - # Apply activation function - result = nn.functional.leaky_relu(combined_result) + term2 = torch.mul(last_embed, neighbor_embeddings) + term2 = w2(term2) - return result # Return result to device0 + return nn.functional.leaky_relu(term1 + term2) diff --git a/trainers/ngcf_trainer.py b/trainers/ngcf_trainer.py index cc27c59..ecaa690 100644 --- a/trainers/ngcf_trainer.py +++ b/trainers/ngcf_trainer.py @@ -105,8 +105,7 @@ def train(self, train_dataloader: DataLoader) -> float: for data in tqdm(train_dataloader): user_id, pos_item, neg_item = data['user_id'].to(self.device), data['pos_item'].to(self.device), \ data['neg_item'].to(self.device) - pos_pred = self.model(user_id, pos_item, self.laplacian_matrix) - neg_pred = self.model(user_id, neg_item, self.laplacian_matrix) + pos_pred,neg_pred = self.model.bpr_forward(user_id, pos_item, neg_item, self.laplacian_matrix) self.optimizer.zero_grad() loss = self.loss(pos_pred, neg_pred) @@ -124,8 +123,7 @@ def validate(self, valid_dataloader: DataLoader) -> tuple[float]: for data in tqdm(valid_dataloader): user_id, pos_item, neg_item = data['user_id'].to(self.device), data['pos_item'].to(self.device), \ data['neg_item'].to(self.device) - pos_pred = self.model(user_id, pos_item) - neg_pred = self.model(user_id, neg_item) + pos_pred,neg_pred = self.model.bpr_forward(user_id, pos_item, neg_item, self.laplacian_matrix) loss = self.loss(pos_pred, neg_pred) @@ -138,8 +136,12 @@ def evaluate(self, eval_data: pd.DataFrame, mode='valid') -> tuple: self.model.eval() actual, predicted = [], [] item_input = torch.tensor([item_id for item_id in range(self.num_items)]).to(self.device) - for user_id, row in tqdm(eval_data.iterrows(), total=eval_data.shape[0]): - pred = self.model(torch.tensor([user_id,]*self.num_items).to(self.device), item_input) + + for idx in tqdm(np.random.randint(eval_data.shape[0], size=100), total=100): + user_id = eval_data.iloc[[idx], :].index[0] + row = eval_data.iloc[idx, :] + + pred = self.model(torch.tensor([user_id,]*self.num_items).to(self.device), item_input, self.laplacian_matrix) batch_predicted = \ self._generate_top_k_recommendation(pred, row['mask_items']) actual.append(row['pos_items'])