Pipelines

Posted by Kendra Frederick on Thu 27 December 2018

Building flexible custom pipelines in scikit-learn

Motivation

Some datasets are simple: they consist entirely of numeric features that all need to be scaled. Or maybe there are a few categorical columns and your model doesn't require scaling of the numerical features. Rejoice, for your preprocessing will be easy.

But what if you have a diverse dataset, your preprocessing needs are more complex, or you want to preprocess your training and test data with a single line of code? This calls for scikit-learn's many preprocessing and pipeline functions. This blog post examines how (and when) to use Pipeline (or make_pipeline), ColumnTransformer, FunctionTransformer, and FeatureUnion to create custom transformation pipelines.

The Data

The dataset that motivated this deep dive into the flexibility (and potential for confusion) of scikit-learn's customizable pipelines contained the following types of data:

  • Numerical
  • Categorical - two flavors:
    • value could be 1 of several (ex: 'acid', or 'base')
    • value could be 1 or more of several (ex: 'protein, nucleic acid', or 'tissue')
  • Boolean
    • 1 = True, 0 = False
  • Engineered Features
    • mathematical combinations of two or more other columns

In addition, I wanted to test out Logistic Regression, tree-based models (Gradient Boosting Decision Trees), and CatBoost. Each required different preprocessing, but they shared some transformers, so I opted to build up my pipelines using a "unit" approach. This made assembling the final pipelines straightforward and flexible.

The Players

Pipeline

    from sklearn.pipeline import Pipeline, make_pipeline

Arguably the most commonly used and most familiar to data scientists, Pipeline processes data sequentially according to the defined steps, passing the output of each step into the next. The final step can be, and often is, an estimator (i.e. a model to fit to your data). The make_pipeline function offers a less verbose way of creating a pipeline.

    pipe = Pipeline([('scale', StandardScaler()), ('model', LogisticRegression())])
    same_pipe = make_pipeline(StandardScaler(), LogisticRegression())
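
The two are equivalent; make_pipeline simply auto-generates the step names (the lowercased class names), which you can confirm via named_steps:

    pipe.named_steps['scale']                 # the StandardScaler instance
    same_pipe.named_steps['standardscaler']   # same step, named automatically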

ColumnTransformer

from sklearn.compose import ColumnTransformer

A powerful ally in the quest to create a custom pipeline is ColumnTransformer. You feed it a list of transformers, each of which is a tuple of a user-defined name, the transformer itself, and the columns (names or indices) to transform. Below is an example:

preprocessor = ColumnTransformer(transformers=[
    ('scaler', StandardScaler(), [2,4,6,8]),
    ('encoder', OneHotEncoder(), [1,3,5])
    ], remainder='passthrough')

This provides the flexibility to apply transformers to a subset of columns in your dataframe. You can specify whether to pass through or drop the remaining columns. If you only need to transform some of your columns, using ColumnTransformer with remainder='passthrough' is an efficient way to do so, as shown in the example above.
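
To make the column ordering concrete, here is a toy example (not part of the post's dataset): the transformed columns come out first, in the order the transformers are listed, with the passed-through columns appended at the end.

import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder

toy = pd.DataFrame({'color': ['red', 'blue'], 'height': [1.0, 3.0], 'id': [7, 8]})
toy_pre = ColumnTransformer(transformers=[
    ('scale', StandardScaler(), ['height']),
    ('encode', OneHotEncoder(), ['color'])
    ], remainder='passthrough')
toy_pre.fit_transform(toy)
# columns: scaled height, one-hot color ('blue', 'red'), then the untouched id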

FeatureUnion

from sklearn.pipeline import FeatureUnion

FeatureUnion takes a list of transformers as an argument (technically, tuples of the format (<name>, <transformer>)), applies each of them separately to the data, and concatenates the outputs together. Using the (default) remainder='drop' option of ColumnTransformer coupled with FeatureUnion is akin to the split-apply-combine paradigm of groupby & apply/aggregate. This is useful when you want to define transformations piece by piece and formulate different combinations of them depending on model requirements.

preprocessor = FeatureUnion([
        ('some_cols', custom_transformer_01),
        ('other_cols', custom_transformer_02)
        ])
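
To make that concrete, here is a hedged sketch reusing the toy dataframe from the ColumnTransformer example above: each ColumnTransformer keeps only "its" columns (remainder='drop' is the default), and FeatureUnion stitches the two outputs back together side by side.

scale_part = ColumnTransformer([('num', StandardScaler(), ['height'])])
encode_part = ColumnTransformer([('cat', OneHotEncoder(), ['color'])])

both = FeatureUnion([('scaled', scale_part), ('encoded', encode_part)])
both.fit_transform(toy)   # the scaled height column, then the one-hot color columns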

Custom functions and classes

from sklearn.preprocessing import FunctionTransformer

FunctionTransformer allows us to use an "arbitrary callable", such as a simple function, as a preprocessing transformer. This is useful for stateless transformations, like taking the log of something.

def double(X):
    return X * 2

doubler_pipe = Pipeline([
    ('doubler', FunctionTransformer(double, validate=False))
])
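
Applied to a small array, the transform simply calls the function:

import numpy as np

doubler_pipe.fit_transform(np.array([[1, 2], [3, 4]]))
# array([[2, 4],
#        [6, 8]])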

Building the Pipeline Pieces

Now that we've introduced the scikit-learn functions, let's return to our data and begin to construct a pipeline.

# other functions and packages we'll need to load
from sklearn.preprocessing import StandardScaler, OneHotEncoder 
from sklearn.feature_extraction.text import CountVectorizer

Categorical Variables with >1 possible value

For the categorical variables that can take on more than one value, I use CountVectorizer. (See this blog post for details on how it is used as a preprocessor for this data.) One column, target_classes, first needed to have missing values imputed. I create a custom class for this, which returns only the column of interest (this is analogous to setting remainder='drop').

# custom imputation class
class Imputer(object):
    '''Fills in missing (na) values in 'col_name' column with 'imp_val' '''
    def __init__(self, col_name, imp_val=''):
        self.col_name = col_name
        self.imp_val = imp_val

    def fit(self, X, y=None):
        return self

    def transform(self, X, *args):
        return X[self.col_name].fillna(value=self.imp_val)

# define transformer: CountVectorizer with a custom token_pattern
target_transformer = CountVectorizer(token_pattern=r'([\w*-]{1,}),*')

# define preprocessor 
preproc_target = Pipeline([
        ('impute', Imputer('target_classes')),
        ('target_cv', target_transformer)
])
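
To see what this little pipeline does, here is a hedged sketch with made-up values (the real target_classes column comes from the training set): the Imputer fills the missing value and hands a single Series to CountVectorizer, which produces one count column per token it finds.

import numpy as np
import pandas as pd

demo = pd.DataFrame({'target_classes': ['kinase,protease', 'kinase', np.nan]})
preproc_target.fit_transform(demo).toarray()
# columns are ['kinase', 'protease']; the imputed (blank) row is all zeros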

The other column, assay_types, can be count-vectorized directly with a custom token pattern. Note that because CountVectorizer expects 1-D input (an iterable of strings), the column must be passed to ColumnTransformer as a scalar index, not a list.

# define the column indices to transform
asy_cls_col = list(X_train.columns).index('assay_types')

# define transformer: CountVectorizer with a custom token_pattern
assay_transformer = CountVectorizer(lowercase=False, token_pattern=r'\[*(\w{1}),*\]*')

# define preprocessor
preproc_assay = ColumnTransformer(transformers=[
            ('assay_cv', assay_transformer, asy_cls_col)
])

Next, I sew these two steps together into one preprocessor (named 'cv' for CountVectorizer) using Pipeline and FeatureUnion. Recall that Pipeline processes data sequentially, passing the output of each step into the next, while FeatureUnion applies its transformations separately and concatenates the results.

Note that the output of CountVectorizer is a sparse (CSR) matrix; some of the models I plan to evaluate require a dense matrix, so I convert the output to an ndarray using a user-defined function, densify, and FunctionTransformer.

def densify(X):
    return X.toarray()

preproc_cv = Pipeline([
    ('cv', FeatureUnion([
        ('assays', preproc_assay),
        ('targets', preproc_target)
        ])),
    ('densify', FunctionTransformer(densify, validate=False))
])
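
A quick check (assuming X_train is already loaded, as elsewhere in this post) confirms the union produces a single dense array, with the assay-class columns first and the target-class columns after them:

Xt_cv = preproc_cv.fit_transform(X_train)
print(type(Xt_cv), Xt_cv.shape)   # <class 'numpy.ndarray'> (n_samples, n_assay + n_target features)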

Categorical variables with only 1 possible value

More typically, categorical variables take on exactly one value out of a set of possibilities, and we can use scikit-learn's OneHotEncoder to encode this information. It's worth noting that CatBoost, a newer algorithm, does not require such variables to be one-hot encoded before the data are fed into the model.

I perform this preprocessing step using ColumnTransformer so I can specify which columns to encode. Note that, in contrast to CountVectorizer above, OneHotEncoder expects 2-D input, so the column indices must be specified as a list.

# define categorical columns to encode & make a list of their indices
cols_to_encode = ['molecular_species']
cols_to_encode_idx = [list(X_train.columns).index(x) for x in cols_to_encode]

# define preprocessor
preproc_ohe = ColumnTransformer(transformers=[
    ('cat', OneHotEncoder(), cols_to_encode_idx)])

Numeric features

Logistic Regression requires (sometimes -- see Footnote 1) numerical data to be scaled. I again use ColumnTransformer to specify which columns to transform with StandardScaler.

# define columns to scale & make a list of their indices.
cols_to_scale = ['mw_freebase', 'alogp', 'acd_logp', 'acd_logd', 'hba', 'hbd', 'psa', 'rtb', 
       'num_ro5_violations', 'aromatic_rings', 'heavy_atoms', 'hba_lipinski', 'hbd_lipinski',  
       'num_target_organisms', 'num_alerts_set1']
cols_to_scale_idx = [list(X_train.columns).index(x) for x in cols_to_scale]

# apply preprocessor
preproc_scale = ColumnTransformer(transformers=[
        ('num', StandardScaler(), cols_to_scale_idx), 
    ], remainder='drop')    

Features to leave un-touched

These features are already Boolean-encoded, where 1 = True and 0 = False. They will need to be identified as categorical features for CatBoost. I again use ColumnTransformer, and instead of specifying a transformer I use the special string 'passthrough' to indicate that these columns should be passed through unchanged.

# define columns and a list of their indices
cols_to_pass = ['ro3_pass', 'research_co', 'human_target']
cols_to_pass_idx = [list(X_train.columns).index(x) for x in cols_to_pass]

# define preprocessor
preproc_pass = ColumnTransformer(transformers=[
        ('as_is', 'passthrough', cols_to_pass_idx), 
])

# for tree-based models, also want to pass through numeric columns
preproc_pass_num = ColumnTransformer(transformers=[
        ('as_is', 'passthrough', cols_to_scale_idx), 
])

Engineered Features

While creating engineered features based on mathematical combinations of other columns could be done directly on the dataframe, I am in the habit of splitting off my test set immediately, and the decision to engineer features often comes after EDA (exploratory data analysis). Thus, I wanted to include feature engineering in my pipeline, so my test set (and any new data) could be transformed as part of the same preprocessing pipeline.

I define custom classes to perform these transformations: an average and a ratio. The denominator in my ratio calculation could be 0, making the ratio undefined; I use fillna(0) to handle this. You'll notice that I essentially compute the average twice: once in the TargetActivityAvg transform, and again in the AssayRatio transform. Try as I might, I couldn't contrive a pipeline that would pass both the calculated average and the existing dataframe column num_assays needed for the ratio calculation to a single transformer. As the average calculation is simple, I repeat it; a smarter person can probably figure out a way not to.

Note that these transformers take the entire dataframe as input and return a single pandas Series as output. This needs to be reshaped into a 2-D numpy array before being combined with the rest of the transformed features, hence the simple reshaper function and FunctionTransformer.

# define custom classes & function
class TargetActivityAvg(object):
    '''Returns average of num_activities & num_targets columns.'''
    def __init__(self, *args):
        self.args = args

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return (X['num_activities'] + X['num_targets']) / 2


class AssayRatio(object):
    '''Returns ratio of num_assays / avg(num_targets & num_activities).'''
    def __init__(self, *args):
        self.args = args

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        avg = (X['num_activities'] + X['num_targets']) / 2
        ratio =  X['num_assays'] / avg
        ratio = ratio.fillna(0)
        return ratio

def reshaper(X):
    return X.values.reshape(-1,1)

# define feature engineering preprocessors
feat_eng1 = Pipeline([
    ('avg', TargetActivityAvg()),
    ('reshape', FunctionTransformer(reshaper, validate=False))
])

feat_eng2 = Pipeline([
        ('ratio',  AssayRatio()),
        ('reshape', FunctionTransformer(reshaper, validate=False)),
        ])

# combine them into one preprocessor
preproc_feat_eng = Pipeline([
    ('feat_eng', FeatureUnion([
        ('avg', feat_eng1),
        ('ratio', feat_eng2)
    ]))
])

# need to scale these engineered features for Logistic Regression
preproc_feat_eng_scaled = Pipeline([
    ('feat_eng', FeatureUnion([
        ('avg', feat_eng1),
        ('ratio', feat_eng2)
    ])),
    ('scale', StandardScaler())
])
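
Since each engineered feature is reshaped to a single column, the union yields a two-column array; a quick hedged check (again assuming X_train is loaded):

preproc_feat_eng.fit_transform(X_train).shape   # (n_samples, 2): the average, then the ratio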

Putting them all together

The final step is to combine the various preprocessors into a single pipeline. Because the transformer units were defined separately, they can be combined in different ways, depending on the downstream application (model).

Scaling: This pipeline scales the numerical features, e.g. for Logistic Regression.

pipe_with_scale = Pipeline([
    ('all', FeatureUnion([
        ('cvs', preproc_cv),
        ('feat_eng', preproc_feat_eng_scaled),
        ('ohe', preproc_ohe), 
        ('pass', preproc_pass),
        ('num', preproc_scale),
    ])
    )
])

Categorical only: This pipeline is for models that don't require scaling, e.g. Gradient Boosting Decision Trees.

pipe_cat_only = Pipeline([
    ('all', FeatureUnion([
        ('cvs', preproc_cv),
        ('feat_eng', preproc_feat_eng),
        ('ohe', preproc_ohe), 
        ('pass', preproc_pass),
        ('num', preproc_pass_num)
    ])
    )
])
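
For example, this preprocessor can be chained straight to a tree-based model -- a hedged sketch using scikit-learn's GradientBoostingClassifier (the exact model and settings are not from the original post):

from sklearn.ensemble import GradientBoostingClassifier

# the preprocessor and the model fit together as one estimator
gbdt_pipe = make_pipeline(pipe_cat_only, GradientBoostingClassifier())
gbdt_pipe.fit(X_train, y_train)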

CatBoost: As described above, CatBoost has different preprocessing needs (no one-hot encoding). The last bit of code below gives the column indices of the categorical features in the transformed output, which need to be provided to the CatBoost algorithm. Note that I had to determine these by hand -- I couldn't figure out a way to do it automatically.

# define columns that do not require any preprocessing
other_cb_features = ['mw_freebase', 'alogp', 'acd_logp', 'acd_logd', 'hba', 'hbd', 'psa', 'rtb', 'ro3_pass',
       'num_ro5_violations', 'molecular_species', 'aromatic_rings', 'heavy_atoms', 'qed_weighted',
       'hba_lipinski', 'hbd_lipinski', 'research_co', 'num_target_organisms',
        'human_target', 'num_alerts_set1']
other_cb_feat_idx = [list(X_train.columns).index(x) for x in other_cb_features]

# create a transformer to pass them through
preproc_pass_cb = ColumnTransformer(transformers=[
        ('as_is', 'passthrough', other_cb_feat_idx), 
])

# full pipeline
pipe_cat_boost = Pipeline([
    ('all', FeatureUnion([
        ('cvs', preproc_cv),
        ('feat_eng', preproc_feat_eng),
        ('pass', preproc_pass_cb)
    ])
    )
])

# where the "categorical" columns end up after the above pre-processing
cb_cat_feats = ['ro3_pass', 'molecular_species', 'research_co', 'human_target']
cb_cat_cols = [20, 22, 28, 30]
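
The original post stops at the indices; as a hedged sketch of how they might be used (assuming the catboost package is installed and y_train holds the class labels), the indices go to CatBoost's cat_features argument:

from catboost import CatBoostClassifier

# transform the training data, then point CatBoost at the categorical columns;
# CatBoost expects those columns to hold strings or integers, not floats
X_train_cb = pipe_cat_boost.fit_transform(X_train)
cb_model = CatBoostClassifier(cat_features=cb_cat_cols, verbose=0)
cb_model.fit(X_train_cb, y_train)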

Pipeline in action

Here is a brief example of how to use one of the preprocessors (pipe_with_scale) in combination with a model:

from sklearn.linear_model import LogisticRegression

log_reg_pipe = make_pipeline(pipe_with_scale, LogisticRegression())
log_reg_pipe.fit(X_train, y_train)
y_pred = log_reg_pipe.predict(X_test)
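
Because all of the preprocessing lives inside the pipeline, it can also be dropped straight into cross-validation (or a grid search): the transformers are re-fit on each training fold, so no information leaks from the held-out fold.

from sklearn.model_selection import cross_val_score

# 5-fold cross-validation of the full preprocess-plus-model pipeline
scores = cross_val_score(log_reg_pipe, X_train, y_train, cv=5)
print(scores.mean())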

Aside: Extracting feature names

Often, after fitting your model, you want to extract feature importances. Which features have a large impact on your model? Which have little to none (and thus could be dropped)? The code below keeps track of and extracts the final order of the features in the various pipelines.

preproc_assay.fit(X_train)
feat_cv_asy = preproc_assay.named_transformers_['assay_cv'].get_feature_names()

preproc_target.fit(X_train)
feat_cv_trg = preproc_target.named_steps['target_cv'].get_feature_names()

feat_cv_asy = ['assay_class_' + x for x in feat_cv_asy]
feat_cv_trg = ['target_class_' + x for x in feat_cv_trg]

preproc_ohe.fit(X_train)
feat_ohe = preproc_ohe.named_transformers_['cat'].get_feature_names()
feat_ohe = feat_ohe.tolist()

feat_num = cols_to_scale

feat_pass = cols_to_pass

feat_fe = ['avg_num_targ_act', 'ratio_assay_avg_targ_act']

# Feature names for pipe_cat_only & pipe_with_scale
feat_names = feat_cv_asy + feat_cv_trg + feat_fe + feat_ohe + feat_pass + feat_num

# for pipe_cat_boost:
feat_other_cb = other_cb_features
feat_names_cb = feat_cv_asy + feat_cv_trg + feat_fe + feat_other_cb
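
These names can then be matched up with the fitted model's coefficients or feature importances. A hedged sketch using the log_reg_pipe fit above (assuming a binary target, so coef_ has a single row):

# make_pipeline auto-named the model step 'logisticregression'
coefs = log_reg_pipe.named_steps['logisticregression'].coef_[0]
# ten features with the largest absolute coefficients
top10 = sorted(zip(feat_names, coefs), key=lambda t: abs(t[1]), reverse=True)[:10]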

Summary

In summary, we've created 3 different pipelines, built from the same "units", or base transformers. Creating small units separately and subsequently combining them in different ways allows for flexibility and efficiency. Defining custom transformer classes and functions expands on scikit-learn's built-in capabilities. ColumnTransformer, FeatureUnion, FunctionTransformer, and, of course, Pipeline are powerful tools for developing custom preprocessors.

Footnotes:

  1. Apparently, and I was unaware of this before I began writing this blog post, certain Logistic Regression solvers are robust to unscaled data, including the one I used ('liblinear'). See the summary table at https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression