library(dagsterpipes)
library(DBI)

ctx <- open_dagster_pipes()

tryCatch(
  {
    ctx$log("Starting ETL process.")

    # Connection settings are read from the extras passed in by the Python asset.
    con <- dbConnect(
      drv = RPostgres::Postgres(),
      dbname = ctx$get_extra("DB_NAME"),
      host = ctx$get_extra("DB_HOST"),
      port = ctx$get_extra("DB_PORT") |> as.integer(),
      user = ctx$get_extra("DB_USER"),
      password = ctx$get_extra("DB_PASSWORD")
    )

    ctx$log("Writing `mtcars` to DB.")
    dbWriteTable(con, name = "mtcars", value = mtcars, overwrite = TRUE)
    ctx$log("Finished table write.")

    # Attach the row count as metadata on the materialization event.
    ctx$report_asset_materialization(
      metadata = list(
        row_count = pipes_metadata_value(nrow(mtcars), "int")
      )
    )

    ctx$log("ETL process completed.")
    ctx$close()
  },
  error = function(e) {
    # Close the context with the error, then re-raise it.
    ctx$close(exception = e)
    stop(e)
  }
)

Introduction
Your team just completed a successful ML model development project. The model performs well on out-of-sample data; the training code is wrapped in an R package with good test coverage; it even has a {pkgdown} documentation website to boot. Everything’s going swell. You share the GitHub repo with the platform engineering team, and their feedback is, “We need this code to be written in Python to wire it up to Dagster. Can you rewrite this in Python?” That rewrite is no longer necessary: with the new R package {dagsterpipes}, you can bring all the power of R to Dagster.
What we’re building
We’re going to create a minimal Dagster project that has an asset implemented in R. Specifically, our asset is going to write the mtcars dataset into a table in a Postgres database. Beyond materializing a Dagster asset written in R, we also want R to report the progress of the run to the Dagster instance, and to send metadata about the run once the asset has finished materializing. A GitHub repo accompanying this post can be found here.
Dagster Pipes Protocol and {dagsterpipes}
All of this is possible using Dagster’s Pipes Protocol. The Pipes Protocol is a message channel between Dagster and an external process. Dagster sends information relevant to running the asset (partition keys, environment variables, etc.) to the external process. The external process sends logs, metadata, and materialization events back to Dagster. Since the protocol consists of structured messaging, it can be implemented in any programming language. This is where the {dagsterpipes} R package comes in; it provides the implementation of the Pipes Protocol for R.
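To make the idea of a structured message channel concrete, here is a minimal sketch in Python using only the standard library. It is an illustration of the concept, not Dagster's actual wire format: the file names, the JSON layout, and the `orchestrate` and `external_process` functions are all assumptions made up for this example, and `external_process` runs in the same interpreter purely to keep the sketch self-contained (in practice it would be a separate program, such as an R script).

```python
import json
import os
import tempfile


def external_process(context_path, messages_path):
    """Stand-in for the external script; it only touches the two files it is given."""
    with open(context_path) as f:
        context = json.load(f)

    def send(method, params):
        # One JSON object per line, appended to the shared messages file.
        with open(messages_path, "a") as out:
            out.write(json.dumps({"method": method, "params": params}) + "\n")

    send("log", {"message": "Writing table " + context["extras"]["table"]})
    send(
        "report_asset_materialization",
        {"metadata": {"row_count": context["extras"]["rows"]}},
    )


def orchestrate(extras):
    """Orchestrator side: write the context, run the external code, read messages back."""
    with tempfile.TemporaryDirectory() as tmp:
        context_path = os.path.join(tmp, "context.json")
        messages_path = os.path.join(tmp, "messages.jsonl")
        with open(context_path, "w") as f:
            json.dump({"extras": extras}, f)
        external_process(context_path, messages_path)
        with open(messages_path) as f:
            return [json.loads(line) for line in f]


# Illustrative values only; the real asset reads its settings from Dagster.
messages = orchestrate({"table": "mtcars", "rows": 32})
for message in messages:
    print(message["method"], message["params"])
```

Because the external side only needs to read JSON and append JSON lines, nothing in it is specific to Python, which is the property that lets a package like {dagsterpipes} implement the protocol in R.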
Our first R-based asset: the R side
Our R asset is going to insert records from mtcars into a Postgres table. Here’s the code (the R script listed at the top of this post, saved as etl.R).
The important thing here is the ctx variable we get from calling open_dagster_pipes. Short for “context”, ctx is where Dagster shoves information related to the run, like partition keys and environment variables. On the R side, we can log information about the run; those messages are sent back to Dagster and shown on the run details page.
Our first R-based asset: the Python side
There are two places in our Python code we need to change to make this work. First, we create the PipesSubprocessClient resource in definitions.py (see code below).
from dagster import Definitions, load_assets_from_modules, PipesSubprocessClient
from dagsterdemo import assets  # noqa: TID252

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    resources={
        # Asset functions request this resource by using the same name as an argument.
        "pipes_subprocess_client": PipesSubprocessClient()
    },
)

Once this resource is registered in Dagster, we can inject it into any of our Dagster asset functions. The next place we need to make changes is in assets.py as shown below.
import os
from pathlib import Path

import dagster as dg

# The R script lives next to this module.
R_ETL_SCRIPT = Path(__file__).parent.joinpath("etl.R")


@dg.asset()
def r_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
):
    return pipes_subprocess_client.run(
        command=["Rscript", str(R_ETL_SCRIPT)],
        context=context,
        # Extras are forwarded to R, where ctx$get_extra() reads them.
        extras={
            "DB_HOST": os.getenv("DB_HOST"),
            "DB_PORT": os.getenv("DB_PORT"),
            "DB_NAME": os.getenv("DB_NAME"),
            "DB_USER": os.getenv("DB_USER"),
            "DB_PASSWORD": os.getenv("DB_PASSWORD"),
        },
    ).get_materialize_result()

The most important thing to note here is that we’re passing the PipesSubprocessClient instance we defined earlier. You might wonder two things about this code.
One is whether it’s a coincidence that the argument name for the PipesSubprocessClient in r_asset matches the dict key we gave the PipesSubprocessClient in definitions.py. This isn’t a coincidence: Dagster resolves resources by name, so any resource you want to inject into an asset needs the argument name to match.
Another thing you might wonder is why we need this resource when we could just use Python’s subprocess module. The primary difference is that PipesSubprocessClient doesn’t just run the R script: it creates and maintains the message sink where all communication between R and Dagster occurs. Part of this communication involves injecting all of the values in the context variable so that R is aware of things like partitions, environment variables, and so on. You don’t get all of that with a simple call to subprocess.run(). The PipesSubprocessClient also has convenience methods like get_materialize_result for extracting important information from the message sink, in this case the message created by the report_asset_materialization method on the R side.
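The name-matching rule is easier to see with a toy version of it. The sketch below is not how Dagster is implemented; `resolve_and_call`, `FakeClient`, and `misnamed_asset` are hypothetical names invented for this illustration. It only shows the general idea of looking up each function argument in a dict of registered resources by its name.

```python
import inspect


def resolve_and_call(fn, resources, **fixed_args):
    """Call fn, filling each remaining parameter from `resources` by parameter name."""
    kwargs = dict(fixed_args)
    for name in inspect.signature(fn).parameters:
        if name in kwargs:
            continue
        if name not in resources:
            raise KeyError(f"no resource registered under the name '{name}'")
        kwargs[name] = resources[name]
    return fn(**kwargs)


class FakeClient:
    """Hypothetical stand-in for a resource such as PipesSubprocessClient."""

    def run(self, command):
        return "ran " + " ".join(command)


# Same key as the one used in definitions.py.
resources = {"pipes_subprocess_client": FakeClient()}


def r_asset(context, pipes_subprocess_client):
    # Argument name matches the dict key, so the lookup succeeds.
    return pipes_subprocess_client.run(["Rscript", "etl.R"])


def misnamed_asset(context, client):
    # "client" is not a registered key, so the lookup fails.
    return client.run(["Rscript", "etl.R"])


print(resolve_and_call(r_asset, resources, context=None))

try:
    resolve_and_call(misnamed_asset, resources, context=None)
except KeyError as err:
    print("lookup failed:", err)
```

In this toy version, renaming the argument without renaming the dict key breaks the lookup, which is the same kind of mismatch to watch for when wiring up real Dagster resources.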
Conclusion
The {dagsterpipes} package makes it possible for teams working with R or Python to leverage the same powerful orchestration technology. For example, suppose an ML group uses {tidymodels} to train a GAM. They want to automate retraining the model when prediction accuracy drops below a certain threshold. With {dagsterpipes}, the team can create a Dagster asset for the GAM that triggers retraining when an upstream evaluation determines accuracy is too low. The team gets automated retraining with minimal refactoring.
This package aims to be a complete implementation of the Pipes Protocol; if you find that part of it is lacking, please submit an issue or a PR on GitHub. The package is only available from GitHub for now; installation instructions can be found here. I plan on submitting the package to CRAN in the near future.