This lesson is being piloted (Beta version)

Developing parallel workflows

Overview

Teaching: 15 min
Exercises: 10 min
Questions
  • How to scale up and run thousands of jobs?

  • What is a DAG?

  • What is a Scatter-Gather paradigm?

  • How to run Yadage workflows on REANA?

Objectives
  • Learn about Directed Acyclic Graphs (DAG)

  • Understand Yadage workflow language

  • Practice running and inspecting parallel workflows

Overview

We now know how to develop reproducible analyses on small scale using serial workflows.

In this lesson we shall learn how to scale-up for real-life work which usually requires using parallel workflows.

Computational workflows as Directed Acyclic Graphs (DAG)

The computational workflows can be expressed as a set of computational steps where some steps depends on other steps before they can begin their computations. In other words, the computational steps constitute a Directed Acyclic Graph (DAG) where each graph vertex represents a unit of computation with its inputs and outputs, and the graph edges describe the interconnection of various computational steps. For example:

The graph is “directed” and “acyclic” because it can be topographically ordered so that later steps depends on earlier steps without cyclic dependencies, the progress flowing steadily from former steps to latter steps during analysis.

The REANA platform supports several DAG workflow specification languages:

Yadage

In this lesson we shall mostly use the Yadage workflow specification language (used e.g. in ATLAS). Yadage enables to describe even very complex computational workflows.

Let us start by having a look at the Yadage specification for the RooFit example we have used in the previous episodes:

stages:
  - name: gendata
    dependencies: [init]
    scheduler:
      scheduler_type: 'singlestep-stage'
      parameters:
        events: {step: init, output: events}
        gendata: {step: init, output: gendata}
        outfilename: '{workdir}/data.root'
      step:
        process:
          process_type: 'interpolated-script-cmd'
          script: root -b -q '{gendata}({events},"{outfilename}")'
        publisher:
          publisher_type: 'frompar-pub'
          outputmap:
            data: outfilename
        environment:
          environment_type: 'docker-encapsulated'
          image: 'docker.io/reanahub/reana-env-root6'
          imagetag: '6.18.04'
  - name: fitdata
    dependencies: [gendata]
    scheduler:
      scheduler_type: 'singlestep-stage'
      parameters:
        fitdata: {step: init, output: fitdata}
        data: {step: gendata, output: data}
        outfile: '{workdir}/plot.png'
      step:
        process:
          process_type: 'interpolated-script-cmd'
          script: root -b -q '{fitdata}("{data}","{outfile}")'
        publisher:
          publisher_type: 'frompar-pub'
          outputmap:
            plot: outfile
        environment:
          environment_type: 'docker-encapsulated'
          image: 'docker.io/reanahub/reana-env-root6'
          imagetag: '6.18.04'

We can see that the workflow consists of two stages, the gendata stage that does not depend on anything (this is denoted by [init] dependency which means the stage can run right after the workflow initialisation already), and the fitdata stage that depends on the completion of the gendata stage (this is denoted by [gendata]).

Note that each stage consists of a single workflow step (singlestep-stage) which represents the basic unit of computation of the workflow. (We shall see below an example of a multi-step stages which express the same basic unit of computations scattered over inputs.)

The step consists of the description of the process to run, the containerised environment in which to run the process, as well as the mapping of its outputs to the stage.

This is how the Yadage workflow engine understands which stages can be run in which order, what commands to run in each stage, and how to pass inputs and outputs between steps.

Snakemake

Let us open a brief parenthesis about other workflow languages such as Snakemake (used e.g. in LHCb). The same computational graph concepts apply here as well. What differs is the syntax how to express the dependencies between steps and the processes to run in each step.

For example, Snakemake uses “rules” where each rule defines its inputs and outputs and the command to run to produce them. The Snakemake workflow engine then computes the dependencies between rules based on how the outputs from some rules are used as inputs to other rules.

rule all:
    input:
        "results/data.root",
        "results/plot.png"

rule gendata:
    output:
        "results/data.root"
    params:
        events=20000
    container:
        "docker://docker.io/reanahub/reana-env-root6:6.18.04"
    shell:
        "mkdir -p results && root -b -q 'code/gendata.C({params.events},\"{output}\")'"

rule fitdata:
    input:
        data="results/data.root"
    output:
        "results/plot.png"
    container:
        "docker://docker.io/reanahub/reana-env-root6:6.18.04"
    shell:
        "root -b -q 'code/fitdata.C(\"{input.data}\",\"{output}\")'"

We see that the final plot is produced by the “fitdata” rule, which needs “data.root” file to be present, and it is the “gendata” rules that produces it. Hence Snakemake knows that it has to run the “gendata” rule first, and the computation of “fitdata” is deferred until “gendata” successfully completes. This process is very similar to how Makefile are being used in Unix software packages.

After this parenthesis note about Snakemake, let us now return to our Yadage example.

Running Yadage workflows

Let us try to write and run the above Yadage workflow on REANA.

We have to instruct REANA that we are going to use Yadage as our workflow engine. We can do that by editing reana.yaml and specifying:

inputs:
  files:
    - code/gendata.C
    - code/fitdata.C
    - workflow.yaml
  parameters:
    events: 20000
    gendata: code/gendata.C
    fitdata: code/fitdata.C
workflow:
  type: yadage
  file: workflow.yaml
outputs:
  files:
    - fitdata/plot.png

Here, workflow.yaml is a new file with the same content as specified above.

We now can run this example on REANA in the usual way:

reana-client run -w roofityadage

Exercise

Run RooFit example using Yadage workflow engine on the REANA cloud. Upload code, run workflow, inspect status, check logs, download final plot.

Solution

Nothing changes in the usual user interaction with the REANA platform:

reana-client create -w roofityadage -f ./reana-yadage.yaml
reana-client upload ./code -w roofityadage
reana-client start -w roofityadage
reana-client status -w roofityadage
reana-client logs -w roofityadage
reana-client ls -w roofityadage
reana-client download plot.png -w roofityadage

Physics code vs orchestration code

Note that it wasn’t necessary to change anything in our research code: we simply modified the workflow definition from Serial to Yadage and we could run the RooFit code “as is” using another workflow engine. This is a simple demonstration of the separation of concerns between “physics code” and “orchestration code”.

Parallelism via step dependencies

We have seen how the sequential workflows were expressed in the Yadage syntax using stage dependencies. Note that if the stage dependency graph would have permitted, the workflow steps not depending on each other, or on the results of previous computations, would have been executed in parallel by the workflow engine out of the box. The physicist only has to supply the knoweldge about which steps depend on which other steps and the workflow engine takes care of efficiently starting and scheduling tasks as necessary.

HiggsToTauTau analysis: simple version

Let us demonstrate how to write a Yadage workflow for the HiggsToTauTau example analysis using simple step dependencies.

The workflow stages look like:

stages:
- name: skim
  dependencies: [init]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      input_dir: {step: init, output: input_dir}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/skim'}

- name: histogram
  dependencies: [skim]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      input_dir: {step: skim, output: skimmed_dir}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/histogram'}

- name: fit
  dependencies: [histogram]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      histogram_file: {step: histogram, output: histogram_file}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/fit'}

- name: plot
  dependencies: [histogram]
  scheduler:
    scheduler_type: singlestep-stage
    parameters:
      histogram_file: {step: histogram, output: histogram_file}
      output_dir: '{workdir}/output'
    step: {$ref: 'steps.yaml#/plot'}

where steps are expressed as:

skim:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash skim.sh {input_dir} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-eventselection-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      skimmed_dir: '{output_dir}'

histogram:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash histograms_with_custom_output_location.sh {input_dir} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-eventselection-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      histogram_file: '{output_dir}/histograms.root'

plot:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash plot.sh {histogram_file} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-eventselection-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      datamc_plots: '{output_dir}'

fit:
  process:
    process_type: 'interpolated-script-cmd'
    script: |
      mkdir {output_dir}
      bash fit.sh {histogram_file} {output_dir}
  environment:
    environment_type: 'docker-encapsulated'
    image: gitlab-registry.cern.ch/awesome-workshop/awesome-analysis-statistics-stage3
    imagetag: master
  publisher:
    publisher_type: interpolated-pub
    publish:
      fitting_plot: '{output_dir}/fit.png'

The workflow definition is similar to that of the Serial workflow created in the previous episode. As we can see, this already leads to certain parallelism, because the fitting stage and the plotting stage can actually run simultaneously once the histograms are produced. The graphical representation of the above workflow looks as follows:

Let us try to run it on REANA cloud.

Exercise

Write and run a HiggsToTauTau analysis example using the Yadage workflow version presented above. Take the workflow definition, the step definition, and write the corresponding reana.yaml. Afterwards run the example on REANA cloud.

Solution

mkdir awesome-analysis-yadage-simple
cd awesome-analysis-yadage-simple
vim workflow.yaml # take workflow definition contents above
vim steps.yaml    # take step definition contents above
vim reana.yaml    # the goal of the exercise is to create this content
cat reana.yaml
inputs:
 files:
   - steps.yaml
   - workflow.yaml
  parameters:
    input_dir: root://eospublic.cern.ch//eos/root-eos/HiggsTauTauReduced
workflow:
  type: yadage
  file: workflow.yaml

Parallelism via scatter-gather paradigm

We have seen how to achieve a certain parallelism of workflow steps via simple dependency graph expressing which workflow steps depend on which others.

We now introduce a more advanced concept how to instruct the workflow engine to start many parallel computations. The paradigm is called “scatter-gather” and is used to instruct the workflow engine to run a certain parametrised command over an array of input values in parallel (the “scatter” operation) whilst assembling these results together afterwards (the “gather” operation). The “scatter-gather” paradigm allows to scale computations in a “map-reduce” fashion over input values with a minimal syntax without having to duplicate workflow code or write loop statements.

Here is an example of scatter-gather paradim in the Yadage language. Note the use of “multi-step” stage definition, expressing that the given stage is actually running multiple parametrised steps:

stages:
  - name: filter1
    dependencies: [init]
    scheduler:
      scheduler_type: multistep-stage
      parameters:
        input: {stages: init, output: input, unwrap: true}
      batchsize: 2
      scatter:
        method: zip
        parameters: [input]
      step: {$ref: steps.yaml#/filter}
  - name: filter2
    dependencies: [filter1]
    scheduler:
      scheduler_type: multistep-stage
      parameters:
        input: {stages: filter1, output: output, unwrap:true}
      batchsize: 2
      scatter:
        method: zip
        parameters: [input]
      step: {$ref: steps.yaml#/filter}
  - name: filter3
    dependencies: [filter2]
    scheduler:
      scheduler_type: singlestep-stage
      parameters:
        input: {stages: 'filter2', output: output}
      step: {$ref: steps.yaml#/filter}

The graphical representation of the computational graph looks like:

Note how the “scatter” operation is automatically happening over the given “input” array with the wanted batch size, processing files two by two irrespective of the number of input files. Note also the automatic “cascading” of computations.

In the next episode we shall see how the scatter-gather paradigm can be used to speed up the HiggsToTauTau sequential workflow that we developed in the previous episode.

Key Points

  • Computational analysis is a graph of inter-dependent steps

  • Fully declare inputs and outputs for each step

  • Use dependencies between workflow steps to allow running jobs in parallel

  • Use scatter/gather paradigm to parallelise parametrised computations