# Source code for data_pipelines_cli.dbt_utils

from __future__ import annotations

import pathlib
import subprocess
import sys
from typing import Any, Dict, Tuple

import yaml

from .cli_constants import BUILD_DIR, get_dbt_profiles_env_name
from .cli_utils import echo_subinfo, subprocess_run
from .config_generation import read_dictionary_from_config_directory
from .data_structures import DataPipelinesConfig, read_env_config
from .errors import NoConfigFileError


def read_dbt_vars_from_configs(env: str) -> Dict[str, Any]:
    """Read `vars` field from dp configuration file (``$HOME/.dp.yml``), base
    ``dbt.yml`` config (``config/base/dbt.yml``) and environment-specific config
    (``config/{env}/dbt.yml``) and compile into one dictionary.

    The ``dbt.yml`` configuration is read through
    ``read_dictionary_from_config_directory`` from the ``dag`` directory under
    ``BUILD_DIR``. On key conflicts, values from the dp configuration file
    take precedence over the ones from ``dbt.yml``. A missing dp configuration
    file, or a ``vars`` key that is absent or left empty in either source, is
    treated as "no variables" from that source.

    :param env: Name of the environment
    :type env: str
    :return: Dictionary with `vars` and their keys
    :rtype: Dict[str, Any]
    """
    dbt_env_config = read_dictionary_from_config_directory(
        BUILD_DIR.joinpath("dag"), env, "dbt.yml"
    )
    try:
        dp_config = read_env_config()
    except NoConfigFileError:
        # No dp config file is a normal situation: fall back to an empty one.
        dp_config = DataPipelinesConfig(templates={}, vars={})

    # ``or {}`` guards against a ``vars:`` key that is present but empty in
    # YAML, which loads as ``None`` instead of a mapping.
    dbt_vars: Dict[str, Any] = dbt_env_config.get("vars") or {}
    dp_vars: Dict[str, Any] = dp_config.get("vars") or {}

    # Unpacking into a dict literal (instead of ``dict(a, **b)``) also accepts
    # non-string YAML keys. Later entries win, so dp vars override dbt vars.
    return {**dbt_vars, **dp_vars}
def _dump_dbt_vars_from_configs_to_string(env: str) -> str:
    """Serialize the merged dbt ``vars`` of *env* into a YAML string.

    The variables come from :func:`read_dbt_vars_from_configs`. They are
    dumped in YAML flow style with an effectively unlimited line width, so the
    emitter does not wrap the mapping across lines.

    :param env: Name of the environment
    :type env: str
    :return: YAML (flow style) representation of the merged `vars`
    :rtype: str
    """
    merged_vars = read_dbt_vars_from_configs(env)
    return yaml.dump(merged_vars, width=sys.maxsize, default_flow_style=True)
def run_dbt_command(
    command: Tuple[str, ...],
    env: str,
    profiles_path: pathlib.Path,
    log_format_json: bool = False,
    capture_output: bool = False,
) -> subprocess.CompletedProcess[bytes]:
    """
    Execute ``dbt`` as a subprocess configured for the environment *env*.

    Before running, the user-facing command is echoed. The final invocation
    appends ``--profile`` (taken from ``target_type`` in the env's
    ``dbt.yml``), ``--profiles-dir``, ``--target`` and ``--vars`` (the merged
    variables serialized as YAML) after *command*.

    :param command: Tuple representing dbt command and its optional arguments
    :type command: Tuple[str, ...]
    :param env: Name of the environment
    :type env: str
    :param profiles_path: Path to the directory containing `profiles.yml` file
    :type profiles_path: pathlib.Path
    :param log_format_json: Whether to run dbt command with `--log-format=json` flag
    :type log_format_json: bool
    :param capture_output: Whether to capture stdout of subprocess.
    :type capture_output: bool
    :return: State of the completed process
    :rtype: subprocess.CompletedProcess[bytes]
    :raises SubprocessNotFound: dbt not installed
    :raises SubprocessNonZeroExitError: dbt exited with error
    """
    echo_subinfo("dbt " + " ".join(command))

    env_dbt_config = read_dictionary_from_config_directory(
        BUILD_DIR.joinpath("dag"), env, "dbt.yml"
    )
    serialized_vars = _dump_dbt_vars_from_configs_to_string(env)

    # Assemble the argv in the same order as the dbt CLI expects it:
    # global flags first, then the user command, then profile/target/vars.
    dbt_argv = ["dbt"]
    if log_format_json:
        dbt_argv.append("--log-format=json")
    dbt_argv.extend(command)
    dbt_argv += [
        "--profile",
        env_dbt_config["target_type"],
        "--profiles-dir",
        str(profiles_path),
        "--target",
        get_dbt_profiles_env_name(env),
        "--vars",
        serialized_vars,
    ]
    return subprocess_run(dbt_argv, capture_output=capture_output)