00 Introduction

Dask dataframes are blocked Pandas dataframes

Dask Dataframes coordinate many Pandas dataframes, partitioned along an index. They support a large subset of the Pandas API.

Start Dask Client for Dashboard

Starting the Dask Client is optional. It will provide a dashboard, which is useful for gaining insight into the computation.


The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. This can take some effort to arrange your windows, but seeing them both at the same time is very useful when learning.

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client
Out[1]:

Client

Cluster

  • Workers: 2
  • Cores: 4
  • Memory: 2.00 GB

Create Random Dataframe

We create a random timeseries of data with the following attributes:

  1. It stores a record for every 10 seconds of the year 2000
  2. It splits that year by month, keeping every month as a separate Pandas dataframe
  3. Along with a datetime index it has columns for names, ids, and numeric values

This is a small dataset of about 240 MB. Increase the number of days or reduce the frequency to practice with a larger dataset.

In [2]:
import dask.dataframe as dd
df = dd.demo.make_timeseries('2000-01-01', '2000-12-31', freq='10s', partition_freq='1M',
                             dtypes={'name': str, 'id': int, 'x': float, 'y': float})

Unlike Pandas, Dask DataFrames are lazy and so no data is printed here.

In [3]:
df
Out[3]:
Dask DataFrame Structure:
id name x y
npartitions=11
2000-01-31 int64 object float64 float64
2000-02-29 ... ... ... ...
... ... ... ... ...
2000-11-30 ... ... ... ...
2000-12-31 ... ... ... ...
Dask Name: make-timeseries, 11 tasks

But the column names and dtypes are known.

In [4]:
df.dtypes
Out[4]:
id        int64
name     object
x       float64
y       float64
dtype: object

Some operations will automatically display the data.

In [5]:
df.head(3)
Out[5]:
id name x y
timestamp
2000-01-31 00:00:00 997 Kevin 0.149327 0.806380
2000-01-31 00:00:10 1010 Jerry -0.298290 0.196082
2000-01-31 00:00:20 987 Ingrid 0.896296 0.539178

Use Standard Pandas Operations

Most common Pandas operations work identically on Dask dataframes.

In [6]:
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
df3
Out[6]:
Dask Series Structure:
npartitions=1
    float64
        ...
Name: x, dtype: float64
Dask Name: sqrt, 60 tasks

Call .compute() when you want your result as a concrete Pandas object (here, a Series).

If you started Client() above then you may want to watch the status page during computation.

In [7]:
computed_df = df3.compute()
type(computed_df)
Out[7]:
pandas.core.series.Series
In [8]:
computed_df
Out[8]:
name
Alice       0.577226
Bob         0.577258
Charlie     0.576698
Dan         0.575275
Edith       0.577405
Frank       0.576786
George      0.576826
Hannah      0.575691
Ingrid      0.576622
Jerry       0.576228
Kevin       0.578563
Laura       0.577117
Michael     0.577383
Norbert     0.575995
Oliver      0.577614
Patricia    0.577395
Quinn       0.578067
Ray         0.576894
Sarah       0.577917
Tim         0.577076
Ursula      0.576883
Victor      0.576931
Wendy       0.580264
Xavier      0.577487
Yvonne      0.575248
Zelda       0.576178
Name: x, dtype: float64

Persist data in memory

If you have the available RAM for your dataset then you can persist data in memory.

This allows future computations to be much faster.

In [9]:
df = df.persist()

Time Series Operations

Because we have a datetime index, time-series operations work efficiently.

In [10]:
%matplotlib inline
In [11]:
df[['x', 'y']].resample('1w').mean().head()
Out[11]:
x y
2000-02-06 -0.000913 0.000791
2000-02-13 0.000502 0.000116
2000-02-20 -0.000879 -0.000217
2000-02-27 0.002259 0.002043
In [12]:
df[['x', 'y']].resample('1w').mean().compute().plot()
Out[12]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f635a4ca860>
In [13]:
df[['x', 'y']].rolling(window='7d').mean().head()
Out[13]:
x y
timestamp
2000-01-31 00:00:00 0.149327 0.806380
2000-01-31 00:00:10 -0.074481 0.501231
2000-01-31 00:00:20 0.249111 0.513880
2000-01-31 00:00:30 -0.002103 0.368822
2000-01-31 00:00:40 -0.183664 0.448031

Random access is cheap along the index, but must still be computed.

In [14]:
df.loc['2000-05-05']
Out[14]:
Dask DataFrame Structure:
id name x y
npartitions=1
2000-05-05 00:00:00.000000000 int64 object float64 float64
2000-05-05 23:59:59.999999999 ... ... ... ...
Dask Name: loc, 12 tasks
In [15]:
%time df.loc['2000-05-05'].compute()
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 27.8 ms
Out[15]:
id name x y
timestamp
2000-05-05 00:00:00 1015 Wendy -0.701126 -0.416353
2000-05-05 00:00:10 1043 Norbert 0.886574 -0.692488
2000-05-05 00:00:20 1033 Alice 0.886792 0.141713
2000-05-05 00:00:30 1029 Victor -0.305771 -0.748017
2000-05-05 00:00:40 940 Yvonne -0.175538 0.134038
2000-05-05 00:00:50 1055 Ingrid 0.087111 -0.527567
2000-05-05 00:01:00 1020 Bob 0.681037 -0.089672
2000-05-05 00:01:10 1078 Oliver 0.334097 0.849234
2000-05-05 00:01:20 1044 Edith -0.683929 0.385465
2000-05-05 00:01:30 1031 Kevin 0.737251 -0.562496
2000-05-05 00:01:40 1071 Sarah -0.555398 -0.331887
2000-05-05 00:01:50 1001 Laura 0.816121 -0.503769
2000-05-05 00:02:00 1017 Yvonne -0.110614 0.959023
2000-05-05 00:02:10 1022 Zelda -0.838848 -0.209191
2000-05-05 00:02:20 1013 Laura 0.661130 0.687834
2000-05-05 00:02:30 1036 Laura -0.178334 -0.765293
2000-05-05 00:02:40 1015 Kevin 0.297924 0.141988
2000-05-05 00:02:50 962 Xavier -0.452069 0.851751
2000-05-05 00:03:00 977 Bob 0.551147 -0.796333
2000-05-05 00:03:10 1012 Bob -0.948354 0.329697
2000-05-05 00:03:20 931 Charlie -0.027186 0.710329
2000-05-05 00:03:30 973 Patricia -0.452406 -0.760441
2000-05-05 00:03:40 1019 Oliver -0.373018 -0.481146
2000-05-05 00:03:50 993 Tim -0.601610 0.348938
2000-05-05 00:04:00 1069 Jerry -0.498911 -0.478276
2000-05-05 00:04:10 962 Alice 0.836665 0.267224
2000-05-05 00:04:20 983 Yvonne 0.851664 -0.883239
2000-05-05 00:04:30 1016 Laura 0.650123 0.431913
2000-05-05 00:04:40 999 Zelda -0.299906 -0.509355
2000-05-05 00:04:50 1083 Dan -0.287515 0.645344
... ... ... ... ...
2000-05-05 23:55:00 968 Kevin -0.886977 0.282120
2000-05-05 23:55:10 1010 Zelda 0.548216 0.471020
2000-05-05 23:55:20 1009 Edith 0.034130 0.237833
2000-05-05 23:55:30 1028 Zelda 0.868942 -0.769357
2000-05-05 23:55:40 959 Ingrid 0.713743 -0.272613
2000-05-05 23:55:50 1004 George -0.751862 0.049266
2000-05-05 23:56:00 957 Ursula -0.807664 0.874035
2000-05-05 23:56:10 937 Patricia 0.837204 -0.310625
2000-05-05 23:56:20 1005 Yvonne 0.528313 0.254505
2000-05-05 23:56:30 1037 Quinn -0.724144 -0.661896
2000-05-05 23:56:40 1023 Alice -0.018466 0.367731
2000-05-05 23:56:50 1012 Xavier -0.619219 0.458211
2000-05-05 23:57:00 1025 Ursula 0.490855 0.086750
2000-05-05 23:57:10 1014 Kevin -0.036094 0.069899
2000-05-05 23:57:20 967 Wendy 0.811907 -0.764313
2000-05-05 23:57:30 981 Quinn 0.777364 -0.079358
2000-05-05 23:57:40 1070 Norbert -0.999021 -0.569842
2000-05-05 23:57:50 933 Sarah 0.466534 0.071201
2000-05-05 23:58:00 965 Oliver 0.071813 -0.852543
2000-05-05 23:58:10 949 Hannah -0.162235 -0.960035
2000-05-05 23:58:20 951 Edith 0.578089 -0.566100
2000-05-05 23:58:30 995 Alice -0.594776 0.953540
2000-05-05 23:58:40 1044 Charlie -0.239722 -0.437055
2000-05-05 23:58:50 953 Edith -0.605910 0.972862
2000-05-05 23:59:00 976 Sarah -0.734096 -0.208681
2000-05-05 23:59:10 956 Patricia -0.179072 -0.574324
2000-05-05 23:59:20 1032 Zelda 0.773153 -0.083345
2000-05-05 23:59:30 1044 Charlie -0.842552 0.461778
2000-05-05 23:59:40 967 Victor -0.717027 0.421935
2000-05-05 23:59:50 966 Yvonne -0.185876 0.941718

8640 rows × 4 columns

Set Index

Data is sorted by the index column. This allows for faster access, joins, groupby-apply operations, and so on. However, sorting data can be costly to do in parallel, so setting the index is important to do, but should be done infrequently.

In [16]:
df = df.set_index('name')
df
Out[16]:
Dask DataFrame Structure:
id x y
npartitions=11
Alice int64 float64 float64
Alice ... ... ...
... ... ... ...
Xavier ... ... ...
Zelda ... ... ...
Dask Name: sort_index, 231 tasks

Because computing this dataset is expensive and it fits in our available RAM, we persist the dataset to memory.

In [17]:
df = df.persist()

Dask now knows where all data lives, indexed cleanly by name. As a result, operations like random access are cheap and efficient.

In [18]:
%time df.loc['Alice'].compute()
CPU times: user 368 ms, sys: 76 ms, total: 444 ms
Wall time: 4.77 s
Out[18]:
id x y
name
Alice 1013 0.282312 0.986864
Alice 1003 0.907836 -0.433272
Alice 1028 0.480315 0.617785
Alice 990 0.286119 -0.221745
Alice 1007 -0.363408 0.219491
Alice 988 -0.352992 0.838554
Alice 988 0.113454 0.324375
Alice 975 -0.281090 -0.328833
Alice 1012 0.333475 0.100036
Alice 1019 0.953068 -0.230996
Alice 1022 -0.229455 -0.689157
Alice 985 0.750293 -0.261370
Alice 1003 -0.542618 -0.478460
Alice 1004 0.523155 -0.477897
Alice 1021 0.522600 -0.277876
Alice 1019 0.855172 -0.592085
Alice 958 -0.976187 0.582663
Alice 1068 0.311019 0.636247
Alice 1033 -0.773835 0.753354
Alice 966 0.099537 -0.096493
Alice 1029 -0.240761 0.125186
Alice 991 -0.530020 0.440593
Alice 1009 -0.503879 -0.603370
Alice 1017 -0.211632 -0.826981
Alice 1017 -0.682942 0.707673
Alice 948 -0.618033 -0.105620
Alice 1045 0.406609 0.925760
Alice 1023 0.879689 0.274448
Alice 990 0.937508 -0.945867
Alice 906 -0.918414 0.682363
... ... ... ...
Alice 1020 -0.741125 -0.163285
Alice 970 0.433164 -0.128036
Alice 1007 0.204996 -0.222945
Alice 990 0.715420 0.155715
Alice 980 0.401348 -0.798619
Alice 981 0.948217 0.262835
Alice 975 0.118068 0.915600
Alice 1036 -0.620055 -0.368023
Alice 1019 -0.431632 0.423678
Alice 993 -0.651463 0.860151
Alice 1008 -0.386001 0.496790
Alice 1006 -0.590724 0.682192
Alice 1034 -0.949103 -0.480490
Alice 1032 -0.465896 -0.510318
Alice 991 -0.405962 -0.844553
Alice 1023 -0.232913 0.074726
Alice 1007 -0.763430 -0.670528
Alice 1034 0.864738 0.853833
Alice 1015 0.232507 0.576739
Alice 1042 0.991864 -0.802995
Alice 1021 0.485880 0.412757
Alice 966 0.025623 0.212692
Alice 1058 0.484255 -0.915485
Alice 920 0.911058 -0.548753
Alice 1002 -0.020482 -0.120900
Alice 950 -0.199273 0.777972
Alice 971 0.576982 0.840741
Alice 948 -0.669867 -0.729792
Alice 965 -0.239483 -0.674630
Alice 1029 -0.768292 0.390507

111409 rows × 3 columns

Groupby Apply with Scikit-Learn

Now that our data is sorted by name we can easily do operations like random access on name, or groupby-apply with custom functions.

Here we train a different Scikit-Learn linear regression model on each name.

In [19]:
from sklearn.linear_model import LinearRegression

def train(partition):
    est = LinearRegression()
    est.fit(partition[['x']].values, partition.y.values)
    return est
In [20]:
df.groupby('name').apply(train, meta=object).compute()
Out[20]:
name
Alice       LinearRegression(copy_X=True, fit_intercept=Tr...
Bob         LinearRegression(copy_X=True, fit_intercept=Tr...
Charlie     LinearRegression(copy_X=True, fit_intercept=Tr...
Dan         LinearRegression(copy_X=True, fit_intercept=Tr...
Edith       LinearRegression(copy_X=True, fit_intercept=Tr...
Frank       LinearRegression(copy_X=True, fit_intercept=Tr...
George      LinearRegression(copy_X=True, fit_intercept=Tr...
Hannah      LinearRegression(copy_X=True, fit_intercept=Tr...
Ingrid      LinearRegression(copy_X=True, fit_intercept=Tr...
Jerry       LinearRegression(copy_X=True, fit_intercept=Tr...
Kevin       LinearRegression(copy_X=True, fit_intercept=Tr...
Laura       LinearRegression(copy_X=True, fit_intercept=Tr...
Michael     LinearRegression(copy_X=True, fit_intercept=Tr...
Norbert     LinearRegression(copy_X=True, fit_intercept=Tr...
Oliver      LinearRegression(copy_X=True, fit_intercept=Tr...
Patricia    LinearRegression(copy_X=True, fit_intercept=Tr...
Quinn       LinearRegression(copy_X=True, fit_intercept=Tr...
Ray         LinearRegression(copy_X=True, fit_intercept=Tr...
Sarah       LinearRegression(copy_X=True, fit_intercept=Tr...
Tim         LinearRegression(copy_X=True, fit_intercept=Tr...
Ursula      LinearRegression(copy_X=True, fit_intercept=Tr...
Victor      LinearRegression(copy_X=True, fit_intercept=Tr...
Wendy       LinearRegression(copy_X=True, fit_intercept=Tr...
Xavier      LinearRegression(copy_X=True, fit_intercept=Tr...
Yvonne      LinearRegression(copy_X=True, fit_intercept=Tr...
Zelda       LinearRegression(copy_X=True, fit_intercept=Tr...
dtype: object
