Source code for data_pipelines_cli.data_structures

import sys
from typing import Any, Dict, List

import yaml

from data_pipelines_cli.cli_utils import echo_warning
from data_pipelines_cli.errors import DataPipelinesError, NoConfigFileError
from data_pipelines_cli.io_utils import git_revision_hash

if sys.version_info >= (3, 8):
    from typing import TypedDict  # pylint: disable=no-name-in-module
else:
    from typing_extensions import TypedDict


class TemplateConfig(TypedDict):
    """
    POD representing a value referenced in the `templates` section of
    the `.dp.yml` config file.
    """

    template_name: str
    """Name of the template"""
    template_path: str
    """Local path or Git URI to the template repository"""


class DataPipelinesConfig(TypedDict):
    """POD representing `.dp.yml` config file."""

    templates: Dict[str, TemplateConfig]
    """Dictionary of saved templates to use in `dp create` command"""
    vars: Dict[str, str]
    """Variables to be passed to dbt as `--vars` argument"""


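# Illustrative only, not part of the module: a dictionary matching the
# DataPipelinesConfig shape above, i.e. what parsing a `.dp.yml` file could
# return. The "my-template" entry, its URI, and the variable are made up.
_example_dp_config: DataPipelinesConfig = {
    "templates": {
        "my-template": {
            "template_name": "my-template",
            "template_path": "https://github.com/example/pipeline-template.git",
        },
    },
    "vars": {"username": "example_user"},
}

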
def read_env_config() -> DataPipelinesConfig:
    """
    Parse the `.dp.yml` config file if it exists; otherwise, raise
    :exc:`.NoConfigFileError`.

    :return: POD representing `.dp.yml` config file, if it exists
    :rtype: DataPipelinesConfig
    :raises NoConfigFileError: `.dp.yml` file not found
    """
    # Avoiding a dependency loop between `cli_constants` and `data_structures`
    from data_pipelines_cli.cli_constants import ENV_CONFIGURATION_PATH

    if not ENV_CONFIGURATION_PATH.is_file():
        echo_warning(
            "No configuration file found. Run 'dp init' to create it.",
        )
        raise NoConfigFileError()

    with open(ENV_CONFIGURATION_PATH, "r") as f:
        return yaml.safe_load(f)


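# Illustrative usage, not part of the module: read the user-level config and
# fall back to an empty configuration when `.dp.yml` does not exist. The
# fallback behaviour shown here is an assumption, not something the CLI does.
def _read_env_config_or_empty() -> DataPipelinesConfig:
    try:
        return read_env_config()
    except NoConfigFileError:
        return {"templates": {}, "vars": {}}

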
class DockerArgs:
    """Arguments required by Docker to make a push to the repository.

    :raises DataPipelinesError: *repository* variable not set or git hash not found
    """

    repository: str
    """URI of the Docker images repository"""
    commit_sha: str
    """Long hash of the current Git revision. Used as an image tag"""

    def __init__(self, env: str) -> None:
        self.repository = self._get_docker_repository_uri_from_k8s_config(env)

        commit_sha = git_revision_hash()
        if not commit_sha:
            raise DataPipelinesError("Could not get git revision hash.")
        self.commit_sha = commit_sha

    def docker_build_tag(self) -> str:
        """
        Prepare a tag for Docker Python API build command.

        :return: Tag for Docker Python API build command
        :rtype: str
        """
        return f"{self.repository}:{self.commit_sha}"

    @staticmethod
    def _get_docker_repository_uri_from_k8s_config(env: str) -> str:
        # Avoiding a dependency loop between `cli_constants` and `data_structures`
        from data_pipelines_cli.cli_constants import BUILD_DIR
        from data_pipelines_cli.config_generation import (
            read_dictionary_from_config_directory,
        )

        execution_env_config = read_dictionary_from_config_directory(
            BUILD_DIR.joinpath("dag"), env, "execution_env.yml"
        )
        try:
            return execution_env_config["image"]["repository"]
        except KeyError as key_error:
            raise DataPipelinesError(
                f"Could not find 'repository' variable in build/config/{env}/execution_env.yml."
            ) from key_error


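# Illustrative usage, not part of the module: build the image tag for a given
# environment. The "dev" environment name is a made-up example; the call only
# works inside a built project directory that is also a git repository.
def _example_docker_build_tag() -> str:
    docker_args = DockerArgs("dev")
    return docker_args.docker_build_tag()  # "<repository URI>:<commit SHA>"

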
class DbtTableColumn(TypedDict, total=False):
    """POD representing a single column from 'schema.yml' file."""

    name: str
    description: str
    meta: Dict[str, Any]
    quote: bool
    tests: List[str]
    tags: List[str]


class DbtModel(TypedDict, total=False):
    """POD representing a single model from 'schema.yml' file."""

    name: str
    description: str
    meta: Dict[str, Any]
    identifier: str
    tests: List[str]
    tags: List[str]
    columns: List[DbtTableColumn]


class DbtSource(TypedDict, total=False):
    """POD representing a single source from 'schema.yml' file."""

    name: str
    description: str
    database: str
    schema: str
    meta: Dict[str, Any]
    tags: List[str]
    tables: List[DbtModel]


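# Illustrative only, not part of the module: a DbtSource dictionary mirroring
# a minimal `schema.yml` source entry. All names and the tests are made up;
# since the TypedDicts above use `total=False`, every key shown is optional.
_example_dbt_source: DbtSource = {
    "name": "raw_data",
    "schema": "raw",
    "tables": [
        {
            "name": "users",
            "columns": [{"name": "id", "tests": ["unique", "not_null"]}],
        },
    ],
}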