2.10. Workflows#

A workflow is a CWL processing unit that executes command-line tools, expression tools, or workflows (sub-workflows) as steps. A workflow must have inputs, outputs, and steps defined in the CWL document.

[Figure: inputs feed into steps (command-line tools, expression tools, sub-workflows), which produce outputs.]

CWL workflow#

The CWL document echo-uppercase.cwl defines a workflow that runs the command-line tool and the expression tool shown in the earlier examples.

echo-uppercase.cwl#
cwlVersion: v1.2
class: Workflow

requirements:
  InlineJavascriptRequirement: {}

inputs:
  message: string

outputs:
  out:
    type: string
    outputSource: uppercase/uppercase_message

steps:
  echo:
    run: echo.cwl
    in:
      message: message
    out: [out]
  uppercase:
    run: uppercase.cwl
    in:
      message:
        source: echo/out
    out: [uppercase_message]

A command-line tool or expression tool can also be written directly in the same CWL document as the workflow. For example, we can rewrite echo-uppercase.cwl as a single file:

echo-uppercase-single-file.cwl#
cwlVersion: v1.2
class: Workflow

requirements:
  InlineJavascriptRequirement: {}

inputs:
  message: string

outputs:
  out:
    type: string
    outputSource: uppercase/uppercase_message

steps:
  echo:
    run:
      class: CommandLineTool

      baseCommand: echo

      stdout: output.txt

      inputs:
        message:
          type: string
          inputBinding: {}
      outputs:
        out:
          type: string
          outputBinding:
            glob: output.txt
            loadContents: true
            outputEval: $(self[0].contents)
    in:
      message: message
    out: [out]
  uppercase:
    run:
      class: ExpressionTool

      requirements:
        InlineJavascriptRequirement: {}

      inputs:
        message: string
      outputs:
        uppercase_message: string

      expression: |
        ${ return {"uppercase_message": inputs.message.toUpperCase()}; }
    in:
      message:
        source: echo/out
    out: [uppercase_message]

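Keeping the code in separate files helps with modularity and code organization. During development, however, it can be convenient to have everything in a single file. There are other ways to combine multiple files into one (for example cwltool --pack), which are discussed further in other sections of this user guide.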

Note

Sub-workflows require the SubworkflowFeatureRequirement requirement to be enabled. This is covered in more detail in another section of this user guide.

2.10.1. Writing Workflows#

This workflow extracts a Java source file from a tar file and then compiles it.

1st-workflow.cwl#
#!/usr/bin/env cwl-runner
cwlVersion: v1.2
class: Workflow
inputs:
  tarball: File
  name_of_file_to_extract: string

outputs:
  compiled_class:
    type: File
    outputSource: compile/classfile

steps:
  untar:
    run: tar-param.cwl
    in:
      tarfile: tarball
      extractfile: name_of_file_to_extract
    out: [extracted_file]

  compile:
    run: arguments.cwl
    in:
      src: untar/extracted_file
    out: [classfile]

Visualization of 1st-workflow.cwl

Use a YAML or JSON object in a separate file to describe the inputs of a run:

1st-workflow-job.yml#
tarball:
  class: File
  path: hello.tar
name_of_file_to_extract: Hello.java

Next, create a sample Java file and add it to a tar file to use with the command-line tool.

$ echo "public class Hello {}" > Hello.java && tar -cvf hello.tar Hello.java
Hello.java

Now invoke cwltool on the command line with the tool description and the input object as arguments:

$ cwltool 1st-workflow.cwl 1st-workflow-job.yml
INFO /home/docs/checkouts/readthedocs.org/user_builds/common-workflow-languageuser-guide-zh-hans/envs/latest/bin/cwltool 3.1.20240508115724
INFO Resolved '1st-workflow.cwl' to 'file:///home/docs/checkouts/readthedocs.org/user_builds/common-workflow-languageuser-guide-zh-hans/checkouts/latest/src/_includes/cwl/workflows/1st-workflow.cwl'
INFO [workflow ] start
INFO [workflow ] starting step untar
INFO [step untar] start
INFO [job untar] /tmp/1t_07v_q$ tar \
    --extract \
    --file \
    /tmp/hc4wyqs3/stg898c6010-d1d1-480b-8851-bcd07bdfbcf5/hello.tar \
    Hello.java
INFO [job untar] completed success
INFO [step untar] completed success
INFO [workflow ] starting step compile
INFO [step compile] start
ERROR Workflow error, try again with --debug for more information:
Docker is not available for this tool, try --no-container to disable Docker, or install a user space Docker replacement like uDocker with --user-space-docker-cmd.: docker executable is not available

What's going on here? Let's break it down:

cwlVersion: v1.2
class: Workflow

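The cwlVersion field indicates the version of the CWL specification used by the document. The class field indicates that this document describes a workflow.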

inputs:
  tarball: File
  name_of_file_to_extract: string

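The inputs section describes the inputs of the workflow. It is a list of input parameters, each consisting of an identifier and a data type. These parameters can be used as sources for the inputs of specific workflow steps.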
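The short `identifier: type` notation used above is an abbreviation. As a minimal sketch, the same two inputs can be written in the expanded form, which leaves room for extra fields such as `doc`. The `doc` texts here are illustrative wording and are not part of the guide's example files:

```yaml
inputs:
  tarball:
    # Equivalent to the shorthand `tarball: File`.
    type: File
    doc: Tar archive that contains the Java source file.
  name_of_file_to_extract:
    # Equivalent to the shorthand `name_of_file_to_extract: string`.
    type: string
    doc: Name of the file to extract from the archive.
```

The expanded form is mostly useful when a parameter needs documentation or other per-parameter fields. For simple workflows the shorthand is easier to read.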

outputs:
  compiled_class:
    type: File
    outputSource: compile/classfile

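The outputs section describes the outputs of the workflow. It is likewise a list of output parameters, each consisting of an identifier and a data type. The outputSource field connects the output parameter classfile of the compile step to the workflow output parameter compiled_class.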

steps:
  untar:
    run: tar-param.cwl
    in:
      tarfile: tarball
      extractfile: name_of_file_to_extract
    out: [extracted_file]

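The steps section describes the actual steps of the workflow. In this example, the first step extracts a file from a tar archive, and the second step compiles the file from the first step using the Java compiler. Workflow steps are not necessarily run in the order in which they are listed. Instead, the order is determined by the dependencies between steps (expressed through source). In addition, workflow steps that do not depend on one another may run in parallel.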

The first step, untar, runs tar-param.cwl (described previously in Parameter References). This tool has two input parameters, tarfile and extractfile, and one output parameter, extracted_file.

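The in section of the workflow step connects these two input parameters to the workflow inputs tarball and name_of_file_to_extract using source. This means that when the workflow step is executed, the values assigned to tarball and name_of_file_to_extract are used for the parameters tarfile and extractfile in order to run the tool.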

The out section of the workflow step lists the output parameters that are expected from the tool.
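Entries such as `tarfile: tarball` are shorthand: the value is the source of the step input. As a sketch of the same untar step with the `source` field written out explicitly (the form also used by the uppercase step in echo-uppercase.cwl):

```yaml
steps:
  untar:
    run: tar-param.cwl
    in:
      tarfile:
        # Workflow input that supplies the tool's `tarfile` parameter.
        source: tarball
      extractfile:
        # Workflow input that supplies the tool's `extractfile` parameter.
        source: name_of_file_to_extract
    out: [extracted_file]
```

The explicit form is needed as soon as a step input carries additional fields, for example `default`.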

  compile:
    run: arguments.cwl
    in:
      src: untar/extracted_file
    out: [classfile]

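The second step, compile, depends on the result of the first step: its input parameter src is connected to the output parameter of the untar step through untar/extracted_file. This step runs arguments.cwl (described previously in Additional Arguments and Parameters). The output of this step, classfile, is connected to the outputs section of the workflow, described above.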
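To illustrate that execution order follows the data dependencies rather than the listing order, here is a sketch of the same steps section with the two steps deliberately swapped. Under the rules described above, a runner is still expected to run untar before compile:

```yaml
# Same two steps as in 1st-workflow.cwl, deliberately listed in reverse order.
steps:
  compile:
    run: arguments.cwl
    in:
      # This reference to an output of `untar` is what makes `compile` wait for `untar`.
      src: untar/extracted_file
    out: [classfile]

  untar:
    run: tar-param.cwl
    in:
      # Both sources are workflow inputs, so `untar` has no upstream step and can start first.
      tarfile: tarball
      extractfile: name_of_file_to_extract
    out: [extracted_file]
```

Listing steps in their logical order remains the more readable choice. The swap is shown only to make the dependency rule concrete.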

2.10.2. Nested Workflows#

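Workflows are a way to combine multiple tools to perform a larger operation. We can also think of a workflow as being a tool itself: a CWL workflow can be used as a single step in another CWL workflow if the workflow engine supports SubworkflowFeatureRequirement: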

requirements:
  SubworkflowFeatureRequirement: {}

Here is an example workflow that uses our 1st-workflow.cwl as a nested workflow:

nestedworkflows.cwl#
#!/usr/bin/env cwl-runner
cwlVersion: v1.2
class: Workflow

inputs: []

outputs:
  classout:
    type: File
    outputSource: compile/compiled_class

requirements:
  SubworkflowFeatureRequirement: {}

steps:
  compile:
    run: 1st-workflow.cwl
    in:
      tarball: create-tar/tar_compressed_java_file
      name_of_file_to_extract:
        default: "Hello.java"
    out: [compiled_class]

  create-tar:
    in: []
    out: [tar_compressed_java_file]
    run:
      class: CommandLineTool
      requirements:
        InitialWorkDirRequirement:
          listing:
            - entryname: Hello.java
              entry: |
                public class Hello {
                  public static void main(String[] argv) {
                      System.out.println("Hello from Java");
                  }
                }
      inputs: []
      baseCommand: [tar, --create, --file=hello.tar, Hello.java]
      outputs:
        tar_compressed_java_file:
          type: File
          streamable: true
          outputBinding:
            glob: "hello.tar"

Note

Visualization of the workflow and the inner workflow from its `compile` step

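This two-step workflow starts with the create-tar step, which is connected to the compile step shown in orange. As the diagram on the right shows, compile is itself another workflow. In purple we see the fixed string "Hello.java" being supplied as name_of_file_to_extract.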

Visualization of nestedworkflows.cwl Visualization of 1st-workflow.cwl

A CWL Workflow can be used as a step just like a CommandLineTool. Its CWL file is included using the run field. The workflow inputs (tarball and name_of_file_to_extract) and outputs (compiled_class) can then be mapped to the inputs and outputs of the step.

  compile:
    run: 1st-workflow.cwl
    in:
      tarball: create-tar/tar_compressed_java_file
      name_of_file_to_extract:
        default: "Hello.java"
    out: [compiled_class]

Our 1st-workflow.cwl was parameterized with workflow inputs, so when running it we had to provide a job file specifying the tar file and the *.java filename. This is generally best practice, as it means the workflow can be reused in multiple parent workflows, or even in multiple steps within the same workflow.

Here we use default: to hard-code "Hello.java" as the name_of_file_to_extract input. However, our workflow also requires a tar file at tarball, which we will prepare in the create-tar step. At this point it is probably a good idea to refactor 1st-workflow.cwl to have more specific input/output names, as those also appear when it is used as a tool.

It is also possible to take a less generic approach and avoid external dependencies in the job file. In this workflow we generate a hard-coded Hello.java file using the previously mentioned InitialWorkDirRequirement requirement, before adding it to a tar file.

  create-tar:
    requirements:
      InitialWorkDirRequirement:
        listing:
          - entryname: Hello.java
            entry: |
              public class Hello {
                public static void main(String[] argv) {
                    System.out.println("Hello from Java");
                }
              }

In this case our step can assume Hello.java rather than be parameterized, so we can use hardcoded values hello.tar and Hello.java in a baseCommand and the resulting outputs:

  run:
    class: CommandLineTool
    inputs: []
    baseCommand: [tar, --create, --file=hello.tar, Hello.java]
    outputs:
      tar_compressed_java_file:
        type: File
        streamable: true
        outputBinding:
          glob: "hello.tar"

Did you notice that we didn’t split out the tar --create tool to a separate file, but rather embedded it within the CWL Workflow file? This is generally not best practice, as the tool then can’t be reused. The reason for doing it in this case is because the command line is hard-coded with filenames that only make sense within this workflow.

In this example we had to prepare a tar file outside, but only because our inner workflow was designed to take that as an input. A better refactoring of the inner workflow would be to take a list of Java files to compile, which would simplify its usage as a tool step in other workflows.

Nested workflows can be a powerful feature to generate higher-level functional and reusable workflow units - but just like for creating a CWL Tool description, care must be taken to improve its usability in multiple workflows.

2.10.3. Scattering Steps#

Now that we know how to write workflows, we can start utilizing the ScatterFeatureRequirement. This feature tells the runner that you wish to run a tool or workflow multiple times over a list of inputs. The workflow then takes the input(s) as an array and runs the specified step(s) on each element of the array as if it were a single input. This allows you to run the same workflow on multiple inputs without having to generate many different commands or input YAML files.

requirements:
  ScatterFeatureRequirement: {}

The most common reason a new user might want to use scatter is to perform the same analysis on different samples. Let’s start with a simple workflow that calls our first example (hello_world.cwl) and takes an array of strings as input to the workflow:

scatter-workflow.cwl#
#!/usr/bin/env cwl-runner
cwlVersion: v1.2
class: Workflow

requirements:
  ScatterFeatureRequirement: {}

inputs:
  message_array: string[]

steps:
  echo:
    run: hello_world.cwl
    scatter: message
    in:
      message: message_array
    out: []

outputs: []

Aside from the requirements section including ScatterFeatureRequirement, what is going on here?

inputs:
  message_array: string[]

First of all, notice that the main workflow level input here requires an array of strings.

steps:
  echo:
    run: hello_world.cwl
    scatter: message
    in:
      message: message_array
    out: []

Here we’ve added a new field to the step echo called scatter. This field tells the runner that we’d like to scatter over this input for this particular step. Note that the name listed after scatter is the name of one of the step’s inputs, not of a workflow-level input.

For our first scatter, it’s as simple as that! Since our tool doesn’t collect any outputs, we still use outputs: [] in our workflow, but if you expect that the final output of your workflow will now have multiple outputs to collect, be sure to update that to an array type as well!
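As a minimal sketch of collecting scattered outputs: the workflow below scatters an echo step whose stdout is captured, and declares the workflow output as an array of files. The inline tool and the names `echo_out` and `all_echo_outputs` are assumptions made for illustration, not files from this guide:

```yaml
cwlVersion: v1.2
class: Workflow

requirements:
  ScatterFeatureRequirement: {}

inputs:
  message_array: string[]

steps:
  echo:
    run:
      # Inline tool (illustrative): echoes one message and captures stdout as a File.
      class: CommandLineTool
      baseCommand: echo
      inputs:
        message:
          type: string
          inputBinding:
            position: 1
      outputs:
        echo_out:
          type: stdout
    scatter: message
    in:
      message: message_array
    out: [echo_out]

outputs:
  all_echo_outputs:
    # The tool yields one File per run, so the scattered step yields File[].
    type: File[]
    outputSource: echo/echo_out
```

The tool itself still declares a single File output. Only the workflow-level type changes to an array, because the scattered step gathers one result per input element.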

Using the following input file:

scatter-job.yml#
message_array: 
  - Hello world!
  - Hola mundo!
  - Bonjour le monde!
  - Hallo welt!

As a reminder, hello_world.cwl simply calls the command echo on a message. If we invoke cwltool scatter-workflow.cwl scatter-job.yml on the command line:

$ cwltool scatter-workflow.cwl scatter-job.yml
INFO /home/docs/checkouts/readthedocs.org/user_builds/common-workflow-languageuser-guide-zh-hans/envs/latest/bin/cwltool 3.1.20240508115724
INFO Resolved 'scatter-workflow.cwl' to 'file:///home/docs/checkouts/readthedocs.org/user_builds/common-workflow-languageuser-guide-zh-hans/checkouts/latest/src/_includes/cwl/workflows/scatter-workflow.cwl'
INFO [workflow ] start
INFO [workflow ] starting step echo
INFO [step echo] start
INFO [job echo] /tmp/guw80p8i$ echo \
    'Hello world!' > /tmp/guw80p8i/f2ee667fba65d968d4f6092765fa965e792bb20b
INFO [job echo] completed success
INFO [step echo] start
INFO [job echo_2] /tmp/ib4l4911$ echo \
    'Hola mundo!' > /tmp/ib4l4911/f2ee667fba65d968d4f6092765fa965e792bb20b
INFO [job echo_2] completed success
INFO [step echo] start
INFO [job echo_3] /tmp/0kwi9mvv$ echo \
    'Bonjour le monde!' > /tmp/0kwi9mvv/f2ee667fba65d968d4f6092765fa965e792bb20b
INFO [job echo_3] completed success
INFO [step echo] start
INFO [job echo_4] /tmp/ys6pu1ua$ echo \
    'Hallo welt!' > /tmp/ys6pu1ua/f2ee667fba65d968d4f6092765fa965e792bb20b
INFO [job echo_4] completed success
INFO [step echo] completed success
INFO [workflow ] completed success
{}
INFO Final process status is success

You can see that the workflow calls echo once for each element of our message_array. Ok, so how about if we want to scatter over two steps in a workflow?

Let’s perform a simple echo like above, but capturing stdout by adding the following lines instead of outputs: []

hello_world_to_stdout.cwl#
outputs:
  echo_out:
    type: stdout

And add a second step that uses wc to count the characters in each file. See the tool below:

wc-tool.cwl#
#!/usr/bin/env cwl-runner
cwlVersion: v1.2
class: CommandLineTool
baseCommand: wc
arguments: ["-c"]
inputs:
  input_file:
    type: File
    inputBinding:
      position: 1
outputs: []

Now, how do we incorporate scatter? Remember the scatter field is under each step:

scatter-two-steps.cwl#
#!/usr/bin/env cwl-runner
cwlVersion: v1.2
class: Workflow

requirements:
 ScatterFeatureRequirement: {}

inputs:
  message_array: string[]

steps:
  echo:
    run: hello_world_to_stdout.cwl
    scatter: message
    in:
      message: message_array
    out: [echo_out]
  wc:
    run: wc-tool.cwl
    scatter: input_file
    in:
      input_file: echo/echo_out
    out: []

outputs: []

Here we have placed the scatter field under each step. This is fine for this example since it runs quickly, but if you’re running many samples through a more complex workflow, you may wish to consider an alternative. Because each step is scattered independently, the second step expects the complete array of outputs from the first step, so it waits until every job in step one has finished before doing anything, even though each wc job only needs the output of its own echo job. This does not use the scatter functionality efficiently.

Pretend that echo Hello World! takes 1 minute and wc -c on its output takes 3 minutes, while echo Hallo welt! takes 5 minutes and wc on its output takes 3 minutes. The Hello World! message could be fully processed in 4 minutes (1 + 3), but it actually takes 8 minutes (5 + 3), because the second step must wait for echo Hallo welt! to finish. You can see how this might not scale well.

Ok, so how do we scatter on steps that can proceed independent of other samples? Remember from Nested Workflows, that we can make an entire workflow a single step in another workflow! Convert our two-step workflow to a single step subworkflow:

scatter-nested-workflow.cwl#
#!/usr/bin/env cwl-runner
cwlVersion: v1.2
class: Workflow

requirements:
 ScatterFeatureRequirement: {}
 SubworkflowFeatureRequirement: {}

inputs:
  message_array: string[]

steps:
  subworkflow:
    run:
      class: Workflow
      inputs:
        message: string
      outputs: []
      steps:
        echo:
          run: hello_world_to_stdout.cwl
          in:
            message: message
          out: [echo_out]
        wc:
          run: wc-tool.cwl
          in:
            input_file: echo/echo_out
          out: []
    scatter: message
    in:
      message: message_array
    out: []
outputs: []

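Now the scatter acts on a single step, but that step consists of two steps, so each message passes through echo and wc independently of the other messages, and the scattered subworkflow runs can proceed in parallel.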

2.10.4. Conditional Workflows#

This workflow contains conditional steps, each of which is executed or skipped depending on the input. This allows workflows to skip steps based on input parameters given at the start of the workflow or produced by previous steps.

conditional-workflow.cwl#
class: Workflow
cwlVersion: v1.2
inputs:
  val: int

steps:

  step1:
    in:
      in1: val
      a_new_var: val
    run: foo.cwl
    when: $(inputs.in1 < 1)
    out: [out1]

  step2:
    in:
      in1: val
      a_new_var: val
    run: foo.cwl
    when: $(inputs.a_new_var > 2)
    out: [out1]

outputs:
  out1:
    type: string
    outputSource:
      - step1/out1
      - step2/out1
    pickValue: first_non_null

requirements:
  InlineJavascriptRequirement: {}
  MultipleInputFeatureRequirement: {}
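The tool foo.cwl that both steps run is not listed in this section. The following is only a hypothetical sketch of a tool that would fit the workflow above: it accepts `in1` and returns a string output `out1` of the form `foo <value>`. The actual file may differ.

```yaml
# Hypothetical foo.cwl: an assumption for illustration, not taken from this guide.
cwlVersion: v1.2
class: CommandLineTool
baseCommand: echo
inputs:
  # No inputBinding, so the value is not passed on the command line.
  in1: int
outputs:
  out1:
    type: string
    outputBinding:
      # Builds the output string from the input via parameter-reference interpolation.
      outputEval: foo $(inputs.in1)
```

In this sketch the step input a_new_var is not declared by the tool. It exists only at the step level, where the when expression can read it.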

The first thing you’ll notice is that this workflow is compatible only with version 1.2 or later of the CWL standards.

class: Workflow
cwlVersion: v1.2

The first step of the workflow (step1) has two inputs and executes foo.cwl only when its condition is met. The new when field holds the condition to evaluate. In this case, the step is executed only when in1, which receives the workflow input val, is less than 1.

steps:

  step1:
    in:
      in1: val
      a_new_var: val
    run: foo.cwl
    when: $(inputs.in1 < 1)
    out: [out1]

Running cwltool cond-wf-003.1.cwl --val 0 satisfies the condition of the first step, so that step is executed, as shown in the log by INFO [step step1] start. The second step is skipped, as indicated by INFO [step step2] will be skipped.

INFO [workflow ] start
INFO [workflow ] starting step step1
INFO [step step1] start
INFO [job step1] /private/tmp/docker_tmpdcyoto2d$ echo

INFO [job step1] completed success
INFO [step step1] completed success
INFO [workflow ] starting step step2
INFO [step step2] will be skipped
INFO [step step2] completed skipped
INFO [workflow ] completed success
{
    "out1": "foo 0"
}
INFO Final process status is success

When a value of 3 is given (cwltool cond-wf-003.1.cwl --val 3), the first conditional step is skipped and the second step is executed.

INFO [workflow ] start
INFO [workflow ] starting step step1
INFO [step step1] will be skipped
INFO [step step1] completed skipped
INFO [workflow ] starting step step2
INFO [step step2] start
INFO [job step2] /private/tmp/docker_tmpqwr93mxx$ echo

INFO [job step2] completed success
INFO [step step2] completed success
INFO [workflow ] completed success
{
    "out1": "foo 3"
}
INFO Final process status is success

If no condition is met, for example with --val 2, every source of out1 is null and the workflow ends with a permanentFail status.

$ cwltool cond-wf-003.1.cwl --val 2

INFO [workflow ] start
INFO [workflow ] starting step step1
INFO [step step1] will be skipped
INFO [step step1] completed skipped
INFO [workflow ] starting step step2
INFO [step step2] will be skipped
INFO [step step2] completed skipped
ERROR [workflow ] Cannot collect workflow output: All sources for 'out1' are null
INFO [workflow ] completed permanentFail
WARNING Final process status is permanentFail
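If a run in which every step is skipped should not count as a failure, one possible alternative is the all_non_null value of pickValue. This is a sketch based on the CWL v1.2 description of pickValue, not something the workflow above uses. The output type has to become an array, since all_non_null keeps every non-null source and may produce an empty list:

```yaml
outputs:
  out1:
    # Array type: zero, one, or two values may remain after null sources are dropped.
    type: string[]
    outputSource:
      - step1/out1
      - step2/out1
    pickValue: all_non_null
```

Whether an empty result is acceptable depends on what consumes the workflow output. With first_non_null, the failure shown above makes the missing value explicit instead.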