Introduction to Dask DataFrames
Dask DataFrames extend the pandas interface to work on larger-than-memory datasets on a single machine or on distributed datasets across a cluster of machines. They reuse a lot of the pandas code but extend it to larger scales.
Start with pandas
To see how this works, we start with pandas, so that we can show a bit later how similar the two interfaces look.
import pandas as pd
df = pd.read_csv("data/1.csv")
df.head()
|   | index | X | Y | Z | value |
|---|---|---|---|---|---|
| 0 | 10000 | 0.613648 | 0.514523 | 0.675306 | 0.997480 |
| 1 | 10001 | 0.785925 | 0.418075 | 0.558356 | 0.435089 |
| 2 | 10002 | 0.382117 | 0.841691 | 0.263298 | 0.120973 |
| 3 | 10003 | 0.374417 | 0.534436 | 0.093729 | 0.104052 |
| 4 | 10004 | 0.061580 | 0.404272 | 0.826618 | 0.980229 |
Once the data is loaded, we can work on it pretty easily. For instance, we can take the mean of the value column and get the result instantly.
df.value.mean()
0.5004419057234618
When we want to operate on many files, or when the dataset is larger than memory, pandas breaks down.
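For comparison, the usual pandas workaround for many files is to read them one by one and concatenate the pieces, which still requires the full result to fit in memory. Here is a small sketch, assuming the same data/*.csv files we load with Dask below:
import glob
import pandas as pd
# Read every CSV into memory, then stack them into one big DataFrame.
# Assumes all files matching data/*.csv share the same columns.
files = sorted(glob.glob("data/*.csv"))
df = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)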
Read all CSV files lazily with Dask DataFrames
Instead of using pandas, we will use a Dask DataFrame to load the CSV file.
import dask.dataframe as dd
df = dd.read_csv("data/1.csv")
df
|   | index | X | Y | Z | value |
|---|---|---|---|---|---|
| npartitions=1 |  |  |  |  |  |
|   | int64 | float64 | float64 | float64 | float64 |
|   | ... | ... | ... | ... | ... |
As you can see, the Dask DataFrame didn't return any data. If we want to see some data, we can use the head function.
df.head()
|   | index | X | Y | Z | value |
|---|---|---|---|---|---|
| 0 | 10000 | 0.613648 | 0.514523 | 0.675306 | 0.997480 |
| 1 | 10001 | 0.785925 | 0.418075 | 0.558356 | 0.435089 |
| 2 | 10002 | 0.382117 | 0.841691 | 0.263298 | 0.120973 |
| 3 | 10003 | 0.374417 | 0.534436 | 0.093729 | 0.104052 |
| 4 | 10004 | 0.061580 | 0.404272 | 0.826618 | 0.980229 |
As before, we can compute the mean of the value column.
df.value.mean()
dd.Scalar<series-..., dtype=float64>
Notice that we didn't get a full result. The Dask DataFrame, like every Dask object, is lazy by default: you have to call the compute function to get the result.
df.value.mean().compute()
0.5004419057234627
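As a side note, if you need several results from the same DataFrame, you can hand them all to dask.compute so the shared work of reading the file is done only once (a small sketch; df.value.std() is just another example aggregation):
import dask
# Compute both aggregations in a single pass over the data
mean, std = dask.compute(df.value.mean(), df.value.std())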
Another advantage of Dask DataFrames is that we can work on many files at once instead of a single file.
df = dd.read_csv("data/*.csv")
df
|   | index | X | Y | Z | value |
|---|---|---|---|---|---|
| npartitions=64 |  |  |  |  |  |
|   | int64 | float64 | float64 | float64 | float64 |
|   | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... |
df.value.mean().compute()
0.5005328752185645
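Most other pandas-style operations work the same way across all the files. For instance, here is a sketch of filtering before aggregating (the exact result depends on the data):
# Keep only rows where X > 0.5, then average the value column
df[df.X > 0.5].value.mean().compute()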
Index, partitions and sorting
Every Dask DataFrame is composed of many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These pandas objects may live on disk or on other machines.
All those partitions are loaded in parallel.
df
|   | index | X | Y | Z | value |
|---|---|---|---|---|---|
| npartitions=64 |  |  |  |  |  |
|   | int64 | float64 | float64 | float64 | float64 |
|   | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... |
When we look at the structure of the Dask DataFrame, we see that it is composed of 192 Python functions (tasks) that must be run to produce the full DataFrame.
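If you want to count those tasks yourself, you can inspect the underlying graph (a rough check; the exact count depends on your Dask version and the number of files):
# Number of tasks in the graph backing this DataFrame
len(df.__dask_graph__())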
Each of these partitions is a pandas DataFrame.
type(df.partitions[3].compute())
pandas.core.frame.DataFrame
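The number of partitions is also available directly as an attribute, which here should match the npartitions=64 shown in the structure above:
df.npartitions
64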
We can map a function over all the partitions of the Dask DataFrame to check that we indeed have 64 partitions. Each of them is a pandas DataFrame.
df.map_partitions(type).compute()
0 <class 'pandas.core.frame.DataFrame'>
1 <class 'pandas.core.frame.DataFrame'>
2 <class 'pandas.core.frame.DataFrame'>
3 <class 'pandas.core.frame.DataFrame'>
4 <class 'pandas.core.frame.DataFrame'>
...
59 <class 'pandas.core.frame.DataFrame'>
60 <class 'pandas.core.frame.DataFrame'>
61 <class 'pandas.core.frame.DataFrame'>
62 <class 'pandas.core.frame.DataFrame'>
63 <class 'pandas.core.frame.DataFrame'>
Length: 64, dtype: object
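map_partitions also works for real per-partition computations; for example, counting the rows of each partition (the exact counts depend on the input files):
# One row count per partition
df.map_partitions(len).compute()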
Looking at the DataFrame, we notice that there is a column of unique values called index. We will use it as the index of the DataFrame.
df = df.set_index("index")
df
|   | X | Y | Z | value |
|---|---|---|---|---|
| npartitions=64 |  |  |  |  |
| 0 | float64 | float64 | float64 | float64 |
| 9624 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 629999 | ... | ... | ... | ... |
| 639999 | ... | ... | ... | ... |
This operation requires loading all the data in order to find the minimum and maximum index values of each partition. Thanks to this, if we later want the data contained between two index values, Dask knows in which files to look and won't have to reload everything.
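For example, a label-based slice like the one below (the bounds are illustrative) only reads the partitions whose divisions overlap the requested range:
# Illustrative index bounds; only the relevant partitions are loaded
df.loc[10000:20000].compute()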
Write the data to Parquet
Parquet is a columnar file format that is tightly integrated with both Dask and pandas. You can use the to_parquet function to export the DataFrame to a Parquet file.
df.to_parquet("data/data.parquet")
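Because Parquet is columnar, you can later read back just the columns you need. Here is a sketch assuming the file written above (df2 is an illustrative name):
# Re-load only the value column from the Parquet dataset
df2 = dd.read_parquet("data/data.parquet", columns=["value"])
df2.mean().compute()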