
June 20, 2022 · 7 minute read

Orchestrating Python and dbt with Dagster

Owen Kephart

Introduction

dbt is a staple of the modern data platform and has become a critical piece of many companies' transformation workflows. Nonetheless, a data platform does not begin and end with dbt. Even in the simplest of landscapes, data needs some way of getting into the data warehouse before dbt can transform it, and once it's been transformed, there are plenty of uses for data that go beyond SQL, warehouses, and tables.

Thus, Python enters the picture. Whether it's firing off a request to an external API, training a machine learning model, executing complex code that doesn't translate well to SQL, or even just storing a file on S3, there's a need for a general-purpose programming language.

This raises some new questions:

  • How do I define and execute these Python-based computations if they depend on data transformed by dbt, or vice-versa?
  • How do I view the lineage of data as it moves between these different tools?
  • Where do I go to monitor the current state of all of the data assets in my platform?

Traditional orchestration tools solve part of this problem, allowing you to sequence tasks that run before or after a dbt project, usually providing features such as logging or alerting along the way. This represents a big step up from cron-based scheduling, which is brittle to upstream failures or longer-than-expected execution. However, even the task-focused approach has some serious limitations when applied to a tool like dbt.

When executing a dbt project, from the perspective of the orchestrator, the task is extremely simple — all it needs to do is execute a shell command, and dbt will handle the difficult bits. However, the data assets produced by this simple task are a complex, interconnected web.

The task: the dbt run command in a terminal.
...vs. the assets: a complex dbt lineage graph with many models.

Understanding and interacting with the task is fundamentally not enough. Knowing if your dbt command succeeded or failed is helpful, but only takes you part of the way to answering the more important question: Is this table up to date?

As a powerful, general-purpose orchestrator, Dagster is well-equipped to handle defining, scheduling, executing, and monitoring complex workflows, and the Software-Defined Asset APIs provide the ability to express all of these functions in terms of data assets.

Selecting and executing dbt models and Python assets in Dagit.
Updating both dbt-backed and Python-backed assets from Dagit

This post will go over how Dagster's asset-focused approach answers these questions when orchestrating Python alongside dbt.

Software-Defined Assets

To provide some background for those who haven't heard of Software-Defined Assets: they are a Dagster API that adds a layer of metadata on top of an underlying task, allowing the orchestrator to understand the data dependencies within and between the tasks it executes. Essentially, an asset is a task wrapped with knowledge of the data assets it will produce, as well as the upstream data assets it needs to consume as input.

For dbt, this information is parsed out of the metadata that dbt generates for each project:

from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(
    project_dir="path/to/dbt_project"
)

For Python-backed assets, the @asset decorator allows you to specify an asset that you want to exist and how to compute its contents:

from dagster import asset

@asset
def some_asset(fct_orders):
    # ... some transformation code
    return transformed_orders

In the above snippet, we define a new asset, some_asset, which depends on fct_orders, another asset. This upstream asset can be anything, including a table updated by dbt. The job of ushering data between in-memory Python objects and persisted storage (e.g. a file on S3) is handled by the completely pluggable IOManager abstraction, allowing you to write unit-testable Python code that doesn't need to interface directly with the storage system.
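The post doesn't show what an IOManager looks like, but as a rough sketch (the class name and storage path here are hypothetical, not from the post), a minimal one that pickles each asset to local disk might look like this:

import os
import pickle

from dagster import IOManager, io_manager


class LocalPickleIOManager(IOManager):
    """Stores each asset as a pickle file on local disk."""

    def _path_for(self, asset_key):
        # one file per asset, named after its asset key
        return os.path.join("/tmp/dagster_assets", "_".join(asset_key.path))

    def handle_output(self, context, obj):
        # called after an asset function returns: persist the in-memory object
        os.makedirs("/tmp/dagster_assets", exist_ok=True)
        with open(self._path_for(context.asset_key), "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        # called before a downstream asset runs: load the upstream object back
        with open(self._path_for(context.upstream_output.asset_key), "rb") as f:
            return pickle.load(f)


@io_manager
def local_pickle_io_manager(_init_context):
    return LocalPickleIOManager()

Because the storage logic lives entirely in the IO manager, swapping local pickles for S3 or a warehouse table doesn't require touching the asset code itself.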

From here, we can discuss the benefits of this approach.

Cross-Technology Lineage

In most data platforms, data bounces through a parade of different tools and technologies on its journey from raw source to polished data product.

This isn't a bad thing: in most cases, each tool represents hundreds or thousands of lines of code that an engineer did not need to write. However, untangling this chain of dependencies becomes increasingly difficult with each added tool.

Since the orchestrator is the centralized place where all of these things execute in a particular order, it seems natural to ask it to provide insight into this journey. Unfortunately, if we restrict ourselves to a task-focused view of the world, this high-level view is disappointingly barren:

Yes, we can tell that dbt runs after Airbyte and that some Python steps run after dbt. But if we want any knowledge of the actual assets being updated, we're forced back into a workflow of paging through an unknown number of different tools and pinging co-workers on Slack for clues about where to look next. In an asset-focused workflow, we gain the ability to view the full dependency graph within each of these steps:

Much like with dbt, because each asset definition includes information on the upstream dependencies, this graph can be automatically generated from a set of definitions. This makes data lineage a natural byproduct of your data pipelines, rather than information that needs to be scraped out of disparate tools.

Declarative Data Pipelines

Once you have a graph of assets, it becomes simple to define data pipelines that update parts of this global graph. While writing a traditional pipeline might involve manually chaining the output of one task to the input of another, this becomes unnecessary in the world of asset-focused orchestration, as each asset knows which assets are upstream of it.

Creating a job that updates a set of assets can be done using simple selections. If you want a job to update every asset upstream of order_forecast_model, you can write:

define_asset_job("my_job", selection="*order_forecast_model")

Under the hood, Dagster takes this selection and compiles a sequence of steps that need to be run to update these models. If some of this asset's upstream dependencies are computed using dbt, then Dagster will automatically select only the relevant dbt models when it comes time to execute that step.
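To give a sense of the selection syntax, the same API can carve out other slices of the graph; the job names and the downstream example below are illustrative assumptions, not taken from the post:

from dagster import define_asset_job

# the forecast model plus everything upstream of it, as in the snippet above
upstream_job = define_asset_job("upstream_of_forecast", selection="*order_forecast_model")

# a hypothetical job: one asset plus everything downstream of it
downstream_job = define_asset_job("downstream_of_predictions", selection="predicted_orders*")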

The graph of all the assets in the repository
The job created for the selection `*order_forecast_model`

Scheduling jobs once they've been defined is also easy:

ScheduleDefinition(job=my_job, cron_schedule="@daily")
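Tying the pieces together, here is a rough sketch of how the dbt assets, the job, and the schedule might be bundled into a single repository; the dbt resource configuration and paths are assumptions for illustration, not from the post:

from dagster import ScheduleDefinition, define_asset_job, repository, with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt_project")

# the job from the earlier snippet, plus a daily schedule for it
my_job = define_asset_job("my_job", selection="*order_forecast_model")
my_schedule = ScheduleDefinition(job=my_job, cron_schedule="@daily")


@repository
def my_repo():
    return [
        # dbt-backed assets need a dbt resource so Dagster can invoke the dbt CLI
        *with_resources(
            dbt_assets,
            {"dbt": dbt_cli_resource.configured({"project_dir": "path/to/dbt_project"})},
        ),
        my_job,
        my_schedule,
    ]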

These interfaces make it simple and convenient to schedule workflows that cross the technology gap between dbt, Python, and any tool that you use Dagster to orchestrate. If you contrast this with a task-based approach, the benefits become clear. To accomplish this same end result, you'd need to mentally translate this selection of assets into a series of steps, manually chaining input to output. Once you got to the dbt step, you'd need to hard-code a selection string to select only the relevant models for that step.

All of this seems like an unnecessary burden to express a relatively simple concept to the orchestrator: "I would like to update this set of assets at 12AM every day".

Detailed Historical Metadata

In a vacuum, dbt runs disappear essentially as soon as they complete. However, whenever you materialize an asset using Dagster, detailed metadata is automatically collected and tracked over time. Not only do you get information on the last time each individual asset was updated, you also get a record of every time it has been updated, along with a link to the run that produced it.

It's easy to search for a given asset in Dagit, and be taken directly to a page with detailed metadata about its current and historical states:

Searching for an asset in Dagit's asset catalog.

If a dbt run fails halfway through, warnings will be displayed for all the assets that failed to be executed. Once the root issue is solved, you can kick off a run to rematerialize only the failed models straight from the UI:

Selecting and executing failed dbt models in Dagit.

These features make it tractable to both notice and repair issues in a data platform at a glance. Rather than having to wonder whether a particular asset has been updated as expected, this information is just a few clicks away. As an orchestration tool, Dagster not only gives you the ability to notice these issues, but also the ability to kick off the computation that fixes the problem from that same interface.

Orchestrating Arbitrary Python within a dbt Project

First, we should mention that as of October 2022, dbt supports Python models that run in your data warehouse. We're excited to support these Python-based dbt models just as we support regular SQL-based models.

While likely to be quite useful, these Python-based dbt models are not intended to be suitable for every use case. After all, not all assets that you care about are database tables, and not every Python computation that you want to execute can run on the somewhat siloed compute offered by modern data warehouse technologies such as Snowflake or Databricks.

One use case we've seen from our users is roughly the following:

  1. I have a large dbt project that does a bunch of transformation on top of my raw data.
  2. I use that polished data to train a machine learning model. [this model is an asset]
  3. I use that trained model to make some predictions and store them in a table. [this table is also an asset]
  4. I want to do further transformations in dbt that use that table as a source.

In order to execute all these dependencies, you'll need to invoke dbt twice: once for the models before the machine learning workflow, then once for the models that come afterwards.
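To make steps 2 and 3 concrete, here is a rough sketch of how those two Python assets might be written; the upstream dbt model name, the column names, and the use of scikit-learn are illustrative assumptions, not taken from the post:

from dagster import asset
from sklearn.linear_model import LinearRegression


@asset
def order_forecast_model(orders_cleaned):
    # step 2: train a model on data that dbt has already transformed
    # ("orders_cleaned" stands in for whichever dbt model holds the polished data)
    model = LinearRegression()
    model.fit(orders_cleaned[["day_of_week"]], orders_cleaned["num_orders"])
    return model


@asset
def predicted_orders(order_forecast_model, orders_cleaned):
    # step 3: produce the predictions table that downstream dbt models will
    # read as the "forecasting.predicted_orders" source
    predictions = order_forecast_model.predict(orders_cleaned[["day_of_week"]])
    return orders_cleaned.assign(predicted_orders=predictions)

In practice, the predicted_orders asset would be handled by an IO manager that writes to the warehouse table that the dbt source (shown below) points at.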

We've seen a few solutions in the wild for this problem, and they generally come down to walking through the dbt graph and manually selecting the subsets of models that are upstream or downstream of a given Python operation, then executing those subsets in separate dbt invocations.

This manual selection step is unnecessary in Dagster. After all, Software-Defined Assets allow you to declare the data that you want to exist, and it's Dagster's job to figure out the series of steps required to make that happen. With that in mind, step 4 is as simple as making the table from step 3 a source in your sources.yml file...

# sources.yml
version: 2

sources:
  - name: forecasting
    tables:
      - name: predicted_orders

... and creating a new dbt model that depends on that source:

-- new_model.sql
select * from
{{ source("forecasting", "predicted_orders") }}
where foo='bar'

From this, Dagster will automatically generate a sequence of steps and execute them in the proper order. In this case, that means invoking dbt twice: once for the models before the machine learning workflow, and again for the ones after:

An operational graph in Dagit showing two separate dbt steps.

This allows you to achieve a truly cross-technology workflow, without having to worry about the details of how to execute it.

Wrapping Up

Software-Defined Assets help navigate the complexity of a truly diverse data landscape. Day-to-day operations require constant translation between the realm of assets and the realm of tasks:

  • "Is this machine learning model up to date?" : "Has the task that updates this model been run recently?"
  • "Update these tables every morning at 3AM" : "Run these tasks every morning at 3AM"
  • "Where did this table come from" : "What task updates this table, and what's upstream of it?"

By providing asset-focused APIs at the orchestration layer, the burden of this translation is moved away from engineers and analysts, and onto the orchestrator itself.



We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!
