Introduction to Dask arrays

Dask arrays extend the NumPy interface to larger-than-memory and parallel workflows, up to and across a distributed cluster. They look and feel a lot like NumPy and use NumPy under the hood: a Dask array coordinates many NumPy arrays arranged into a grid. These NumPy arrays may live on disk or on other machines.


You can create a Dask array as follows. Here we create an array of ones of length 15, and we have to specify a chunk size.

import dask.array as da
x = da.ones(15, chunks=(5,))
x
        Array    Chunk
Bytes   120 B    40 B
Shape   (15,)    (5,)
Count   3 Tasks  3 Chunks
Type    float64  numpy.ndarray

The output is a Dask array composed of 3 NumPy arrays of 5 elements each. If we try to compute the sum of all the elements of the array, calling the sum method alone will not return the result: Dask objects are lazy by default and only run computations when instructed.

x.sum()
        Array    Chunk
Bytes   8 B      8 B
Shape   ()       ()
Count   7 Tasks  1 Chunks
Type    float64  numpy.ndarray

We have to call compute at the end of the operation if we want the result.

x.sum().compute()
15.0
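
Several lazy results can also be evaluated in a single pass with dask.compute, which merges their task graphs and shares the work they have in common. A minimal sketch, reusing the x defined above:

import dask
# both x.sum() and x.min() are lazy; dask.compute runs them together
total, minimum = dask.compute(x.sum(), x.min())  # -> 15.0 and 1.0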

The above example is pretty trivial. Let's say now that we have a 10000 by 10000 array, which we choose to represent with chunks of 1000 by 1000.

x = da.random.random((10000,10000), chunks=(1000,1000))
x
        Array           Chunk
Bytes   800.00 MB       8.00 MB
Shape   (10000, 10000)  (1000, 1000)
Count   100 Tasks       100 Chunks
Type    float64         numpy.ndarray

Dask has created a 10 by 10 grid where each element of that grid is a 1000 by 1000 NumPy array.
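
If you want to check that layout, the chunk structure is available directly on the array; a small sketch using the x from above:

x.numblocks  # (10, 10): the grid of blocks
x.chunks     # ten chunks of length 1000 along each axis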

Again, we can operate on this array in much the same way we would with NumPy.

First, let's add the array to its transpose:

y = x + x.T

Then slice the array and take the mean along an axis:

z = y[::2, :].mean(axis=1)
z
        Array      Chunk
Bytes   40.00 kB   4.00 kB
Shape   (5000,)    (500,)
Count   540 Tasks  10 Chunks
Type    float64    numpy.ndarray

When we want access to the result, we just call the compute method and Dask computes the result in parallel across the cores of the machine.

z.compute()
array([0.99874649, 0.99722168, 0.99725464, ..., 1.00849801, 1.00448204,
       0.99683664])
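
By default, Dask arrays run on a thread-based scheduler on the local machine, but the scheduler can be chosen at compute time. A minimal sketch of the scheduler keyword:

z.compute(scheduler="threads")      # multi-threaded, the default for arrays
z.compute(scheduler="processes")    # a pool of local processes instead
z.compute(scheduler="synchronous")  # single-threaded, useful for debugging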

In practice, Dask is often used in tandem with data file formats like HDF5, Zarr, or NetCDF.

In this situation you might open a dataset from a file on disk and wrap it with the from_array function.

import h5py
f = h5py.File("myfile.hdf5", "r")  # open the file in read-only mode
d = f["/data/path"]
d.shape
(10000000, 1000000)

import dask.array as da
x = da.from_array(d, chunks=(10000, 10000))  # wrap the on-disk dataset lazily
x.mean(axis=0).compute()
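
Zarr stores can be wrapped in much the same way; a sketch, assuming a store named myfile.zarr exists on disk:

import dask.array as da
x = da.from_zarr("myfile.zarr")  # chunking follows the chunks of the Zarr store
x.mean(axis=0).compute()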