Machine Learning

This is a high-level overview demonstrating some of the components of Dask-ML. For more details on each individual component, check out the links at the end of this notebook.

In [1]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')
client
Out[1]:

Client

Cluster

  • Workers: 1
  • Cores: 4
  • Memory: 2.00 GB

Distributed Training

Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.

Dask registers a joblib backend. This lets you train those estimators using all the cores of your cluster, by changing one line of code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For small datasets, training times are typically short enough that cluster-wide parallelism isn't helpful. For datasets larger than a single machine's memory, the scikit-learn estimators may not be able to cope (see below).
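
As a sketch of that one-line change (the same pattern is used in full later in this notebook): the only difference from plain scikit-learn is wrapping the call to fit in joblib's parallel_backend context manager. The RandomForestClassifier here is just an illustrative stand-in, the sketch imports the standalone joblib package rather than scikit-learn's bundled copy used below, and it assumes a distributed Client exists and the Dask backend is registered, as in the surrounding cells.

import joblib
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X_demo, y_demo = make_classification(n_samples=1000, random_state=0)
clf = RandomForestClassifier(n_estimators=200, n_jobs=-1)  # any estimator that accepts n_jobs

# Plain scikit-learn: joblib parallelizes over the local cores.
# clf.fit(X_demo, y_demo)

# With the Dask backend, the same call is scheduled on the cluster's workers.
with joblib.parallel_backend('dask'):
    clf.fit(X_demo, y_demo)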

In [2]:
import dask_ml.joblib  # register the distributed backend
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd

We'll use scikit-learn to create a pair of small random arrays, one for the features X, and one for the target y.

In [3]:
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]
Out[3]:
array([[-1.06377997,  0.67640868,  1.06935647, -0.21758002,  0.46021477,
        -0.39916689, -0.07918751,  1.20938491, -0.78531472, -0.17218611,
        -1.08535744, -0.99311895,  0.30693511,  0.06405769, -1.0542328 ,
        -0.52749607, -0.0741832 , -0.35562842,  1.05721416, -0.90259159],
       [ 0.0708476 , -1.69528125,  2.44944917, -0.5304942 , -0.93296221,
         2.86520354,  2.43572851, -1.61850016,  1.30071691,  0.34840246,
         0.54493439,  0.22532411,  0.60556322, -0.19210097, -0.06802699,
         0.9716812 , -1.79204799,  0.01708348, -0.37566904, -0.62323644],
       [ 0.94028404, -0.49214582,  0.67795602, -0.22775445,  1.40175261,
         1.23165333, -0.77746425,  0.01561602,  1.33171299,  1.08477266,
        -0.97805157, -0.05012039,  0.94838552, -0.17342825, -0.47767184,
         0.76089649,  1.00115812, -0.06946407,  1.35904607, -1.18958963],
       [-0.29951677,  0.75988955,  0.18280267, -1.55023271,  0.33821802,
         0.36324148, -2.10052547, -0.4380675 , -0.16639343, -0.34083531,
         0.42435643,  1.17872434,  2.8314804 ,  0.14241375, -0.20281911,
         2.40571546,  0.31330473,  0.40435568, -0.28754632, -2.8478034 ],
       [-2.63062675,  0.23103376,  0.04246253,  0.47885055,  1.54674163,
         1.6379556 , -1.53207229, -0.73444479,  0.46585484,  0.4738362 ,
         0.98981401, -1.06119392, -0.88887952,  1.23840892, -0.57282854,
        -1.27533949,  1.0030065 , -0.47712843,  0.09853558,  0.52780407]])

We'll fit a Support Vector Classifier, using grid search to find the best combination of the $C$, kernel, and shrinking hyper-parameters.

In [4]:
param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           iid=True,
                           n_jobs=-1)

To fit that normally, we'd call

grid_search.fit(X, y)

To fit it using the cluster, we just need to use a context manager provided by joblib. We'll pre-scatter the data to each worker, which can help with performance.

In [5]:
from sklearn.externals import joblib

with joblib.parallel_backend('dask', scatter=[X, y]):
    grid_search.fit(X, y)

We fit 48 different models, one for each hyper-parameter combination in param_grid, distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.
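
That count is just the size of the grid: 8 values of C × 3 kernels × 2 shrinking settings. As a quick sanity check (a sketch, not part of the original run), scikit-learn's ParameterGrid enumerates the same combinations:

from sklearn.model_selection import ParameterGrid

len(ParameterGrid(param_grid))  # 8 * 3 * 2 = 48 hyper-parameter combinations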

In [6]:
pd.DataFrame(grid_search.cv_results_).head()
Out[6]:
|   | mean_fit_time | std_fit_time | mean_score_time | std_score_time | param_C | param_kernel | param_shrinking | params | split0_test_score | split1_test_score | split2_test_score | mean_test_score | std_test_score | rank_test_score |
|---|---------------|--------------|-----------------|----------------|---------|--------------|-----------------|--------|-------------------|-------------------|-------------------|-----------------|----------------|-----------------|
| 0 | 0.328005 | 0.063314 | 0.027563 | 0.003866 | 0.001 | rbf | True | {'C': 0.001, 'kernel': 'rbf', 'shrinking': True} | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 41 |
| 1 | 0.350209 | 0.060638 | 0.018452 | 0.009711 | 0.001 | rbf | False | {'C': 0.001, 'kernel': 'rbf', 'shrinking': False} | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 41 |
| 2 | 0.193392 | 0.070645 | 0.024401 | 0.010793 | 0.001 | poly | True | {'C': 0.001, 'kernel': 'poly', 'shrinking': True} | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 41 |
| 3 | 0.240595 | 0.066333 | 0.026387 | 0.003415 | 0.001 | poly | False | {'C': 0.001, 'kernel': 'poly', 'shrinking': Fa... | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 41 |
| 4 | 0.416561 | 0.075020 | 0.044301 | 0.005469 | 0.001 | sigmoid | True | {'C': 0.001, 'kernel': 'sigmoid', 'shrinking':... | 0.502994 | 0.501502 | 0.501502 | 0.502 | 0.000704 | 41 |
In [7]:
grid_search.predict(X)[:5]
Out[7]:
array([0, 1, 1, 1, 0])
In [8]:
grid_search.score(X, y)
Out[8]:
0.972
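
Since the fitted GridSearchCV behaves like any other scikit-learn estimator, the usual attributes are also available. A sketch (outputs omitted):

grid_search.best_params_     # the winning combination from param_grid
grid_search.best_score_      # its mean cross-validated score
grid_search.best_estimator_  # the refit SVC, usable like any scikit-learn model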

For more on training scikit-learn models with distributed joblib, see the dask-ml documentation.

Training on Large Datasets

Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.

All of the algorithms implemented in Dask-ML work well on larger-than-memory datasets, which you might store in a dask array or dataframe.
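
For example, a larger-than-memory array can be built directly with dask.array. The sketch below (not part of the original notebook) creates a 10-million-row random array split into 1-million-row chunks, similar in shape to the make_blobs output used next:

import dask.array as da

# 10,000,000 x 2 array of random points, stored as ten 1,000,000-row chunks
X_big = da.random.random((10_000_000, 2), chunks=(1_000_000, 2))
X_big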

In [9]:
%matplotlib inline
In [10]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

In this example, we'll use dask_ml.datasets.make_blobs to generate some random dask arrays.

In [11]:
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X
Out[11]:
dask.array<concatenate, shape=(10000000, 2), dtype=float64, chunksize=(1000000, 2)>

We'll use the k-means implementation in Dask-ML to cluster the points. It uses the k-means|| (read: "k-means parallel") initialization algorithm, which scales better than k-means++. All of the computation, both during and after initialization, can be done in parallel.

In [12]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)
INFO:root:Starting _check_array
INFO:root:Finished _check_array in 0:00:02.673101
INFO:root:Starting init_scalable
INFO:dask_ml.cluster.k_means:Initializing with k-means||
INFO:dask_ml.cluster.k_means:Starting init iteration  1/ 2 ,  1 centers
INFO:dask_ml.cluster.k_means:Finished init iteration  1/ 2 ,  1 centers in 0:00:01.524919
INFO:dask_ml.cluster.k_means:Starting init iteration  2/ 2 ,  9 centers
INFO:dask_ml.cluster.k_means:Finished init iteration  2/ 2 ,  9 centers in 0:00:01.948692
INFO:root:Finished init_scalable in 0:00:04.231772
INFO:dask_ml.cluster.k_means:Starting Lloyd loop  0.
INFO:dask_ml.cluster.k_means:Shift: 0.8586
INFO:dask_ml.cluster.k_means:Finished Lloyd loop  0. in 0:00:01.348988
INFO:dask_ml.cluster.k_means:Starting Lloyd loop  1.
INFO:dask_ml.cluster.k_means:Shift: 0.1032
INFO:dask_ml.cluster.k_means:Finished Lloyd loop  1. in 0:00:01.210414
INFO:dask_ml.cluster.k_means:Starting Lloyd loop  2.
INFO:dask_ml.cluster.k_means:Shift: 0.0194
INFO:dask_ml.cluster.k_means:Finished Lloyd loop  2. in 0:00:01.169763
INFO:dask_ml.cluster.k_means:Starting Lloyd loop  3.
INFO:dask_ml.cluster.k_means:Shift: 0.0037
INFO:dask_ml.cluster.k_means:Finished Lloyd loop  3. in 0:00:01.033845
INFO:dask_ml.cluster.k_means:Starting Lloyd loop  4.
INFO:dask_ml.cluster.k_means:Shift: 0.0007
INFO:dask_ml.cluster.k_means:Finished Lloyd loop  4. in 0:00:00.866419
INFO:dask_ml.cluster.k_means:Starting Lloyd loop  5.
INFO:dask_ml.cluster.k_means:Shift: 0.0001
INFO:dask_ml.cluster.k_means:Finished Lloyd loop  5. in 0:00:01.212563
INFO:dask_ml.cluster.k_means:Starting Lloyd loop  6.
INFO:dask_ml.cluster.k_means:Shift: 0.0000
INFO:dask_ml.cluster.k_means:Finished Lloyd loop  6. in 0:00:01.275980
Out[12]:
KMeans(algorithm='full', copy_x=True, init='k-means||', init_max_iter=2,
    max_iter=300, n_clusters=3, n_jobs=1, oversampling_factor=10,
    precompute_distances='auto', random_state=None, tol=0.0001)
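
The fitted estimator exposes the familiar scikit-learn attributes: km.cluster_centers_ holds the coordinates of the 3 centers, and km.labels_ holds the cluster assignment for every point as a lazy dask array, which can be sliced (as in the plot below) or materialized with .compute(). A sketch, outputs omitted:

km.cluster_centers_        # coordinates of the 3 cluster centers
km.labels_                 # lazy dask array of assignments, one per sample
km.labels_[:10].compute()  # materialize a small slice as a concrete array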

We'll plot a sample of points, colored by the cluster each falls into.

In [13]:
fig, ax = plt.subplots()
ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],
           cmap='viridis', alpha=0.25);

For all the estimators implemented in Dask-ML, see the API documentation.