Distributed Scikit-Learn for CPU-bound Problems ¶
This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem. We'll fit a large model (a grid search over many hyper-parameters) on a small dataset.
This video demonstrates the same example on a larger cluster.
from IPython.display import HTML
HTML("""<iframe width="560" height="315" src="https://www.youtube.com/embed/5Zf6DQaf7jk" frameborder="0" allow="autoplay; encrypted-media" allowfullscreen> </iframe>""")
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')
client
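The client above starts a small, single-machine cluster so this example is self-contained. To run against a real multi-machine cluster, you would instead point the Client at your scheduler; the address below is a hypothetical placeholder.
from dask.distributed import Client

# Connect to an existing scheduler rather than starting a local cluster.
# The address is a placeholder; 8786 is Dask's default scheduler port.
# client = Client("tcp://scheduler-address:8786")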
Distributed Training ¶
Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.
Dask registers a joblib backend. This lets you train those estimators using all the cores of your cluster, by changing one line of code.
This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For small datasets, training times will typically be short enough that cluster-wide parallelism isn't helpful. For datasets larger than a single machine's memory, the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways of working with larger-than-memory datasets).
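As a minimal sketch of that one-line change, the fit simply moves inside a parallel_backend('dask') context. The estimator and data below are hypothetical stand-ins, the sketch imports joblib directly (older scikit-learn versions vendor it as sklearn.externals.joblib, which this notebook uses later), and it assumes the Dask backend is available to joblib, which the dask_ml.joblib import in the next cell (or newer versions of joblib and distributed) takes care of.
import joblib
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# Hypothetical stand-in data and estimator; any joblib-parallel estimator works.
X_demo, y_demo = make_classification(n_samples=1000, random_state=0)
forest = RandomForestClassifier(n_estimators=200, n_jobs=-1)

# The one-line change: joblib's parallel tasks now run on the Dask cluster.
with joblib.parallel_backend('dask'):
    forest.fit(X_demo, y_demo)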
import dask_ml.joblib # register the distributed backend
from pprint import pprint
from time import time
import logging
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
# Scale Up: set categories=None to use all the categories
categories = [
'alt.atheism',
'talk.religion.misc',
]
print("Loading 20 newsgroups dataset for categories:")
print(categories)
data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
We'll define a small pipeline that combines text feature extraction with a simple classifier.
pipeline = Pipeline([
('vect', CountVectorizer()),
('tfidf', TfidfTransformer()),
('clf', SGDClassifier(max_iter=1000)),
])
Grid search over some parameters.
parameters = {
'vect__max_df': (0.5, 0.75, 1.0),
#'vect__max_features': (None, 5000, 10000, 50000),
'vect__ngram_range': ((1, 1), (1, 2)), # unigrams or bigrams
#'tfidf__use_idf': (True, False),
#'tfidf__norm': ('l1', 'l2'),
# 'clf__alpha': (0.00001, 0.000001),
# 'clf__penalty': ('l2', 'elasticnet'),
#'clf__n_iter': (10, 50, 80),
}
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False, iid=False)
To fit this normally, we would write
grid_search.fit(data.data, data.target)
That would use the default joblib backend (multiple processes) for parallelism.
To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a parallel_backend context.
from sklearn.externals import joblib
with joblib.parallel_backend('dask', scatter=[data.data, data.target]):
grid_search.fit(data.data, data.target)
If you had your cluster's dashboard open during that fit, you would have noticed that each worker performs some of the fit tasks.
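If you're not sure where that dashboard lives, the client can tell you; the exact URL depends on how your cluster was started.
# Address of the scheduler's diagnostic dashboard
client.dashboard_link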
Parallel, Distributed Prediction ¶
Sometimes you'll train on a small dataset, but need to predict for a much larger batch of data.
In this case, you'd like your estimator to handle NumPy arrays and pandas DataFrames for training, and dask arrays or DataFrames for prediction.
dask_ml.wrappers.ParallelPostFit provides exactly that. It's a meta-estimator. It does nothing during training; the underlying estimator (probably a scikit-learn estimator) will be fit in memory on a single machine. But tasks like predict and score are parallelized and distributed.
Most of the time, using ParallelPostFit is as simple as wrapping the original estimator. When used inside a GridSearch, you'll need to update the keys of the parameters, just like with any meta-estimator.
The only complication comes when using ParallelPostFit with another meta-estimator like GridSearchCV. In this case, you'll need to prefix your parameter names with estimator__.
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit
We'll load the small NumPy arrays for training.
X, y = load_digits(return_X_y=True)
X.shape
svc = ParallelPostFit(SVC(random_state=0))
param_grid = {
# use estimator__param instead of param
'estimator__C': [0.01, 1.0, 10],
}
grid_search = GridSearchCV(svc, param_grid, iid=False)
And fit as usual.
grid_search.fit(X, y)
We'll simulate a large dask array by replicating the training data a few times. In reality, you would load this from your file system.
import dask.array as da
big_X = da.concatenate([
da.from_array(X, chunks=X.shape)
for _ in range(10)
])
big_X
Operations like predict or predict_proba return dask arrays rather than NumPy arrays. When you compute, the work will be done in parallel, out of core or distributed on the cluster.
predicted = grid_search.predict(big_X)
predicted
At this point, predicted could be written to disk or aggregated before being returned to the client.
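For example, a small aggregate can be computed on the cluster so only the result travels back to the client, or the predictions can be written out chunk-by-chunk. This is a sketch; the output directory is a hypothetical placeholder.
import dask.array as da

# Aggregate on the cluster: how many times was each digit predicted?
counts = da.bincount(predicted, minlength=10).compute()
counts

# Or write the predictions to disk, one .npy file per chunk
# (hypothetical output directory):
# da.to_npy_stack('predicted-digits/', predicted)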