Distributed Scikit-Learn for CPU-Bound Problems

This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem. We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.

This video demonstrates the same example on a larger cluster.

In [1]:
from IPython.display import HTML

HTML("""<iframe width="560" height="315" src="https://www.youtube.com/embed/5Zf6DQaf7jk" frameborder="0" allow="autoplay; encrypted-media" allowfullscreen> </iframe>""")
Out[1]:
In [2]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')
client
Out[2]:

Client

Cluster

  • Workers: 1
  • Cores: 4
  • Memory: 2.00 GB

Distributed Training

Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.

Dask registers a joblib backend. This lets you train those estimators using all the cores of your cluster by changing one line of code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For datasets that are too small, training times will typically be short enough that cluster-wide parallelism isn't helpful. For datasets that are too large to fit in a single machine's memory, the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways of working with larger-than-memory datasets).
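As a sketch of what that one-line change looks like (not one of this notebook's cells; it assumes the Dask joblib backend has been registered, as in the next cell, and that the client above is running, and uses a made-up random-forest fit purely for illustration):

# A minimal sketch, not executed in this notebook: the same fit, first with
# joblib's default local backend, then distributed on the Dask cluster.
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.externals import joblib  # same joblib import used later in this notebook

X, y = make_classification(n_samples=1000, n_features=20, random_state=0)
clf = RandomForestClassifier(n_estimators=200, n_jobs=-1)

clf.fit(X, y)                          # parallel across the local cores

with joblib.parallel_backend('dask'):  # parallel across the cluster's workers
    clf.fit(X, y)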

In [3]:
import dask_ml.joblib  # register the distributed backend

from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
In [4]:
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
Downloading 20news dataset. This may take a few minutes.
INFO:sklearn.datasets.twenty_newsgroups:Downloading 20news dataset. This may take a few minutes.
Downloading dataset from https://ndownloader.figshare.com/files/5975967 (14 MB)
INFO:sklearn.datasets.twenty_newsgroups:Downloading dataset from https://ndownloader.figshare.com/files/5975967 (14 MB)
Loading 20 newsgroups dataset for categories:
['alt.atheism', 'talk.religion.misc']
857 documents
2 categories

We'll define a small pipeline that combines text feature extraction with a simple classifier.

In [5]:
pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])

Grid search over some parameters.

In [6]:
parameters = {
    'vect__max_df': (0.5, 0.75, 1.0),
    #'vect__max_features': (None, 5000, 10000, 50000),
    'vect__ngram_range': ((1, 1), (1, 2)),  # unigrams or bigrams
    #'tfidf__use_idf': (True, False),
    #'tfidf__norm': ('l1', 'l2'),
    # 'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    #'clf__n_iter': (10, 50, 80),
}
In [7]:
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False, iid=False)

To fit this normally, we would write

grid_search.fit(data.data, data.target)

That would use the default joblib backend (multiple processes) for parallelism. To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a parallel_backend context.

In [8]:
from sklearn.externals import joblib
In [9]:
with joblib.parallel_backend('dask', scatter=[data.data, data.target]):
    grid_search.fit(data.data, data.target)
Fitting 3 folds for each of 6 candidates, totalling 18 fits
[Parallel(n_jobs=-1)]: Done  18 out of  18 | elapsed:   31.2s finished

If you had the distributed dashboard open during that fit, you would have noticed that each worker performed some of the fit tasks.

Parallel, Distributed Prediction

Sometimes you train on a small dataset but need to predict for a much larger batch of data. In this case, you'd like your estimator to handle NumPy arrays and pandas DataFrames for training, and Dask arrays or DataFrames for prediction. dask_ml.wrappers.ParallelPostFit provides exactly that. It's a meta-estimator that does nothing during training; the underlying estimator (typically a scikit-learn estimator) is fit in memory on a single machine. But tasks like predict , score , etc. are parallelized and distributed.

Most of the time, using ParallelPostFit is as simple as wrapping the original estimator. The only complication comes when it is used with another meta-estimator like GridSearchCV : just like with any meta-estimator, you'll need to prefix your parameter names with estimator__ .
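As a minimal sketch of the plain (non-grid-search) case, with hypothetical toy data rather than the digits data used below, wrapping looks like this:

# A minimal sketch with made-up data: fit in memory, predict on a Dask array.
import dask.array as da
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from dask_ml.wrappers import ParallelPostFit

X_small, y_small = make_classification(n_samples=500, random_state=0)

clf = ParallelPostFit(LogisticRegression())
clf.fit(X_small, y_small)                    # ordinary in-memory fit

X_big = da.from_array(X_small, chunks=100)   # stand-in for a genuinely large array
clf.predict(X_big)                           # lazy dask array, one block per chunk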

In [10]:
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit

We'll load the small NumPy arrays for training.

In [11]:
X, y = load_digits(return_X_y=True)
X.shape
Out[11]:
(1797, 64)
In [12]:
svc = ParallelPostFit(SVC(random_state=0))

param_grid = {
    # use estimator__param instead of param
    'estimator__C': [0.01, 1.0, 10],
}

grid_search = GridSearchCV(svc, param_grid, iid=False)

And fit as usual.

In [13]:
grid_search.fit(X, y)
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.394586
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.408134
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.398578
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.447461
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.454904
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.432924
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.428401
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.427184
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.429633
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Starting fit
INFO:dask_ml.wrappers:Finished fit in 0:00:00.962914
Out[13]:
GridSearchCV(cv=None, error_score='raise',
       estimator=ParallelPostFit(estimator=SVC(C=1.0, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='auto', kernel='rbf',
  max_iter=-1, probability=False, random_state=0, shrinking=True,
  tol=0.001, verbose=False),
        scoring=None),
       fit_params=None, iid=False, n_jobs=1,
       param_grid={'estimator__C': [0.01, 1.0, 10]},
       pre_dispatch='2*n_jobs', refit=True, return_train_score='warn',
       scoring=None, verbose=0)

We'll simulate a large dask array by replicating the training data a few times. In reality, you would load this from your file system.

In [14]:
import dask.array as da
In [15]:
big_X = da.concatenate([
    da.from_array(X, chunks=X.shape)
    for _ in range(10)
])
big_X
Out[15]:
dask.array<concatenate, shape=(17970, 64), dtype=float64, chunksize=(1797, 64)>

Operations like predict or predict_proba return Dask arrays rather than NumPy arrays. When you compute the result, the work will be done in parallel, out of core or distributed on the cluster.

In [16]:
predicted = grid_search.predict(big_X)
predicted
Out[16]:
dask.array<predict, shape=(17970,), dtype=int64, chunksize=(1797,)>

At this point predicted could be written to disk, or aggregated before returning to the client.
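For example (a sketch only; the output directory is hypothetical), you might compute a small aggregate on the cluster or write the chunks straight to disk:

# Count predictions per digit class; only the small result returns to the client.
import dask.array as da

class_counts = da.bincount(predicted, minlength=10).compute()

# Or write the full prediction array to disk, one .npy file per chunk
# (hypothetical output directory).
# da.to_npy_stack('predictions/', predicted)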

