Outputs allow you to pass data between tasks and flows.

Tasks and flows can generate outputs, which can be passed to downstream processes. These outputs can be variables or files stored in the internal storage.

How to retrieve outputs

Use the syntax {{ outputs.task_id.output_property }} to retrieve a specific output of a task.

If your task id contains one or more hyphens (i.e. the - sign), wrap the task id in square brackets, e.g. {{ outputs['task-id'].output_property }}.

To see which outputs have been generated during a flow execution, go to the Outputs tab on the Execution page: Output of our previous download

The outputs are useful for troubleshooting and auditing. Additionally, you can use outputs to:

  • share downloadable artifacts with business stakeholders, e.g. a table generated by a SQL query, or a CSV file generated by a Python script
  • pass data between decoupled processes (e.g. pass subflow's outputs or a file detected by S3 trigger to downstream tasks)

Use outputs in your flow

When fetching data from a REST API, Kestra stores that fetched data in the internal storage, and makes it available to downstream tasks using the body output argument.

Use the {{ outputs.task_id.body }} syntax to process that fetched data in a downstream task, as shown in the Python script task below.

yaml
id: getting_started_output
namespace: company.team

inputs:
  - id: api_url
    type: STRING
    defaults: https://dummyjson.com/products

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: "{{ inputs.api_url }}"

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")

This flow processes data using Polars and stores the result as a CSV file.

Render expression

When referencing the output from the previous task, this flow uses jq language to extract the products array from the API response — jq is available in all Kestra tasks without having to install it.

You can test {{ outputs.task_id.body | jq('.products') | first }} and any other output parsing expression using the built-in expressions evaluator on the Outputs page:

Evaluate


Passing data between tasks

Let's add another task to the flow to process the CSV file generated by the Python script task. We will use the io.kestra.plugin.jdbc.duckdb.Query task to run a SQL query on the CSV file and store the result as a downloadable artifact in the internal storage.

yaml
id: getting_started
namespace: company.team

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: https://dummyjson.com/products

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{ outputs.api.body | jq('.products') | first }}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")

  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{ workingDir }}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true

This example flow passes data between tasks using outputs. The inputFiles argument of the io.kestra.plugin.jdbc.duckdb.Query task allows you to pass files from internal storage to the task. The store: true ensures that the result of the SQL query is stored in the internal storage and can be previewed and downloaded from the Outputs tab.

Evaluate

To sum up, our flow extracts data from an API, uses that data in a Python script, executes a SQL query and generates a downloadable artifact.

yaml
id: getting_started
namespace: company.team

inputs:
  - id: api_url
    type: STRING
    defaults: https://dummyjson.com/products

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: "{{ inputs.api_url }}"

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{ outputs.api.body | jq('.products') | first }}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")

  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{ workingDir }}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true

To learn more about outputs, check out the full outputs documentation.

Was this page helpful?