Introduction to Dask DataFrames

Dask DataFrames extend the pandas interface to work on larger-than-memory datasets on a single machine, or on distributed datasets on a cluster of machines. They reuse a lot of the pandas code but extend it to larger scales.

Start with pandas

To see how this works, we start with pandas, so that we can show a bit later how similar the two interfaces look.

import pandas as pd

df = pd.read_csv("data/1.csv")
df.head()
index X Y Z value
0 10000 0.613648 0.514523 0.675306 0.997480
1 10001 0.785925 0.418075 0.558356 0.435089
2 10002 0.382117 0.841691 0.263298 0.120973
3 10003 0.374417 0.534436 0.093729 0.104052
4 10004 0.061580 0.404272 0.826618 0.980229

Once the data is loaded, we can work on it pretty easily. For instance, we can take the mean of the value column and get the result instantly.

df.value.mean()
0.5004419057234618

When we want to operate on many files, or when the dataset is larger than memory, pandas breaks down.
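For reference, the usual way to handle many files in plain pandas is to loop over them and concatenate the pieces by hand, which only works while the combined dataset still fits in memory. A rough sketch, assuming the files match the data/*.csv pattern used below:

import glob

import pandas as pd

# Read every CSV into memory and concatenate them: simple, but the
# whole dataset must fit in RAM at once.
files = sorted(glob.glob("data/*.csv"))
df = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)
df.value.mean()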

Read all CSV files lazily with Dask DataFrames

Instead of using pandas, we will use a Dask DataFrame to load the CSV file.

import dask.dataframe as dd

df = dd.read_csv("data/1.csv")
df
Dask DataFrame Structure:
index X Y Z value
npartitions=1
int64 float64 float64 float64 float64
... ... ... ... ...
Dask Name: from-delayed, 3 tasks

As you can see, the Dask DataFrame didn't return any data, only its structure. If we want to see some data, we can use the head function.

df.head()
index X Y Z value
0 10000 0.613648 0.514523 0.675306 0.997480
1 10001 0.785925 0.418075 0.558356 0.435089
2 10002 0.382117 0.841691 0.263298 0.120973
3 10003 0.374417 0.534436 0.093729 0.104052
4 10004 0.061580 0.404272 0.826618 0.980229

As before, we can compute the mean of the value column

df.value.mean()
dd.Scalar<series-..., dtype=float64>

Notice that we didn't get a full result. Indeed, the Dask DataFrame, like every Dask object, is lazy by default. You have to call the compute function to get the result.

df.value.mean().compute()
0.5004419057234627
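Because everything stays lazy until we ask for a result, we can also build several results and evaluate them together with dask.compute, which lets Dask share the file-reading work between them. A small sketch:

import dask

# Two lazy results built from the same DataFrame...
mean_value = df.value.mean()
std_value = df.value.std()

# ...evaluated together in a single pass over the data.
dask.compute(mean_value, std_value)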

Another advantage of Dask DataFrames is that we can work on multiple files at once instead of a single file.

df = dd.read_csv("data/*.csv")
df
Dask DataFrame Structure:
index X Y Z value
npartitions=64
int64 float64 float64 float64 float64
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask Name: from-delayed, 192 tasks
df.value.mean().compute()
0.5005328752185645
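Most of the usual pandas expressions work unchanged on this combined dataset and stay lazy until compute is called. For example, a filter followed by an aggregation, here restricted to rows where X is above 0.5 (an illustrative threshold):

# Mean of the value column for the rows where X > 0.5,
# evaluated lazily across all the CSV files.
df[df.X > 0.5].value.mean().compute()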

Index, partitions and sorting

Every Dask DataFrame is composed of many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These pandas objects may live on disk or on other machines.

All those partitions are loaded in parallel.

(Figure: a Dask DataFrame as a collection of pandas DataFrames partitioned along the index.)

df
Dask DataFrame Structure:
index X Y Z value
npartitions=64
int64 float64 float64 float64 float64
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask Name: from-delayed, 192 tasks

When we look at the structure of the Dask DataFrame, we see that it is composed of 192 tasks (Python functions) that must be run to produce the full DataFrame.

Each of these partitions is a pandas DataFrame

type(df.partitions[3].compute())
pandas.core.frame.DataFrame
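The number of partitions is also available directly as an attribute, without computing anything:

df.npartitions
64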

We can map a function over all the partitions of the Dask DataFrame to check that we have 64 partitions, each of them a pandas DataFrame.

df.map_partitions(type).compute()
0     <class 'pandas.core.frame.DataFrame'>
1     <class 'pandas.core.frame.DataFrame'>
2     <class 'pandas.core.frame.DataFrame'>
3     <class 'pandas.core.frame.DataFrame'>
4     <class 'pandas.core.frame.DataFrame'>
                      ...                  
59    <class 'pandas.core.frame.DataFrame'>
60    <class 'pandas.core.frame.DataFrame'>
61    <class 'pandas.core.frame.DataFrame'>
62    <class 'pandas.core.frame.DataFrame'>
63    <class 'pandas.core.frame.DataFrame'>
Length: 64, dtype: object
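The same map_partitions pattern can also report the number of rows in each partition, which is a handy way to check how the data is split. A small sketch using Python's built-in len:

# Row count of each underlying pandas DataFrame.
df.map_partitions(len).compute()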

In the df DataFrame, we notice that there is a column of unique values called index. We will use it as the index of the DataFrame.

df = df.set_index("index")
df
Dask DataFrame Structure:
X Y Z value
npartitions=64
0 float64 float64 float64 float64
9624 ... ... ... ...
... ... ... ... ...
629999 ... ... ... ...
639999 ... ... ... ...
Dask Name: sort_index, 642 tasks

This operation requires loading all the data in order to find the boundaries (the minimal and maximal index values) of each partition. Thanks to this, if we want the data contained between two index values, Dask knows in which partitions to find it and won't have to reload all the files.
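With the index set, the partition boundaries (the divisions) are known, and label-based selection with loc only has to touch the partitions covering the requested range. A short sketch, where the index range is just an arbitrary example:

df.known_divisions
True

# Only the partitions covering this index range are read.
df.loc[20000:30000].compute()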

Write the data to Parquet

Parquet is a columnar file format that is tightly integrated with both Dask and pandas. You can use the to_parquet function to export the DataFrame to Parquet (Dask writes one file per partition in the target directory).

df.to_parquet("data/data.parquet")
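Reading the data back is just as easy with read_parquet, and because Parquet is a columnar format only the requested columns need to be loaded from disk. A small sketch (df2 is simply a new name for the reloaded DataFrame):

# Load only the value column from the Parquet dataset.
df2 = dd.read_parquet("data/data.parquet", columns=["value"])
df2.value.mean().compute()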