Working with Dask: Scalable Data Science & Machine Learning in the Cloud 📊

PyRun's integration with Dask brings the power of scalable data science and machine learning to your fingertips. Dask allows you to parallelize your Python code across clusters of machines, enabling you to work with datasets and computations that exceed the limits of a single computer. This guide will get you started with Dask in PyRun.

Example Dask Code: Parallel Hello World

When you create a Dask workspace in PyRun, you'll find an example.py file with a sample Dask script. Here's the code:

python
import time
import dask
from dask.distributed import Client, LocalCluster
from dask_cloudprovider.aws import FargateCluster, EC2Cluster


def say_hello(name):
    """Function to say hello to a name with a delay"""
    time.sleep(1)  # Simulate some work
    return f"Hello, {name} from Dask on AWS EC2!"


def main():
    # Start a local Dask cluster
    cluster = LocalCluster()

    # Start an EC2 Dask cluster
    """cluster = EC2Cluster(
        n_workers=2,
        security=False,  # Avoid encountering this error: https://github.com/dask/dask-cloudprovider/issues/249
        region="us-east-1",
    )"""

    # Start a Fargate Dask cluster
    """cluster = FargateCluster(
        n_workers=2,
        region_name="us-east-1",
    )"""

    client = Client(cluster)

    print(f"Dask dashboard available at: {client.dashboard_link}")

    # Create a list of names
    names = ["World", "AWS", "EC2", "Dask", "Python"]

    # Create delayed tasks
    results = []
    for name in names:
        # Schedule the task to run on the cluster
        future = client.submit(say_hello, name)
        results.append(future)

    # Gather and print results
    print("\nResults:")
    for future in results:
        print(f"  {future.result()}")

    # Display cluster information
    print("\nCluster Information:")
    print(f"  Workers: {len(client.scheduler_info()['workers'])}")
    print(f"  Cores: {sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())}")

    # Clean up
    try:
        client.close()
        cluster.close()
    except Exception:
        pass


if __name__ == "__main__":
    main()

Understanding the Code:

  • Imports: The code imports time for the simulated delay, Client and LocalCluster from dask.distributed for distributed execution, and the EC2Cluster and FargateCluster classes from dask_cloudprovider.aws for cloud-based clusters on AWS. (The bare import dask is unused in this example.)
  • say_hello(name) function: This is a simple function that simulates work by pausing for 1 second and then returns a greeting message. This function will be parallelized using Dask.
  • main() function:
    • Cluster Setup: The code shows three ways to create a Dask cluster; only LocalCluster is active, and the two cloud options are commented out:
      • LocalCluster(): Starts a local Dask cluster on the same machine where the script is run (for testing or smaller workloads). This is the active cluster type in the example.
      • EC2Cluster(...): Example of how to create a Dask cluster on AWS EC2 instances. This is commented out but shows how to configure an EC2 cluster.
      • FargateCluster(...): Example of how to create a Dask cluster on AWS Fargate (serverless containers). Also commented out.
    • Client(cluster): Creates a Dask Client object, which is used to interact with the Dask cluster.
    • client.dashboard_link: Prints the link to the Dask dashboard, which provides real-time monitoring of your Dask cluster and tasks.
    • Task Submission (client.submit): The code submits say_hello once per name using client.submit, which schedules each call to run in parallel on the Dask cluster and immediately returns a future.
    • Result Gathering (future.result): The code retrieves and prints each result with future.result(), which blocks until that task completes. Dask handles task distribution and result collection automatically; a client.map shorthand is sketched just after this list.
    • Cluster Information: Prints information about the Dask cluster, like the number of workers and cores.
    • Cleanup: Closes the Dask client and cluster.
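
Dask also provides client.map and client.gather, which condense the submit/result loop above into two calls. Here is a minimal sketch, assuming the same say_hello function, names list, and client as in example.py:

python
# client.map submits one task per item in names and returns a list of futures;
# client.gather blocks until all futures finish and returns results in order.
futures = client.map(say_hello, names)
for greeting in client.gather(futures):
    print(f"  {greeting}")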

Running the Example:

  1. Create a Dask workspace in PyRun (if you haven't already, see Getting Started).
  2. Open example.py in your workspace.
  3. Click the "Run" button.

PyRun will execute this script. Because the LocalCluster is active in the example code, it will run a local Dask cluster within your PyRun workspace environment. You can access the Dask dashboard link printed in the output to monitor the execution.

Configuring Your Dask Cluster

The example code demonstrates how to configure different types of Dask clusters directly within your Python script.

To choose a different cluster type (e.g., EC2Cluster, FargateCluster) or customize cluster parameters:

  1. Edit example.py: Open the example.py file in your workspace.
  2. Uncomment and Modify Cluster Configuration: Uncomment the lines for the EC2Cluster or FargateCluster that you want to use.
  3. Adjust Cluster Parameters: Modify the parameters within the EC2Cluster(...) or FargateCluster(...) calls to configure your cluster:
    • n_workers: Number of Dask workers to start.
    • region (EC2Cluster) / region_name (FargateCluster): AWS region for cloud clusters (e.g., "us-east-1").
    • security=False (for EC2Cluster example): Disables secure TLS communication to work around a known dask-cloudprovider error (linked in the code comment); for production deployments, configure security and your security groups properly instead.
    • Refer to the Dask Cloud Provider documentation and Dask documentation for a full list of configurable parameters for each cluster type.
  4. Comment out LocalCluster(): Comment out the line cluster = LocalCluster() if you are using a cloud-based cluster type like EC2Cluster or FargateCluster.
  5. Save example.py.
  6. Run your code again. PyRun will now use your modified Dask cluster configuration.

Example: Running on AWS EC2

To run the example on an AWS EC2 cluster, you would modify the main() function in example.py to look like this:

python
def main():
    """
    cluster = LocalCluster() # Comment out LocalCluster
    """
    # Uncomment and configure EC2Cluster
    cluster = EC2Cluster(
        n_workers=2,
        security=False,
        region="us-east-1",
    )
    """
    # cluster = FargateCluster(...) # Keep FargateCluster commented out for now

    client = Client(cluster)
    # ... rest of the main() function ...

Remember to configure the region and other parameters appropriately for your AWS account and desired setup.
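
For comparison, here is a sketch of how the Fargate variant of main() might look. The worker_cpu and worker_mem values are illustrative assumptions, not settings from the generated example.py; see the Dask Cloud Provider documentation for the authoritative parameter list:

python
def main():
    # cluster = LocalCluster()  # Keep LocalCluster commented out

    # Illustrative FargateCluster configuration (CPU/memory values are assumptions)
    cluster = FargateCluster(
        n_workers=2,
        region_name="us-east-1",
        worker_cpu=1024,  # Fargate CPU units; 1024 = 1 vCPU per worker
        worker_mem=4096,  # Worker memory in MB
    )

    client = Client(cluster)
    # ... rest of the main() function ...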

Installing Packages for Dask

If your Dask workflows require additional Python packages, customize your runtime environment by editing the environment.yml file as described in Customizing Your Runtime.
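
As a rough sketch, an environment.yml that adds extra packages for a Dask workflow might look like the following. The base dependencies come from your workspace, so treat the names and versions here as illustrative:

yaml
name: dask-workspace  # illustrative name
channels:
  - conda-forge
dependencies:
  - python=3.11
  - dask
  - distributed
  - dask-cloudprovider
  - pandas        # example extra package
  - scikit-learn  # example extra package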

Unleash Scalable Data Science with PyRun and Dask

PyRun simplifies using Dask to scale your data science and machine learning workloads in the cloud. Experiment with different cluster configurations, scale your computations, and leverage the power of distributed computing with PyRun and Dask!

Next Steps:

  • Explore more advanced Dask examples and use cases for data analysis and machine learning.
  • Experiment with different Dask cluster types (EC2Cluster, FargateCluster) and configurations.
  • Learn how to integrate Dask with other PyRun features like data storage and monitoring.

Start scaling your Python workflows today!