Config Systems: Airflow and Dagster | Dagster Blog

May 16, 2023 · 12 minute read


Contrasting the Airflow and Dagster configuration systems by rewriting the Airflow Slack Integration.
Joe Van Drunen

“Config system.” A phrase that puts a pit in the stomach of any programmer…

Being asked to “configure” anything in the first place is a tall ask; I'd much rather not spend time figuring out the correct magic environment variables to set. But to also understand an entire “system”? Oof, no, thank you. I'd have better luck figuring out what's going on in the head of the squirrel outside my office window than in the head of some OSS programmer from years ago.

And yet, orchestration that doesn't interact with external systems is not very useful, and interacting with external systems requires configuration. So it's essential to have a coherent way to organize how your orchestration framework works with external systems. If the framework you're building on hasn't thought hard about the abstractions it uses to manage this, you'll be missing out.

From Airflow to Dagster

When data teams are considering moving from Airflow to Dagster, they've likely invested time writing custom integrations for Airflow, and they've spent the time learning the Airflow configuration system. It might seem daunting to leave that work behind.

While the dagster-airflow library lets you port your Airflow DAGs to Dagster with zero code changes, it's best to write new integrations directly against Dagster's well-supported, modern abstractions. As you'll see below, the Dagster code is much more succinct, easier to understand, and easier to debug than the equivalent Airflow code.

Making it concrete

Talking about config systems in the abstract isn't very useful, so to try to illustrate the differences between Dagster and Airflow, we'll rewrite the Airflow Slack Integration in Dagster.

Let's start by rewriting the class definition and parameters for the Airflow Slack Hook as a Dagster Resource:

Airflow

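from __future__ import annotations

import warnings
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence

from slack_sdk import WebClient

from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.providers.slack.utils import ConnectionExtraConfig
from airflow.utils.log.secrets_masker import mask_secret

if TYPE_CHECKING:
    # Only needed for the type annotations used by the hook's methods.
    from slack_sdk.http_retry import RetryHandler
    from slack_sdk.web.slack_response import SlackResponse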

class SlackHook(BaseHook):
    conn_name_attr = "slack_conn_id"
    default_conn_name = "slack_api_default"
    conn_type = "slack"
    hook_name = "Slack API"

    def __init__(
        self,
        token: str | None = None,
        slack_conn_id: str | None = None,
        base_url: str | None = None,
        timeout: int | None = None,
        proxy: str | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **extra_client_args: Any,
    ) -> None:
        if not token and not slack_conn_id:
            raise AirflowException("Either `slack_conn_id` or `token` should be provided.")
        if token:
            mask_secret(token)
            warnings.warn(
                "Provide token as hook argument deprecated by security reason and will be removed "
                "in a future releases. Please specify token in `Slack API` connection.",
                DeprecationWarning,
                stacklevel=2,
            )
        if not slack_conn_id:
            warnings.warn(
                "You have not set parameter `slack_conn_id`. Currently `Slack API` connection id optional "
                "but in a future release it will mandatory.",
                FutureWarning,
                stacklevel=2,
            )

        super().__init__()
        self._token = token
        self.slack_conn_id = slack_conn_id
        self.base_url = base_url
        self.timeout = timeout
        self.proxy = proxy
        self.retry_handlers = retry_handlers
        self.extra_client_args = extra_client_args
        if self.extra_client_args.pop("use_session", None) is not None:
            warnings.warn("`use_session` has no affect in slack_sdk.WebClient.", UserWarning, stacklevel=2)

Dagster

from functools import cached_property
from pathlib import Path
from typing import Optional, Sequence

from dagster import ConfigurableResource
from pydantic import Field
from slack_sdk import WebClient
from slack_sdk.web.slack_response import SlackResponse


class SlackResource(ConfigurableResource):
    token: str = Field(
        description="Slack API Token.",
    )
    base_url: Optional[str] = Field(
        description="The Slack API base URL. If not set, the default WebClient BASE_URL (``https://www.slack.com/api/``) is used.",
        default=None,
    )
    timeout: Optional[int] = Field(
        description="The maximum number of seconds the client will wait to connect to and receive a response from Slack. If not set, the default WebClient timeout is used.",
        default=None,
    )
    proxy: Optional[str] = Field(
        description="Proxy to use for Slack API calls.",
        default=None,
    )

Because Dagster resources are built on Pydantic, each parameter is declared as a typed class attribute, and its description is shown to users in the Dagster UI. We’ll review the UI for both Airflow Connections and Dagster resources later.
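Why is a description on each field all the UI needs? A typed schema can be introspected into the rows a config form displays. The stdlib-only analogy below uses `dataclasses` field metadata as a stand-in for Pydantic's `Field(description=...)`; `SlackConfig` and the `describe_config` helper are hypothetical and are not Dagster's API or its UI code.

```python
from dataclasses import MISSING, dataclass, field, fields
from typing import Optional


@dataclass(frozen=True)
class SlackConfig:
    # Analogy only: dataclass metadata stands in for pydantic's Field(description=...).
    token: str = field(metadata={"description": "Slack API Token."})
    timeout: Optional[int] = field(
        default=None,
        metadata={"description": "Seconds to wait for a response from Slack."},
    )


def describe_config(cls) -> list[dict]:
    """Hypothetical helper: turn a typed schema into rows a UI could render."""
    rows = []
    for f in fields(cls):
        rows.append(
            {
                "name": f.name,
                # A field is required when it has neither a default nor a default factory.
                "required": f.default is MISSING and f.default_factory is MISSING,
                "description": f.metadata.get("description", ""),
            }
        )
    return rows
```

Calling describe_config(SlackConfig) yields one row for token (required) and one for timeout (optional), each with its description, without any UI-specific code on the class itself.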

Next, let's rewrite the Slack client definition:

Airflow

    @cached_property
    def client(self) -> WebClient:
        """Get the underlying slack_sdk.WebClient (cached)."""
        return WebClient(**self._get_conn_params())

    def get_conn(self) -> WebClient:
        """Get the underlying slack_sdk.WebClient (cached)."""
        return self.client

    def _get_conn_params(self) -> dict[str, Any]:
        """Fetch connection params as a dict and merge it with hook parameters."""
        conn = self.get_connection(self.slack_conn_id) if self.slack_conn_id else None
        conn_params: dict[str, Any] = {"retry_handlers": self.retry_handlers}

        if self._token:
            conn_params["token"] = self._token
        elif conn:
            if not conn.password:
                raise AirflowNotFoundException(
                    f"Connection ID {self.slack_conn_id!r} does not contain password (Slack API Token)."
                )
            conn_params["token"] = conn.password

        extra_config = ConnectionExtraConfig(
            conn_type=self.conn_type,
            conn_id=conn.conn_id if conn else None,
            extra=conn.extra_dejson if conn else {},
        )

        # Merge Hook parameters with Connection config
        conn_params.update(
            {
                "timeout": self.timeout or extra_config.getint("timeout", default=None),
                "base_url": self.base_url or extra_config.get("base_url", default=None),
                "proxy": self.proxy or extra_config.get("proxy", default=None),
            }
        )

        # Add additional client args
        conn_params.update(self.extra_client_args)
        if "logger" not in conn_params:
            conn_params["logger"] = self.log

        return {k: v for k, v in conn_params.items() if v is not None}

    @cached_property
    def token(self) -> str:
        warnings.warn(
            "`SlackHook.token` property deprecated and will be removed in a future releases.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._get_conn_params()["token"]

    def __get_token(self, token: Any, slack_conn_id: Any) -> str:
        warnings.warn(
            "`SlackHook.__get_token` method deprecated and will be removed in a future releases.",
            DeprecationWarning,
            stacklevel=2,
        )
        if token is not None:
            return token

        if slack_conn_id is not None:
            conn = self.get_connection(slack_conn_id)

            if not getattr(conn, "password", None):
                raise AirflowException("Missing token(password) in Slack connection")
            return conn.password

        raise AirflowException("Cannot get token: No valid Slack token nor slack_conn_id supplied.")

Dagster

    @cached_property
    def client(self) -> WebClient:
        """Get the underlying slack_sdk.WebClient (cached)."""
        # SlackResource has no retry_handlers field, so it is not passed here.
        # Optional settings are only passed when configured, so WebClient
        # falls back to its own defaults for the rest.
        client_kwargs = {"token": self.token}
        if self.proxy is not None:
            client_kwargs["proxy"] = self.proxy
        if self.base_url is not None:
            client_kwargs["base_url"] = self.base_url
        if self.timeout is not None:
            client_kwargs["timeout"] = self.timeout
        return WebClient(**client_kwargs)

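Hopefully, you are noticing a theme: Dagster requires less code to do the same thing. The Airflow Connection abstraction forces the hook to look up the Connection, merge it with the hook's own arguments, and keep supporting deprecated ways of passing the token. None of that boilerplate or indirection is needed in Dagster, because the resource's fields are its configuration, and the code is more readable for it.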
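Much of that indirection lives in `_get_conn_params`. Stripped of Airflow types, its merge rule is roughly: an explicit hook argument wins, otherwise the value from the Connection's extras is used, and anything still `None` is dropped so `WebClient` keeps its own defaults. Here is a minimal stdlib sketch of that precedence; `merge_conn_params` and the plain-dict `extras` are simplifications for illustration, not Airflow's API.

```python
from typing import Any, Optional


def merge_conn_params(
    hook_args: dict[str, Optional[Any]],
    extras: dict[str, Any],
) -> dict[str, Any]:
    """Simplified SlackHook merge: hook argument first, else the Connection extra."""
    merged = {}
    for key in ("timeout", "base_url", "proxy"):
        # Mirrors `self.timeout or extra_config.get(...)`: any falsy hook value
        # (None, 0, "") falls through to the Connection extra.
        merged[key] = hook_args.get(key) or extras.get(key)
    # Drop unset values so the client falls back to its own defaults.
    return {k: v for k, v in merged.items() if v is not None}
```

Note the `or`: a hook argument of 0 or "" silently falls through to the Connection value. In the Dagster resource there is nothing to merge, since the configured field is simply the value.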

Now let's rewrite the actual API functions:

Airflow

    def call(self, api_method: str, **kwargs) -> SlackResponse:
        return self.client.api_call(api_method, **kwargs)

    def send_file(
        self,
        *,
        channels: str | Sequence[str] | None = None,
        file: str | Path | None = None,
        content: str | None = None,
        filename: str | None = None,
        filetype: str | None = None,
        initial_comment: str | None = None,
        title: str | None = None,
    ) -> SlackResponse:
        if not ((not file) ^ (not content)):
            raise ValueError("Either `file` or `content` must be provided, not both.")
        elif file:
            file = Path(file)
            with open(file, "rb") as fp:
                if not filename:
                    filename = file.name
                return self.client.files_upload(
                    file=fp,
                    filename=filename,
                    filetype=filetype,
                    initial_comment=initial_comment,
                    title=title,
                    channels=channels,
                )

        return self.client.files_upload(
            content=content,
            filename=filename,
            filetype=filetype,
            initial_comment=initial_comment,
            title=title,
            channels=channels,
        )

Dagster

    def call(self, api_method: str, **kwargs) -> SlackResponse:
        return self.client.api_call(api_method, **kwargs)

    def send_file(
        self,
        *,
        channels: str | Sequence[str] | None = None,
        file: str | Path | None = None,
        content: str | None = None,
        filename: str | None = None,
        filetype: str | None = None,
        initial_comment: str | None = None,
        title: str | None = None,
    ) -> SlackResponse:
        if not ((not file) ^ (not content)):
            raise ValueError("Either `file` or `content` must be provided, not both.")
        elif file:
            file = Path(file)
            with open(file, "rb") as fp:
                if not filename:
                    filename = file.name
                return self.client.files_upload(
                    file=fp,
                    filename=filename,
                    filetype=filetype,
                    initial_comment=initial_comment,
                    title=title,
                    channels=channels,
                )

        return self.client.files_upload(
            content=content,
            filename=filename,
            filetype=filetype,
            initial_comment=initial_comment,
            title=title,
            channels=channels,
        )

Yep, they're the same; the differences between Dagster and Airflow integrations almost entirely boil down to the abstractions and patterns each chose for managing configuration.
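The one dense line both versions share is the guard `if not ((not file) ^ (not content))`. `not file` and `not content` turn each argument into a bool, and `^` is true only when exactly one of them is missing, so the guard raises unless exactly one of file or content is provided. The hypothetical helper below isolates that test so its truth table is easy to check:

```python
from typing import Optional


def exactly_one_provided(file: Optional[str], content: Optional[str]) -> bool:
    """The send_file guard without its outer `not`: True when exactly one is set."""
    return (not file) ^ (not content)
```

Because the check relies on truthiness, an empty string counts as "not provided", just like None.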

What about the UI?

Airflow Hooks do not appear in the UI. Connections do, but they require some special code to render in an intuitive way. Dagster does not need this; it renders your resource classes in the UI, including each field's description, without extra work.

To get something similar in Airflow for our Slack hook, we need to add the following class methods to the Airflow hook class:

    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Returns dictionary of widgets to be added for the hook to handle extra values."""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import IntegerField, StringField
        from wtforms.validators import NumberRange, Optional

        return {
            "timeout": IntegerField(
                lazy_gettext("Timeout"),
                widget=BS3TextFieldWidget(),
                validators=[Optional(strip_whitespace=True), NumberRange(min=1)],
                description="Optional. The maximum number of seconds the client will wait to connect "
                "and receive a response from Slack API.",
            ),
            "base_url": StringField(
                lazy_gettext("Base URL"),
                widget=BS3TextFieldWidget(),
                description="Optional. A string representing the Slack API base URL.",
            ),
            "proxy": StringField(
                lazy_gettext("Proxy"),
                widget=BS3TextFieldWidget(),
                description="Optional. Proxy to make the Slack API call.",
            ),
        }

    @classmethod
    @_ensure_prefixes(conn_type="slack")
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Returns custom field behaviour."""
        return {
            "hidden_fields": ["login", "port", "host", "schema", "extra"],
            "relabeling": {
                "password": "Slack API Token",
            },
            "placeholders": {
                "password": "xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx",
                "timeout": "30",
                "base_url": "https://www.slack.com/api/",
                "proxy": "http://localhost:9000",
            },
        }

These class methods override the generic Connection form with fields and labels that make sense for a Slack connection: extra inputs for timeout, base URL, and proxy, the generic fields hidden, and Slack-specific placeholders. Note that the token is still stored in the Connection's password field in the database; it is only relabeled “Slack API Token” when the form is rendered.
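To make the stored-versus-displayed split concrete, here is a toy model (not Airflow's rendering code) that applies the hidden_fields and relabeling entries from get_ui_field_behaviour to a simplified subset of Airflow's generic Connection fields. The field list and the `visible_form_labels` helper are illustrative assumptions.

```python
# Simplified subset of the generic Connection fields referenced by get_ui_field_behaviour.
GENERIC_FIELDS = ["host", "schema", "login", "password", "port", "extra"]

# Copied from the hidden_fields/relabeling entries returned by the Slack hook above.
SLACK_BEHAVIOUR = {
    "hidden_fields": ["login", "port", "host", "schema", "extra"],
    "relabeling": {"password": "Slack API Token"},
}


def visible_form_labels(behaviour: dict) -> dict[str, str]:
    """Toy model: map each stored field name that stays visible to its display label."""
    hidden = set(behaviour.get("hidden_fields", []))
    relabel = behaviour.get("relabeling", {})
    return {
        name: relabel.get(name, name.capitalize())
        for name in GENERIC_FIELDS
        if name not in hidden
    }
```

For the Slack behaviour, the only visible key is still password; only its label changes, which is why the database column and the form disagree on the name.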

Authoring pipelines

Now that we’ve defined our Slack integrations, we can use them in our pipelines. Here is what that looks like in Airflow and in Dagster:

Airflow

import pendulum
from airflow.models.dag import DAG
from airflow.providers.slack.operators.slack import SlackAPIFileOperator

with DAG(
    dag_id="slack_example",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "slack_conn_id": "slack",
    },
    max_active_runs=1,
) as dag:
    slack_operator_ = SlackAPIFileOperator(
        task_id="slack_operator_example",
        channels="#random",
        content="hello world",
    )

Dagster

import os
from .slack_resource import SlackResource
from dagster import Definitions, job, op

@op
def send_slack_message(slack: SlackResource):
    slack.send_file(
        channels="#random",
        content="hello world",
    )

@job
def slack_job():
    send_slack_message()

definitions = Definitions(
    jobs=[slack_job],
    resources={
        "slack": SlackResource(
            token=os.getenv("SLACK_API_TOKEN", "fake_token"),
        ),
    },
)

It might seem like the Airflow version is more concise, but it relies on SlackAPIFileOperator, which we haven’t looked at yet. Operators are another Airflow abstraction, and they normally wrap Hooks. Operators usually focus on implementing various orchestration lifecycle methods, but SlackAPIFileOperator implements just the single execute method:

from __future__ import annotations

import json
import warnings
from typing import Any, Sequence

from airflow.compat.functools import cached_property
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.utils.log.secrets_masker import mask_secret

class SlackAPIOperator(BaseOperator):
    """
    Base Slack Operator
    The SlackAPIPostOperator is derived from this operator.
    In the future additional Slack API Operators will be derived from this class as well.
    Only one of `slack_conn_id` and `token` is required.

    :param slack_conn_id: :ref:`Slack API Connection <howto/connection:slack>`
        which its password is Slack API token. Optional
    :param token: Slack API token (https://api.slack.com/web). Optional
    :param method: The Slack API Method to Call (https://api.slack.com/methods). Optional
    :param api_params: API Method call parameters (https://api.slack.com/methods). Optional
    :param client_args: Slack Hook parameters. Optional. Check airflow.providers.slack.hooks.SlackHook
    """

    def __init__(
        self,
        *,
        slack_conn_id: str | None = None,
        token: str | None = None,
        method: str | None = None,
        api_params: dict | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        if token:
            mask_secret(token)
        self.token = token
        self.slack_conn_id = slack_conn_id

        self.method = method
        self.api_params = api_params

    @cached_property
    def hook(self) -> SlackHook:
        """Slack Hook."""
        return SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)

    def construct_api_call_params(self) -> Any:
        """
        Used by the execute function. Allows templating on the source fields
        of the api_call_params dict before construction

        Override in child classes.
        Each SlackAPIOperator child class is responsible for
        having a construct_api_call_params function
        which sets self.api_call_params with a dict of
        API call parameters (https://api.slack.com/methods)
        """
        raise NotImplementedError(
            "SlackAPIOperator should not be used directly. Chose one of the subclasses instead"
        )

    def execute(self, **kwargs):
        if not self.api_params:
            self.construct_api_call_params()
        self.hook.call(self.method, json=self.api_params)

class SlackAPIFileOperator(SlackAPIOperator):
    """
    Send a file to a slack channels
    Examples:

    .. code-block:: python

        # Send file with filename and filetype
        slack_operator_file = SlackAPIFileOperator(
            task_id="slack_file_upload_1",
            dag=dag,
            slack_conn_id="slack",
            channels="#general,#random",
            initial_comment="Hello World!",
            filename="/files/dags/test.txt",
            filetype="txt",
        )

        # Send file content
        slack_operator_file_content = SlackAPIFileOperator(
            task_id="slack_file_upload_2",
            dag=dag,
            slack_conn_id="slack",
            channels="#general",
            initial_comment="Hello World!",
            content="file content in txt",
        )

    :param channels: Comma-separated list of channel names or IDs where the file will be shared.
        If set this argument to None, then file will send to associated workspace. (templated)
    :param initial_comment: message to send to slack. (templated)
    :param filename: name of the file (templated)
    :param filetype: slack filetype. (templated) See: https://api.slack.com/types/file#file_types
    :param content: file content. (templated)
    :param title: title of file. (templated)
    :param channel: (deprecated) channel in which to sent file on slack name
    """

    template_fields: Sequence[str] = (
        "channels",
        "initial_comment",
        "filename",
        "filetype",
        "content",
        "title",
    )
    ui_color = "#44BEDF"

    def __init__(
        self,
        channels: str | Sequence[str] | None = None,
        initial_comment: str | None = None,
        filename: str | None = None,
        filetype: str | None = None,
        content: str | None = None,
        title: str | None = None,
        channel: str | None = None,
        **kwargs,
    ) -> None:
        if channel:
            warnings.warn(
                "Argument `channel` is deprecated and will removed in a future releases. "
                "Please use `channels` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            if channels:
                raise ValueError(f"Cannot set both arguments: channel={channel!r} and channels={channels!r}.")
            channels = channel

        self.channels = channels
        self.initial_comment = initial_comment
        self.filename = filename
        self.filetype = filetype
        self.content = content
        self.title = title
        super().__init__(method="files.upload", **kwargs)

    def execute(self, **kwargs):
        self.hook.send_file(
            channels=self.channels,
            # For historical reason SlackAPIFileOperator use filename as reference to file
            file=self.filename,
            content=self.content,
            initial_comment=self.initial_comment,
            title=self.title,
        )

Abstractions with lots of magic boilerplate are not free. Users often find it hard to tell what belongs in an Airflow Hook versus an Airflow Operator, so it's not uncommon to see Airflow users fold both into the Operator and forgo Hooks entirely. In Dagster, resources are the single abstraction for building integrations.

The Airflow Config System

Airflow’s configuration is mostly based on four core abstractions: Connections, Variables, Hooks, and Operators.

  • Connections are static, semi-structured configuration entries in the Airflow database. There are many different ways of setting them, but in general Connections can be understood as persistent configuration state.
  • Variables are also stored in the Airflow database and can be modified from the Airflow UI, but unlike Connections they are a generic key/value store and are not intended to be used in Hooks (though they can be if you want).
  • Hooks use the config state of Connections to define an API for interacting with an external system. They exist only as Python classes extending the BaseHook class.
  • Operators are the user-facing orchestration primitive. They may depend on Hooks or Connections and are supposed to focus mainly on the mechanics of orchestration, not on making the correct API calls to external systems (that's what Hooks are for). However, it's not uncommon to see Operators skip Hooks and combine those two concerns.
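One of those many ways of setting Connections and Variables is environment variables: Airflow reads a Connection from AIRFLOW_CONN_<CONN_ID> and a Variable from AIRFLOW_VAR_<KEY>, with the ID or key upper-cased (the JSON form for Connections needs Airflow 2.3 or later). The sketch below only builds those values with the standard library; the token, the timeout extra, and the slack_channel Variable are placeholders, and the Connection ID matches the slack_conn_id used in the DAG example.

```python
import json
import os

# Connection "slack": the token lives in `password`, Slack options in `extra`.
slack_conn = {
    "conn_type": "slack",
    "password": "xoxb-your-token",  # placeholder, not a real token
    "extra": {"timeout": 30},
}
os.environ["AIRFLOW_CONN_SLACK"] = json.dumps(slack_conn)

# Variable "slack_channel": a plain key/value entry, independent of any Hook.
os.environ["AIRFLOW_VAR_SLACK_CHANNEL"] = "#random"
```

In a real deployment you would export these in the scheduler and worker environments rather than set them from Python; the point is that the same Connection can come from the database, the environment, or a secrets backend.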

Where the Airflow config system gets spicy is templating. Before a task runs, Airflow renders the operator's templated fields (those listed in template_fields, as in SlackAPIFileOperator above) with a Jinja-based templating engine. This lets DAG authors reference values stored in Airflow Connections and Variables directly in their Operator invocations, and those values are then passed to Hooks that, at execution time, may also be fetching Connection and Variable values internally. Sound confusing? It is.

The Dagster Config System

With the 1.3 release of Dagster, a new Pythonic resource and config system has been mainlined, making it simpler than ever to define the config and parameters that integrations need. Even before that, the existing resource abstraction already offered a much simpler and more structured approach than what is available in Airflow.

To be reductive, Dagster’s resource system has three primary distinctions vs Airflow:

  1. It makes integration dependencies explicit rather than burying them in the class implementation of operator code.
  2. It does not dictate how configuration values are stored; resources can pull from environment variables or other external configuration stores.
  3. It handles the UI for you: there are no magic class methods or class properties you need to set to make the UI render your resource correctly.

Figure: Config System: Ops and Resources

See for yourself

If you’d like to view and run both the Airflow and Dagster code in this post and evaluate them yourself, you can do that with the example repository here. Dagster remains laser-focused on providing the best developer ergonomics possible, and its config system is a key part of that.