diff --git a/lib/cpp/preprocessing/longitudinal_features_lagger.cpp b/lib/cpp/preprocessing/longitudinal_features_lagger.cpp
index f6e7f352f..5c3e61813 100644
--- a/lib/cpp/preprocessing/longitudinal_features_lagger.cpp
+++ b/lib/cpp/preprocessing/longitudinal_features_lagger.cpp
@@ -6,34 +6,38 @@
 #include "tick/preprocessing/longitudinal_features_lagger.h"

+
 LongitudinalFeaturesLagger::LongitudinalFeaturesLagger(
-    const SBaseArrayDouble2dPtrList1D &features,
-    const SArrayULongPtr n_lags)
-    : n_intervals(features[0]->n_rows()),
-      n_lags(n_lags),
-      n_samples(features.size()),
-      n_observations(n_samples * n_intervals),
-      n_features(features[0]->n_cols()),
-      n_lagged_features(n_lags->sum() + n_lags->size()) {
-  col_offset = ArrayULong(n_lags->size());
-  col_offset.init_to_zero();
-  if (n_features != n_lags->size()) {
-    TICK_ERROR("Features matrix column number should match n_lags length.");
-  }
-  if ((*n_lags)[0] >= n_intervals) {
-    TICK_ERROR("n_lags elements must be between 0 and (n_intervals - 1).");
-  }
+    ulong n_intervals,
+    SArrayULongPtr _n_lags)
+    : n_intervals(n_intervals),
+      n_lags(_n_lags),
+      n_features(_n_lags->size()),
+      n_lagged_features(_n_lags->size() + _n_lags->sum()) {
+  if (n_lags != nullptr) compute_col_offset(n_lags);
+}
+
+void LongitudinalFeaturesLagger::compute_col_offset(const SArrayULongPtr n_lags) {
+  ArrayULong col_offset_temp = ArrayULong(n_lags->size());
+  col_offset_temp.init_to_zero();
+  // keep the bound check on element 0, which the loop below never visits
+  if ((*n_lags)[0] >= n_intervals) {
+    TICK_ERROR("n_lags elements must be between 0 and (n_intervals - 1).");
+  }
   for (ulong i(1); i < n_lags->size(); i++) {
     if ((*n_lags)[i] >= n_intervals) {
       TICK_ERROR("n_lags elements must be between 0 and (n_intervals - 1).");
     }
-    col_offset[i] = col_offset[i - 1] + (*n_lags)[i-1] + 1;
+    col_offset_temp[i] = col_offset_temp[i - 1] + (*n_lags)[i-1] + 1;
   }
+  col_offset = col_offset_temp.as_sarray_ptr();
 }

 void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
                                                         ArrayDouble2d &out,
                                                         ulong censoring) const {
+  if (n_intervals != features.n_rows()) {
+    TICK_ERROR("Features matrix row count should match n_intervals.");
+  }
+  if (n_features != features.n_cols()) {
+    TICK_ERROR("Features matrix column count should match n_lags length.");
+  }
   if (out.n_cols() != n_lagged_features) {
     TICK_ERROR(
         "n_columns of &out should be equal to n_features + sum(n_lags).");
@@ -47,8 +51,9 @@ void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
     n_cols_feature = (*n_lags)[feature] + 1;
     for (ulong j = 0; j < n_intervals; j++) {
       row = j;
-      col = col_offset[feature];
-      value = features(row, feature);
+      col = (*col_offset)[feature];
+      // use view_row instead of (row, feature): operator() has no const overload
+      value = view_row(features, row)[feature];
       max_col = col + n_cols_feature;
       if (value != 0) {
         while (row < censoring && col < max_col) {
@@ -68,6 +73,7 @@ void LongitudinalFeaturesLagger::sparse_lag_preprocessor(ArrayULong &row,
                                                          ArrayULong &out_col,
                                                          ArrayDouble &out_data,
                                                          ulong censoring) const {
+  // TODO: add checks here, or do them in Python?
  ulong j(0), r, c, offset, new_col, max_col;
  double value;
@@ -75,7 +81,7 @@ void LongitudinalFeaturesLagger::sparse_lag_preprocessor(ArrayULong &row,
     value = data[i];
     r = row[i];
     c = col[i];
-    offset = col_offset[c];
+    offset = (*col_offset)[c];
     max_col = offset + (*n_lags)[c] + 1;
     new_col = offset;
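For readers of the hunk above: the body of the `while` loop is elided by the diff context, so the following NumPy sketch reconstructs the intended lagging rule rather than quoting tick's API. Each feature f owns n_lags[f] + 1 output columns starting at col_offset[f], and the value at interval j is copied along the diagonal out[j + l, col_offset[f] + l] while j + l stays below the censoring index. The name dense_lag_reference is illustrative only.

# Reference sketch of the lagging semantics implied by the C++ loop above.
import numpy as np

def dense_lag_reference(x, n_lags, censoring):
    n_lags = np.asarray(n_lags, dtype=int)
    n_intervals, n_features = x.shape
    col_offset = np.zeros(n_features, dtype=int)
    # same recurrence as compute_col_offset above
    col_offset[1:] = np.cumsum(n_lags[:-1] + 1)
    out = np.zeros((n_intervals, int(n_lags.sum()) + n_features))
    for f in range(n_features):
        for j in range(n_intervals):
            value = x[j, f]
            if value == 0:
                continue
            for lag in range(n_lags[f] + 1):
                row, col = j + lag, col_offset[f] + lag
                if row >= censoring:  # censored intervals stay zero
                    break
                out[row, col] = value
    return out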
diff --git a/lib/cpp/preprocessing/sparse_longitudinal_features_product.cpp b/lib/cpp/preprocessing/sparse_longitudinal_features_product.cpp
index 9f935634d..a522a7594 100644
--- a/lib/cpp/preprocessing/sparse_longitudinal_features_product.cpp
+++ b/lib/cpp/preprocessing/sparse_longitudinal_features_product.cpp
@@ -7,10 +7,6 @@
 #include "tick/preprocessing/sparse_longitudinal_features_product.h"
 #include <math.h>

-SparseLongitudinalFeaturesProduct::SparseLongitudinalFeaturesProduct(
-    const SBaseArrayDouble2dPtrList1D &features)
-    : n_features(features[0]->n_cols()) {}
-
 ulong SparseLongitudinalFeaturesProduct::get_feature_product_col(
     ulong col1, ulong col2, ulong n_cols) const {
   if (col1 > col2) {  // ensure we have the right order as the following formula
diff --git a/lib/include/tick/preprocessing/longitudinal_features_lagger.h b/lib/include/tick/preprocessing/longitudinal_features_lagger.h
index 8856ec8d4..b9b24cb54 100644
--- a/lib/include/tick/preprocessing/longitudinal_features_lagger.h
+++ b/lib/include/tick/preprocessing/longitudinal_features_lagger.h
@@ -15,17 +15,21 @@ class LongitudinalFeaturesLagger {
  protected:
   ulong n_intervals;
   SArrayULongPtr n_lags;
-  ArrayULong col_offset;
-  ulong n_samples;
-  ulong n_observations;
   ulong n_features;
   ulong n_lagged_features;
+  SArrayULongPtr col_offset;

  public:
-  LongitudinalFeaturesLagger(const SBaseArrayDouble2dPtrList1D &features,
-                             const SArrayULongPtr n_lags);
+  // This exists solely for cereal/swig
+  LongitudinalFeaturesLagger() = default;

-  void dense_lag_preprocessor(ArrayDouble2d &features, ArrayDouble2d &out,
+  LongitudinalFeaturesLagger(ulong n_intervals,
+                             SArrayULongPtr n_lags);
+
+  void compute_col_offset(SArrayULongPtr n_lags);
+
+  void dense_lag_preprocessor(ArrayDouble2d &features,
+                              ArrayDouble2d &out,
                               ulong censoring) const;

   void sparse_lag_preprocessor(ArrayULong &row, ArrayULong &col,
@@ -34,14 +38,26 @@ class LongitudinalFeaturesLagger {
                                ulong censoring) const;

   template <class Archive>
-  void serialize(Archive &ar) {
+  void load(Archive &ar) {
+    ar(CEREAL_NVP(n_intervals));
+    ar(CEREAL_NVP(n_features));
+    ar(CEREAL_NVP(n_lagged_features));
+
+    Array<ulong> temp_n_lags, temp_col_offset;
+    ar(cereal::make_nvp("n_lags", temp_n_lags));
+    ar(cereal::make_nvp("col_offset", temp_col_offset));
+
+    n_lags = temp_n_lags.as_sarray_ptr();
+    col_offset = temp_col_offset.as_sarray_ptr();
+  }
+
+  template <class Archive>
+  void save(Archive &ar) const {
     ar(CEREAL_NVP(n_intervals));
-    ar(CEREAL_NVP(n_lags));
-    ar(CEREAL_NVP(col_offset));
-    ar(CEREAL_NVP(n_samples));
-    ar(CEREAL_NVP(n_observations));
     ar(CEREAL_NVP(n_features));
     ar(CEREAL_NVP(n_lagged_features));
+    ar(cereal::make_nvp("n_lags", *n_lags));
+    ar(cereal::make_nvp("col_offset", *col_offset));
   }
 };
diff --git a/lib/include/tick/preprocessing/sparse_longitudinal_features_product.h b/lib/include/tick/preprocessing/sparse_longitudinal_features_product.h
index a8e7be058..4653aaafa 100644
--- a/lib/include/tick/preprocessing/sparse_longitudinal_features_product.h
+++ b/lib/include/tick/preprocessing/sparse_longitudinal_features_product.h
@@ -16,8 +16,11 @@ class SparseLongitudinalFeaturesProduct {
   ulong n_features;

  public:
+  // This exists solely for cereal/swig
+  SparseLongitudinalFeaturesProduct() = default;
+
   explicit SparseLongitudinalFeaturesProduct(
-      const SBaseArrayDouble2dPtrList1D &features);
+      const ulong n_features): n_features(n_features) {}

   inline ulong get_feature_product_col(ulong col1, ulong col2,
                                        ulong n_cols) const;
@@ -28,7 +31,12 @@ class SparseLongitudinalFeaturesProduct {
                                ArrayDouble &out_data) const;

   template <class Archive>
-  void serialize(Archive &ar) {
+  void load(Archive &ar) {
+    ar(CEREAL_NVP(n_features));
+  }
+
+  template <class Archive>
+  void save(Archive &ar) const {
     ar(CEREAL_NVP(n_features));
   }
 };
diff --git a/lib/swig/preprocessing/longitudinal_features_lagger.i b/lib/swig/preprocessing/longitudinal_features_lagger.i
index 7cef9bf80..7f9f647c5 100644
--- a/lib/swig/preprocessing/longitudinal_features_lagger.i
+++ b/lib/swig/preprocessing/longitudinal_features_lagger.i
@@ -4,24 +4,25 @@
 #include "tick/preprocessing/longitudinal_features_lagger.h"
 %}

+%include serialization.i
+
 class LongitudinalFeaturesLagger {

  public:
-  LongitudinalFeaturesLagger(const SBaseArrayDouble2dPtrList1D &features,
-                             const SArrayULongPtr n_lags);
+  // This exists solely for cereal/swig
+  LongitudinalFeaturesLagger();
+
+  LongitudinalFeaturesLagger(ulong n_intervals,
+                             SArrayULongPtr n_lags);

   void dense_lag_preprocessor(ArrayDouble2d &features,
                               ArrayDouble2d &out,
                               ulong censoring) const;

-  void sparse_lag_preprocessor(ArrayULong &row,
-                               ArrayULong &col,
-                               ArrayDouble &data,
-                               ArrayULong &out_row,
-                               ArrayULong &out_col,
-                               ArrayDouble &out_data,
+  void sparse_lag_preprocessor(ArrayULong &row, ArrayULong &col,
+                               ArrayDouble &data, ArrayULong &out_row,
+                               ArrayULong &out_col, ArrayDouble &out_data,
                                ulong censoring) const;
-
 };

 TICK_MAKE_PICKLABLE(LongitudinalFeaturesLagger);
\ No newline at end of file
diff --git a/lib/swig/preprocessing/sparse_longitudinal_features_product.i b/lib/swig/preprocessing/sparse_longitudinal_features_product.i
index 4675c182f..a0a25c55d 100644
--- a/lib/swig/preprocessing/sparse_longitudinal_features_product.i
+++ b/lib/swig/preprocessing/sparse_longitudinal_features_product.i
@@ -4,10 +4,15 @@
 #include "tick/preprocessing/sparse_longitudinal_features_product.h"
 %}

+%include serialization.i
+
 class SparseLongitudinalFeaturesProduct {

  public:
-  SparseLongitudinalFeaturesProduct(const SBaseArrayDouble2dPtrList1D &features);
+  // This exists solely for cereal/swig
+  SparseLongitudinalFeaturesProduct();
+
+  SparseLongitudinalFeaturesProduct(const ulong n_features);

   void sparse_features_product(ArrayULong &row,
                                ArrayULong &col,
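With the default constructors, the cereal save/load pair, and TICK_MAKE_PICKLABLE above, the SWIG-wrapped objects become picklable; this is what the test files further down exercise. A minimal round-trip sketch, assuming the build tree exposes the wrapped class under tick.preprocessing.build.preprocessing as in the imports below:

# Pickle round-trip enabled by cereal save/load + TICK_MAKE_PICKLABLE.
import pickle
import numpy as np
from tick.preprocessing.build.preprocessing import \
    LongitudinalFeaturesLagger as _LongitudinalFeaturesLagger

n_lags = np.array([1, 2, 1], dtype="uint64")
lagger = _LongitudinalFeaturesLagger(3, n_lags)  # n_intervals = 3
restored = pickle.loads(pickle.dumps(lagger))
# `restored` is a fresh C++ object rebuilt from the archived n_intervals,
# n_features, n_lagged_features, n_lags and col_offset values.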
""" - def __init__(self, n_jobs=-1): + _attrinfos = {'n_jobs': {'writable': True}} + + def __init__(self, n_jobs=1): Base.__init__(self) - self.n_jobs = n_jobs + if n_jobs == -1: + self.n_jobs = cpu_count() + else: + self.n_jobs = n_jobs @abstractmethod def fit(self, features, labels, censoring) -> None: diff --git a/tick/preprocessing/longitudinal_features_lagger.py b/tick/preprocessing/longitudinal_features_lagger.py index 5451b9073..52b5b76ed 100644 --- a/tick/preprocessing/longitudinal_features_lagger.py +++ b/tick/preprocessing/longitudinal_features_lagger.py @@ -3,10 +3,13 @@ import numpy as np import scipy.sparse as sps from tick.preprocessing.base import LongitudinalPreprocessor -from .build.preprocessing import LongitudinalFeaturesLagger\ +from tick.preprocessing.build.preprocessing import LongitudinalFeaturesLagger\ as _LongitudinalFeaturesLagger -from .utils import check_longitudinal_features_consistency,\ +from tick.preprocessing.utils import check_longitudinal_features_consistency,\ check_censoring_consistency +from multiprocessing.pool import Pool +from copy import deepcopy +from functools import partial, partialmethod class LongitudinalFeaturesLagger(LongitudinalPreprocessor): @@ -75,9 +78,6 @@ class LongitudinalFeaturesLagger(LongitudinalPreprocessor): "_n_intervals": { "writable": False }, - "_cpp_preprocessor": { - "writable": False - }, "_fitted": { "writable": False } @@ -92,7 +92,6 @@ def __init__(self, n_lags, n_jobs=-1): self._n_init_features = None self._n_output_features = None self._n_intervals = None - self._cpp_preprocessor = None self._fitted = False def _reset(self): @@ -100,7 +99,6 @@ def _reset(self): self._set("_n_init_features", None) self._set("_n_output_features", None) self._set("_n_intervals", None) - self._set("_cpp_preprocessor", None) self._set("_fitted", False) def fit(self, features, labels=None, censoring=None): @@ -137,10 +135,7 @@ def fit(self, features, labels=None, censoring=None): self._set("_n_init_features", n_init_features) self._set("_n_intervals", n_intervals) self._set("_n_output_features", int((self.n_lags + 1).sum())) - self._set("_cpp_preprocessor", - _LongitudinalFeaturesLagger(features, self.n_lags)) self._set("_fitted", True) - return self def transform(self, features, labels=None, censoring=None): @@ -166,7 +161,6 @@ def transform(self, features, labels=None, censoring=None): output : `[numpy.ndarrays]` or `[csr_matrices]`, shape=(n_intervals, n_features) The list of features matrices with added lagged features. 
""" - n_samples = len(features) if censoring is None: censoring = np.full((n_samples,), self._n_intervals, @@ -175,36 +169,57 @@ def transform(self, features, labels=None, censoring=None): base_shape = (self._n_intervals, self._n_init_features) features = check_longitudinal_features_consistency( features, base_shape, "float64") - if sps.issparse(features[0]): - X_with_lags = [ - self._sparse_lagger(x, int(censoring[i])) - for i, x in enumerate(features) - ] - # TODO: Don't get why int() is required here as censoring_i is uint64 - else: - X_with_lags = [ - self._dense_lagger(x, int(censoring[i])) - for i, x in enumerate(features) - ] + + initializer = partial(self._inject_cpp_object, + n_intervals=self._n_intervals, + n_lags=self.n_lags) + callback = self._sparse_lagger if sps.issparse(features[0]) \ + else self._dense_lagger + callback = partial(callback, n_intervals=self._n_intervals, + n_output_features=self._n_output_features, + n_lags=self.n_lags) + + with Pool(self.n_jobs, initializer=initializer) as pool: + X_with_lags = pool.starmap(callback, zip(features, censoring)) return X_with_lags, labels, censoring - def _dense_lagger(self, feature_matrix, censoring_i): - output = np.zeros((self._n_intervals, self._n_output_features), - dtype="float64") - self._cpp_preprocessor.dense_lag_preprocessor(feature_matrix, output, - censoring_i) + @staticmethod + def _inject_cpp_object(n_intervals, n_lags): + """Creates a global instance of the CPP preprocessor object. + + WARNING: to be used only as a multiprocessing.Pool initializer. + In multiprocessing context, each process has its own namespace, so using + global is not as bad as it seems. Still, it requires to proceed with + caution. + """ + global _cpp_preprocessor + _cpp_preprocessor = _LongitudinalFeaturesLagger(n_intervals, n_lags) + + @staticmethod + def _dense_lagger(feature_matrix, censoring_i, n_intervals, + n_output_features, n_lags): + """Creates a lagged version of a dense matrixrepresenting longitudinal + features.""" + global _cpp_preprocessor + output = np.zeros((n_intervals, n_output_features), dtype="float64") + _cpp_preprocessor.dense_lag_preprocessor(feature_matrix, output, + int(censoring_i)) return output - def _sparse_lagger(self, feature_matrix, censoring_i): + @staticmethod + def _sparse_lagger(feature_matrix, censoring_i, n_intervals, + n_output_features, n_lags): + """Creates a lagged version of a sparse matrix representing longitudinal + features.""" + global _cpp_preprocessor coo = feature_matrix.tocoo() - estimated_nnz = coo.nnz * int((self.n_lags + 1).sum()) + estimated_nnz = coo.nnz * int((n_lags + 1).sum()) out_row = np.zeros((estimated_nnz,), dtype="uint64") out_col = np.zeros((estimated_nnz,), dtype="uint64") out_data = np.zeros((estimated_nnz,), dtype="float64") - self._cpp_preprocessor.sparse_lag_preprocessor( + _cpp_preprocessor.sparse_lag_preprocessor( coo.row.astype("uint64"), coo.col.astype("uint64"), coo.data, - out_row, out_col, out_data, censoring_i) + out_row, out_col, out_data, int(censoring_i)) return sps.csr_matrix((out_data, (out_row, out_col)), - shape=(self._n_intervals, - self._n_output_features)) + shape=(n_intervals, n_output_features)) diff --git a/tick/preprocessing/longitudinal_features_product.py b/tick/preprocessing/longitudinal_features_product.py index 5dcdbd175..1d46ed3b4 100644 --- a/tick/preprocessing/longitudinal_features_product.py +++ b/tick/preprocessing/longitudinal_features_product.py @@ -5,10 +5,12 @@ from itertools import combinations from copy import deepcopy from 
diff --git a/tick/preprocessing/longitudinal_features_product.py b/tick/preprocessing/longitudinal_features_product.py
index 5dcdbd175..1d46ed3b4 100644
--- a/tick/preprocessing/longitudinal_features_product.py
+++ b/tick/preprocessing/longitudinal_features_product.py
@@ -5,10 +5,12 @@
 from itertools import combinations
 from copy import deepcopy
 from scipy.misc import comb
-from sklearn.externals.joblib import Parallel, delayed
 from tick.preprocessing.base import LongitudinalPreprocessor
-from .build.preprocessing import SparseLongitudinalFeaturesProduct
-from .utils import check_longitudinal_features_consistency
+from tick.preprocessing.build.preprocessing \
+    import SparseLongitudinalFeaturesProduct
+from tick.preprocessing.utils import check_longitudinal_features_consistency
+from functools import partial
+from multiprocessing import Pool


 class LongitudinalFeaturesProduct(LongitudinalPreprocessor):
@@ -41,7 +43,7 @@ class LongitudinalFeaturesProduct(LongitudinalPreprocessor):

     Attributes
     ----------
-    mapper : `dict`
+    mapping : `dict`
         Map product features to column indexes of the resulting matrices.

     Examples
@@ -71,10 +73,10 @@ class LongitudinalFeaturesProduct(LongitudinalPreprocessor):
     """

     _attrinfos = {
-        "exposure_type": {
+        "_exposure_type": {
             "writable": False
         },
-        "_mapper": {
+        "_mapping": {
             "writable": False
         },
         "_n_init_features": {
@@ -86,9 +88,6 @@ class LongitudinalFeaturesProduct(LongitudinalPreprocessor):
         "_n_intervals": {
             "writable": False
         },
-        "_preprocessor": {
-            "writable": False
-        },
         "_fitted": {
             "writable": False
         }
@@ -99,31 +98,13 @@ class LongitudinalFeaturesProduct(LongitudinalPreprocessor):
     def __init__(self, exposure_type="infinite", n_jobs=-1):
-        if exposure_type not in ["infinite", "finite"]:
-            raise ValueError("exposure_type should be either 'infinite' or\
-                'finite', not %s" % exposure_type)
+        # validation now lives in the exposure_type property setter
         self.exposure_type = exposure_type
         self._reset()
-
-    def _reset(self):
-        """Resets the object its initial construction state."""
-        self._set("_n_init_features", None)
-        self._set("_n_output_features", None)
-        self._set("_n_intervals", None)
-        self._set("_mapper", {})
-        self._set("_preprocessor", None)
-        self._set("_fitted", False)
-
-    @property
-    def mapper(self):
-        """Get the mapping between the feature products and column indexes.
-
-        Returns
-        -------
-        output : `dict`
-            The column index - feature mapping.
-        """
-        if not self._fitted:
-            raise ValueError(
-                "cannot get mapper if object has not been fitted.")
-        return deepcopy(self._mapper)

     def fit(self, features, labels=None, censoring=None):
         """Fit the feature product using the features matrices list.
@@ -151,17 +132,11 @@ def fit(self, features, labels=None, censoring=None):
         self._set("_n_init_features", n_init_features)
         self._set("_n_intervals", n_intervals)
         comb_it = combinations(range(n_init_features), 2)
-        mapper = {i + n_init_features: c for i, c in enumerate(comb_it)}
-        self._set("_mapper", mapper)
+        mapping = {i + n_init_features: c for i, c in enumerate(comb_it)}
+        self._set("_mapping", mapping)
         self._set("_n_output_features",
                   int(n_init_features + comb(n_init_features, 2)))
-
-        if sps.issparse(features[0]) and self.exposure_type == "infinite":
-            self._set("_preprocessor",
-                      SparseLongitudinalFeaturesProduct(features))
-
         self._set("_fitted", True)
-
         return self
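For concreteness, here is what fit() builds for three input features: combinations() fixes the pair order, and each product is assigned the next free column after the original features. The snippet imports comb from scipy.special, the current home of the scipy.misc.comb used above:

# Column layout built by fit() for n_init_features = 3.
from itertools import combinations
from scipy.special import comb

n_init_features = 3
mapping = {i + n_init_features: c
           for i, c in enumerate(combinations(range(n_init_features), 2))}
# mapping == {3: (0, 1), 4: (0, 2), 5: (1, 2)}
n_output_features = int(n_init_features + comb(n_init_features, 2))  # 6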
@@ -182,76 +157,116 @@ def transform(self, features, labels=None, censoring=None):
             The list of features matrices with added product features.
             n_new_features = n_features + comb(n_features, 2)
         """
-
         base_shape = (self._n_intervals, self._n_init_features)
         features = check_longitudinal_features_consistency(
             features, base_shape, "float64")
-        if self.exposure_type == "finite":
-            X_with_products = self._finite_exposure_products(features)
-        elif self.exposure_type == "infinite":
-            X_with_products = self._infinite_exposure_products(features)
-        else:
-            raise ValueError("exposure_type should be either 'infinite' or\
-                'finite', not %s" % self.exposure_type)
-        return X_with_products, labels, censoring
-
-    def _infinite_exposure_products(self, features):
-        """Add product features to features in the infinite exposure case."""
-        if sps.issparse(features[0]):
-            X_with_products = [
-                self._sparse_infinite_product(arr) for arr in features
-            ]
-            # TODO later: fix multiprocessing
-            # X_with_products = Parallel(n_jobs=self.n_jobs)(
-            #     delayed(self._sparse_infinite_product)(arr) for arr in features)
-            # Should be done in C++
-        else:
-            raise ValueError("Infinite exposures should be stored in \
-                sparse matrices as this hypothesis induces sparsity in the \
-                feature matrix.")
-
-        return X_with_products
+        is_sparse = sps.issparse(features[0])
+
+        callback = None
+        initializer = None
+
+        if is_sparse and self.exposure_type == 'infinite':
+            initializer = partial(self._inject_cpp_object,
+                                  n_features=self._n_init_features)
+        if is_sparse:
+            if self.exposure_type == 'infinite':
+                callback = partial(self._sparse_infinite_product,
+                                   n_intervals=self._n_intervals,
+                                   n_output_features=self._n_output_features)
+            else:
+                callback = partial(self._sparse_finite_product,
+                                   mapping=self._mapping)
+        else:
+            if self.exposure_type == 'infinite':
+                raise ValueError("Infinite exposures should be stored in "
+                                 "sparse matrices as this hypothesis induces "
+                                 "sparsity in the feature matrix.")
+            else:
+                callback = partial(self._dense_finite_product,
+                                   mapping=self._mapping)

-    def _finite_exposure_products(self, features):
-        """Add product features to features in the finite exposure case."""
-        if sps.issparse(features[0]):
-            X_with_products = Parallel(n_jobs=self.n_jobs)(
-                delayed(self._sparse_finite_product)(arr) for arr in features)
-        else:
-            X_with_products = Parallel(n_jobs=self.n_jobs)(
-                delayed(self._dense_finite_product)(arr) for arr in features)
+        with Pool(self.n_jobs, initializer=initializer) as pool:
+            X_with_products = pool.map(callback, features)

-        return X_with_products
+        return X_with_products, labels, censoring

-    def _dense_finite_product(self, feat_mat):
+    @staticmethod
+    def _dense_finite_product(feat_mat, mapping):
         """Performs feature product on a numpy.ndarray containing
         finite exposures."""
         feat = [feat_mat]
         feat.extend([(feat_mat[:, i] * feat_mat[:, j]).reshape((-1, 1))
-                     for i, j in self._mapper.values()])
+                     for i, j in mapping.values()])
         return np.hstack(feat)

-    def _sparse_finite_product(self, feat_mat):
+    @staticmethod
+    def _sparse_finite_product(feat_mat, mapping):
         """Performs feature product on a scipy.sparse.csr_matrix containing
         finite exposures."""
         feat = [feat_mat.tocsc()]
         feat.extend([(feat_mat[:, i].multiply(feat_mat[:, j]))
-                     for i, j in self.mapper.values()])
+                     for i, j in mapping.values()])
         return sps.hstack(feat).tocsr()

-    def _sparse_infinite_product(self, feat_mat):
+    @staticmethod
+    def _sparse_infinite_product(feat_mat, n_intervals, n_output_features):
         """Performs feature product on a scipy.sparse.csr_matrix containing
         infinite exposures."""
+        global _cpp_preprocessor
         coo = feat_mat.tocoo()
         nnz = coo.nnz
-        new_nnz = self._n_output_features * nnz
+        new_nnz = n_output_features * nnz
         new_row = np.zeros((new_nnz,), dtype="uint64")
         new_col = np.zeros((new_nnz,), dtype="uint64")
         new_data = np.zeros((new_nnz,), dtype="float64")
-        self._preprocessor.sparse_features_product(
+        _cpp_preprocessor.sparse_features_product(
             coo.row.astype("uint64"), coo.col.astype("uint64"), coo.data,
             new_row, new_col, new_data)
         return sps.csr_matrix((new_data, (new_row, new_col)),
-                              shape=(self._n_intervals,
-                                     self._n_output_features))
+                              shape=(n_intervals, n_output_features))
+
+    @staticmethod
+    def _inject_cpp_object(n_features):
+        """Creates a global instance of the CPP preprocessor object.
+
+        WARNING: to be used only as a multiprocessing.Pool initializer.
+        In a multiprocessing context each process has its own namespace, so
+        using a global is not as bad as it seems. Still, proceed with caution.
+        """
+        global _cpp_preprocessor
+        _cpp_preprocessor = SparseLongitudinalFeaturesProduct(n_features)
+
+    def _reset(self):
+        """Resets the object to its initial construction state."""
+        self._set("_n_init_features", None)
+        self._set("_n_output_features", None)
+        self._set("_n_intervals", None)
+        self._set("_mapping", {})
+        self._set("_fitted", False)
+
+    @property
+    def mapping(self):
+        """Get the mapping between the feature products and column indexes.
+
+        Returns
+        -------
+        output : `dict`
+            The column index - feature mapping.
+        """
+        if not self._fitted:
+            raise ValueError(
+                "cannot get mapping if object has not been fitted.")
+        return deepcopy(self._mapping)
+
+    @property
+    def exposure_type(self):
+        return self._exposure_type
+
+    @exposure_type.setter
+    def exposure_type(self, value):
+        if value not in ['infinite', 'finite']:
+            raise ValueError("exposure_type should be either 'finite' or "
+                             "'infinite', not %s" % value)
+        self._set('_exposure_type', value)
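With the validation moved into the property setter above, a bad value is rejected on assignment as well as at construction, assuming tick's Base forwards attribute assignment to the property as plain object.__setattr__ does:

# Validation via the exposure_type setter (sketch).
from tick.preprocessing import LongitudinalFeaturesProduct

pp = LongitudinalFeaturesProduct("finite", n_jobs=1)
try:
    pp.exposure_type = "unbounded"  # routed through the setter
except ValueError as err:
    print(err)  # exposure_type should be either 'finite' or 'infinite'...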
diff --git a/tick/preprocessing/tests/longitudinal_features_lagger_test.py b/tick/preprocessing/tests/longitudinal_features_lagger_test.py
index 5aec583b7..215590dba 100644
--- a/tick/preprocessing/tests/longitudinal_features_lagger_test.py
+++ b/tick/preprocessing/tests/longitudinal_features_lagger_test.py
@@ -4,6 +4,9 @@
 from scipy.sparse import csr_matrix
 import unittest
 from tick.preprocessing import LongitudinalFeaturesLagger
+from tick.preprocessing.build.preprocessing import LongitudinalFeaturesLagger\
+    as _LongitudinalFeaturesLagger
+import pickle


 class Test(unittest.TestCase):
@@ -13,6 +16,7 @@ def setUp(self):
             np.array([[1, 1, 1], [0, 0, 1], [1, 1, 0]], dtype="float64")
         ]
         self.sparse_features = [csr_matrix(f) for f in self.features]
+        self.n_intervals, self.n_features = self.features[0].shape

         self.censoring = np.array([2, 3], dtype="uint64")

@@ -26,15 +30,45 @@ def setUp(self):
         self.n_lags = np.array([1, 2, 1], dtype="uint64")

     def test_dense_pre_convolution(self):
-        feat_prod, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags)\
+        lagged_feat, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags,
+                                                       n_jobs=1)\
             .fit_transform(self.features, censoring=self.censoring)
-        np.testing.assert_equal(feat_prod, self.expected_output)
+        np.testing.assert_equal(lagged_feat, self.expected_output)

     def test_sparse_pre_convolution(self):
-        feat_prod, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags)\
+        lagged_feat, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags,
+                                                       n_jobs=1)\
             .fit_transform(self.sparse_features, censoring=self.censoring)
-        feat_prod = [f.todense() for f in feat_prod]
-        np.testing.assert_equal(feat_prod, self.expected_output)
+        lagged_feat = [f.todense() for f in lagged_feat]
+        np.testing.assert_equal(lagged_feat, self.expected_output)
+
+    def test_parallelization_sparse(self):
+        lagged_feat, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags,
+                                                       n_jobs=1) \
+            .fit_transform(self.sparse_features, censoring=self.censoring)
+        p_lagged_feat, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags,
+                                                         n_jobs=-1)\
+            .fit_transform(self.sparse_features, censoring=self.censoring)
+        lagged_feat = [f.toarray() for f in lagged_feat]
+        p_lagged_feat = [f.toarray() for f in p_lagged_feat]
+        np.testing.assert_equal(lagged_feat, p_lagged_feat)
+
+    def test_parallelization_dense(self):
+        lagged_feat, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags,
+                                                       n_jobs=1) \
+            .fit_transform(self.features, censoring=self.censoring)
+        p_lagged_feat, _, _ = LongitudinalFeaturesLagger(n_lags=self.n_lags,
+                                                         n_jobs=-1)\
+            .fit_transform(self.features, censoring=self.censoring)
+        np.testing.assert_equal(lagged_feat, p_lagged_feat)
+
+    def test_serialization(self):
+        python = LongitudinalFeaturesLagger(n_lags=self.n_lags, n_jobs=1)
+        cpp = _LongitudinalFeaturesLagger(self.n_intervals, self.n_lags)
+        pickle.loads(pickle.dumps(python))
+        pickle.loads(pickle.dumps(cpp))
+        # Cannot check equality, as the underlying C++ objects will not be
+        # recreated at the same memory address.


 if __name__ == "__main__":
diff --git a/tick/preprocessing/tests/longitudinal_features_product_test.py b/tick/preprocessing/tests/longitudinal_features_product_test.py
index 9cde68d88..216462e17 100644
--- a/tick/preprocessing/tests/longitudinal_features_product_test.py
+++ b/tick/preprocessing/tests/longitudinal_features_product_test.py
@@ -4,6 +4,9 @@
 from scipy.sparse import csr_matrix
 import unittest
 from tick.preprocessing import LongitudinalFeaturesProduct
+from tick.preprocessing.build.preprocessing \
+    import SparseLongitudinalFeaturesProduct
+import pickle


 class Test(unittest.TestCase):
@@ -24,6 +27,8 @@ def setUp(self):
             csr_matrix(f) for f in self.infinite_exposures
         ]

+        self.n_intervals, self.n_features = self.finite_exposures[0].shape
+
     def test_finite_features_product(self):
         expected_output = \
             [np.array([[0, 1, 0, 0, 0, 0],
@@ -35,7 +40,7 @@ def test_finite_features_product(self):
                        [1, 1, 0, 1, 0, 0],
                        ], dtype="float64")
              ]
-        pp = LongitudinalFeaturesProduct("finite")
+        pp = LongitudinalFeaturesProduct("finite", n_jobs=1)
         feat_prod, _, _ = pp.fit_transform(self.finite_exposures)
         np.testing.assert_equal(feat_prod, expected_output)

@@ -55,12 +60,37 @@ def test_sparse_infinite_features_product(self):
                        [0, 0, 0, 0, 0, 0],
                        ], dtype="float64")
              ]
-        sparse_feat = [csr_matrix(f) for f in self.infinite_exposures]
-        feat_prod, _, _ = LongitudinalFeaturesProduct("infinite")\
-            .fit_transform(sparse_feat)
+        feat_prod, _, _ = LongitudinalFeaturesProduct("infinite", n_jobs=1)\
+            .fit_transform(self.sparse_infinite_exposures)
         feat_prod = [f.toarray() for f in feat_prod]
         np.testing.assert_equal(feat_prod, expected_output)

+    def test_parallelization(self):
+        for exp_type in ['finite', 'infinite']:
+            feat_prod, _, _ = LongitudinalFeaturesProduct(exp_type, n_jobs=1)\
+                .fit_transform(self.sparse_infinite_exposures)
+            p_feat_prod, _, _ = LongitudinalFeaturesProduct(exp_type,
+                                                            n_jobs=-1)\
+                .fit_transform(self.sparse_infinite_exposures)
+            feat_prod = [f.toarray() for f in feat_prod]
+            p_feat_prod = [f.toarray() for f in p_feat_prod]
+            np.testing.assert_equal(feat_prod, p_feat_prod)
+
+        exp_type = 'finite'
+        feat_prod, _, _ = LongitudinalFeaturesProduct(exp_type, n_jobs=1) \
+            .fit_transform(self.finite_exposures)
+        p_feat_prod, _, _ = LongitudinalFeaturesProduct(exp_type, n_jobs=-1) \
+            .fit_transform(self.finite_exposures)
+        np.testing.assert_equal(feat_prod, p_feat_prod)
+
+    def test_serialization(self):
+        python = LongitudinalFeaturesProduct()
+        cpp = SparseLongitudinalFeaturesProduct(self.n_features)
+        pickle.loads(pickle.dumps(python))
+        pickle.loads(pickle.dumps(cpp))
+        # Cannot check equality, as the underlying C++ objects will not be
+        # recreated at the same memory address.
+

 if __name__ == "__main__":
     unittest.main()
diff --git a/tick/survival/convolutional_sccs.py b/tick/survival/convolutional_sccs.py
index cf223a9cf..f680b658c 100644
--- a/tick/survival/convolutional_sccs.py
+++ b/tick/survival/convolutional_sccs.py
@@ -355,7 +355,8 @@ def fit_kfold_cv(self, features, labels, censoring, C_tv_range: tuple = (),
             features, labels, censoring)
         # split the data with stratified KFold
         kf = StratifiedKFold(n_folds, shuffle, self.random_state)
-        labels_interval = np.nonzero(p_labels)[1]
+        # pick the first event interval of each sample for stratification
+        labels_interval = [np.nonzero(arr)[0][0] for arr in p_labels]

         # Training loop
         model_global_parameters = {
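The replaced line assumed p_labels behaved like a single 2D array, so np.nonzero(p_labels)[1] gave the event column of every sample at once; with a list of per-sample arrays, the comprehension extracts the first nonzero interval of each sample explicitly. A small sketch of what the new line computes, assuming one event per sample:

# What the per-sample comprehension yields (sketch).
import numpy as np

p_labels = [np.array([0, 1, 0]), np.array([0, 0, 1])]
labels_interval = [np.nonzero(arr)[0][0] for arr in p_labels]
# labels_interval == [1, 2]: the first event interval of each sample,
# usable as the stratification target for StratifiedKFold.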