Machine Learning ¶
This is a high-level overview demonstrating some of the components of Dask-ML. For more details on each individual component, check out the links at the end of this notebook.
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')
client
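The client object also exposes a link to the diagnostic dashboard, which is handy for watching tasks as they run (this assumes a reasonably recent version of distributed):
client.dashboard_link  # URL of the Dask dashboard for this client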
Distributed Training ¶
Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.
Dask registers a joblib backend. This lets you train those estimators using all the cores of your cluster, by changing one line of code.
This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For small datasets, training times are typically short enough that cluster-wide parallelism isn't helpful. For datasets too large to fit in a single machine's memory, the scikit-learn estimators may not be able to cope (see below).
import dask_ml.joblib  # register the distributed joblib backend
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd
We'll use scikit-learn to create a pair of small random arrays, one for the features X, and one for the target y.
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]
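As a quick optional check (not part of the original workflow), we can look at the class balance of the target with pandas, which we imported above:
pd.Series(y).value_counts()  # counts of each class label in the random target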
We'll fit a Support Vector Classifier, using grid search over the $C$, kernel, and shrinking hyperparameters to find the best combination.
param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
"kernel": ['rbf', 'poly', 'sigmoid'],
"shrinking": [True, False]}
grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           iid=True,
                           n_jobs=-1)
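The grid above contains 8 × 3 × 2 = 48 hyper-parameter combinations. As a quick sanity check, scikit-learn's ParameterGrid can count them for us:
from sklearn.model_selection import ParameterGrid
len(ParameterGrid(param_grid))  # 8 C values * 3 kernels * 2 shrinking settings = 48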
To fit that normally, we'd call grid_search.fit(X, y).
To fit it using the cluster, we just need to use a context manager provided by joblib. We'll pre-scatter the data to each worker, which can help with performance.
from sklearn.externals import joblib
with joblib.parallel_backend('dask', scatter=[X, y]):
    grid_search.fit(X, y)
We fit 48 different models, one for each hyper-parameter combination in param_grid, distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.
pd.DataFrame(grid_search.cv_results_).head()
grid_search.predict(X)[:5]
grid_search.score(X, y)
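The best hyper-parameter combination found by the search, and its cross-validated score, are also available on the fitted object:
grid_search.best_params_, grid_search.best_score_  # winning combination and its CV score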
For more on training scikit-learn models with distributed joblib, see the dask-ml documentation.
Training on Large Datasets ¶
Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.
All of the algorithms implemented in Dask-ML work well on larger-than-memory datasets, which you might store in a dask array or dataframe.
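For example, a larger-than-memory array can be built lazily out of many smaller chunks (the name and sizes below are just illustrative, not part of this example):
import dask.array as da
X_large = da.random.random((100_000_000, 20), chunks=(1_000_000, 20))  # lazy; nothing is computed yet
X_large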
%matplotlib inline
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt
In this example, we'll use dask_ml.datasets.make_blobs to generate some random dask arrays.
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X
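Since the array is now persisted in distributed memory, we can inspect its chunk structure and total size:
X.chunks, X.nbytes  # chunk layout and total size in bytes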
We'll use the k-means implemented in Dask-ML to cluster the points. It uses the k-means|| (read: "k-means parallel") initialization algorithm, which scales better than k-means++. All of the computation, both during and after initialization, can be done in parallel.
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)
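The fitted estimator holds the cluster centers as a small NumPy array (and the per-point assignments as the dask array km.labels_, which we use below):
km.cluster_centers_  # one row per cluster center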
We'll plot a sample of points, colored by the cluster each falls into.
fig, ax = plt.subplots()
ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],
           cmap='viridis', alpha=0.25);
For all the estimators implemented in Dask-ML, see the API documentation.
This was a high-level overview. More details are available for particular topics: