Skip to content

Commit

Permalink
Parallelize longitudinal preprocessors
Browse files Browse the repository at this point in the history
  • Loading branch information
MaryanMorel committed May 29, 2018
1 parent 8b7046b commit 94e0ff2
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 179 deletions.
46 changes: 26 additions & 20 deletions lib/cpp/preprocessing/longitudinal_features_lagger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,38 @@

#include "tick/preprocessing/longitudinal_features_lagger.h"


LongitudinalFeaturesLagger::LongitudinalFeaturesLagger(
const SBaseArrayDouble2dPtrList1D &features,
const SArrayULongPtr n_lags)
: n_intervals(features[0]->n_rows()),
n_lags(n_lags),
n_samples(features.size()),
n_observations(n_samples * n_intervals),
n_features(features[0]->n_cols()),
n_lagged_features(n_lags->sum() + n_lags->size()) {
col_offset = ArrayULong(n_lags->size());
col_offset.init_to_zero();
if (n_features != n_lags->size()) {
TICK_ERROR("Features matrix column number should match n_lags length.");
}
if ((*n_lags)[0] >= n_intervals) {
TICK_ERROR("n_lags elements must be between 0 and (n_intervals - 1).");
}
ulong n_intervals,
SArrayULongPtr _n_lags)
: n_intervals(n_intervals),
n_lags(_n_lags),
n_features(_n_lags->size()),
n_lagged_features(_n_lags->size() + _n_lags->sum()) {
if (n_lags != nullptr) compute_col_offset(n_lags);
}

void LongitudinalFeaturesLagger::compute_col_offset(const SArrayULongPtr n_lags) {
ArrayULong col_offset_temp = ArrayULong(n_lags->size());
col_offset_temp.init_to_zero();
for (ulong i(1); i < n_lags->size(); i++) {
if ((*n_lags)[i] >= n_intervals) {
TICK_ERROR("n_lags elements must be between 0 and (n_intervals - 1).");
}
col_offset[i] = col_offset[i - 1] + (*n_lags)[i-1] + 1;
col_offset_temp[i] = col_offset_temp[i - 1] + (*n_lags)[i-1] + 1;
}
col_offset = col_offset_temp.as_sarray_ptr();
}

void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
ArrayDouble2d &out,
ulong censoring) const {
if (n_intervals != features.n_rows()) {
TICK_ERROR("Features matrix rows count should match n_intervals.");
}
if (n_features != features.n_cols()) {
TICK_ERROR("Features matrix column count should match n_lags length.");
}
if (out.n_cols() != n_lagged_features) {
TICK_ERROR(
"n_columns of &out should be equal to n_features + sum(n_lags).");
Expand All @@ -47,8 +51,9 @@ void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
n_cols_feature = (*n_lags)[feature] + 1;
for (ulong j = 0; j < n_intervals; j++) {
row = j;
col = col_offset[feature];
value = features(row, feature);
col = (*col_offset)[feature];
// use view_row instead of (row, feature) to be const
value = view_row(features, row)[feature];
max_col = col + n_cols_feature;
if (value != 0) {
while (row < censoring && col < max_col) {
Expand All @@ -68,14 +73,15 @@ void LongitudinalFeaturesLagger::sparse_lag_preprocessor(ArrayULong &row,
ArrayULong &out_col,
ArrayDouble &out_data,
ulong censoring) const {
// TODO: add checks here ? Or do them in Python ?
ulong j(0), r, c, offset, new_col, max_col;
double value;

for (ulong i = 0; i < data.size(); i++) {
value = data[i];
r = row[i];
c = col[i];
offset = col_offset[c];
offset = (*col_offset)[c];
max_col = offset + (*n_lags)[c] + 1;
new_col = offset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
#include "tick/preprocessing/sparse_longitudinal_features_product.h"
#include <map>

SparseLongitudinalFeaturesProduct::SparseLongitudinalFeaturesProduct(
const SBaseArrayDouble2dPtrList1D &features)
: n_features(features[0]->n_cols()) {}

ulong SparseLongitudinalFeaturesProduct::get_feature_product_col(
ulong col1, ulong col2, ulong n_cols) const {
if (col1 > col2) { // ensure we have the right order as the following formula
Expand Down
38 changes: 27 additions & 11 deletions lib/include/tick/preprocessing/longitudinal_features_lagger.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ class LongitudinalFeaturesLagger {
protected:
ulong n_intervals;
SArrayULongPtr n_lags;
ArrayULong col_offset;
ulong n_samples;
ulong n_observations;
ulong n_features;
ulong n_lagged_features;
SArrayULongPtr col_offset;

public:
LongitudinalFeaturesLagger(const SBaseArrayDouble2dPtrList1D &features,
const SArrayULongPtr n_lags);
// This exists solely for cereal/swig
LongitudinalFeaturesLagger() = default;

void dense_lag_preprocessor(ArrayDouble2d &features, ArrayDouble2d &out,
LongitudinalFeaturesLagger(ulong n_intervals,
SArrayULongPtr n_lags);

void compute_col_offset(SArrayULongPtr n_lags);

void dense_lag_preprocessor(ArrayDouble2d &features,
ArrayDouble2d &out,
ulong censoring) const;

void sparse_lag_preprocessor(ArrayULong &row, ArrayULong &col,
Expand All @@ -34,14 +38,26 @@ class LongitudinalFeaturesLagger {
ulong censoring) const;

template <class Archive>
void serialize(Archive &ar) {
void load(Archive &ar) {
ar(CEREAL_NVP(n_intervals));
ar(CEREAL_NVP(n_features));
ar(CEREAL_NVP(n_lagged_features));

Array<ulong> temp_n_lags, temp_col_offset;
ar(cereal::make_nvp("n_lags", temp_n_lags));

n_lags = temp_n_lags.as_sarray_ptr();
col_offset = temp_col_offset.as_sarray_ptr();
}


template <class Archive>
void save(Archive &ar) const {
ar(CEREAL_NVP(n_intervals));
ar(CEREAL_NVP(n_lags));
ar(CEREAL_NVP(col_offset));
ar(CEREAL_NVP(n_samples));
ar(CEREAL_NVP(n_observations));
ar(CEREAL_NVP(n_features));
ar(CEREAL_NVP(n_lagged_features));
ar(cereal::make_nvp("n_lags", *n_lags));
ar(cereal::make_nvp("col_offset", *col_offset));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ class SparseLongitudinalFeaturesProduct {
ulong n_features;

public:
// This exists soley for cereal/swig
SparseLongitudinalFeaturesProduct() = default;

explicit SparseLongitudinalFeaturesProduct(
const SBaseArrayDouble2dPtrList1D &features);
const ulong n_features): n_features(n_features) {}

inline ulong get_feature_product_col(ulong col1, ulong col2,
ulong n_cols) const;
Expand All @@ -28,7 +31,12 @@ class SparseLongitudinalFeaturesProduct {
ArrayDouble &out_data) const;

template <class Archive>
void serialize(Archive &ar) {
void load(Archive &ar) {
ar(CEREAL_NVP(n_features));
}

template <class Archive>
void save(Archive &ar) const {
ar(CEREAL_NVP(n_features));
}
};
Expand Down
19 changes: 10 additions & 9 deletions lib/swig/preprocessing/longitudinal_features_lagger.i
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,25 @@
#include "tick/preprocessing/longitudinal_features_lagger.h"
%}

%include serialization.i

class LongitudinalFeaturesLagger {

public:
LongitudinalFeaturesLagger(const SBaseArrayDouble2dPtrList1D &features,
const SArrayULongPtr n_lags);
// This exists soley for cereal/swig
LongitudinalFeaturesLagger();

LongitudinalFeaturesLagger(ulong n_intervals,
SArrayULongPtr n_lags);

void dense_lag_preprocessor(ArrayDouble2d &features,
ArrayDouble2d &out,
ulong censoring) const;

void sparse_lag_preprocessor(ArrayULong &row,
ArrayULong &col,
ArrayDouble &data,
ArrayULong &out_row,
ArrayULong &out_col,
ArrayDouble &out_data,
void sparse_lag_preprocessor(ArrayULong &row, ArrayULong &col,
ArrayDouble &data, ArrayULong &out_row,
ArrayULong &out_col, ArrayDouble &out_data,
ulong censoring) const;

};

TICK_MAKE_PICKLABLE(LongitudinalFeaturesLagger);
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
#include "tick/preprocessing/sparse_longitudinal_features_product.h"
%}

%include serialization.i

class SparseLongitudinalFeaturesProduct {

public:
SparseLongitudinalFeaturesProduct(const SBaseArrayDouble2dPtrList1D &features);
// This exists soley for cereal/swig
SparseLongitudinalFeaturesProduct();

SparseLongitudinalFeaturesProduct(const ulong n_features);

void sparse_features_product(ArrayULong &row,
ArrayULong &col,
Expand Down
10 changes: 8 additions & 2 deletions tick/preprocessing/base/longitudinal_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from abc import ABC, abstractmethod
from tick.base import Base
from multiprocessing import cpu_count


class LongitudinalPreprocessor(ABC, Base):
Expand All @@ -14,9 +15,14 @@ class LongitudinalPreprocessor(ABC, Base):
set to the number of cores.
"""

def __init__(self, n_jobs=-1):
_attrinfos = {'n_jobs': {'writable': True}}

def __init__(self, n_jobs=1):
Base.__init__(self)
self.n_jobs = n_jobs
if n_jobs == -1:
self.n_jobs = cpu_count()
else:
self.n_jobs = n_jobs

@abstractmethod
def fit(self, features, labels, censoring) -> None:
Expand Down
Loading

0 comments on commit 94e0ff2

Please sign in to comment.