data_pipelines_cli package

Submodules

data_pipelines_cli.cli module

cli() → None

data_pipelines_cli.cli_constants module

DEFAULT_GLOBAL_CONFIG: data_pipelines_cli.data_structures.DataPipelinesConfig = {'templates': {}, 'vars': {}}

Content of the config file created by dp init command if no template path is provided

IMAGE_TAG_TO_REPLACE: str = '<IMAGE_TAG>'
PROFILE_NAME_ENV_EXECUTION = 'env_execution'

Name of the dbt target to use for a remote machine

PROFILE_NAME_LOCAL_ENVIRONMENT = 'local'

Name of the environment and dbt target to use for a local machine

get_dbt_profiles_env_name(env: str) → str

Given the name of an environment, returns one of the target names expected by the profiles.yml file

Parameters

env (str) – Name of the environment

Returns

Name of the target to be used in profiles.yml
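The mapping can be pictured with a minimal sketch. It assumes that only the local environment maps to the 'local' target and that every other environment name maps to 'env_execution'; the package source is the authority on the actual rule, and the function name below is hypothetical.

```python
# Values copied from the constants documented above.
PROFILE_NAME_LOCAL_ENVIRONMENT = "local"
PROFILE_NAME_ENV_EXECUTION = "env_execution"


def get_dbt_profiles_env_name_sketch(env: str) -> str:
    """Map an environment name to a dbt target name (assumed rule)."""
    # Assumption: the local environment uses the local target,
    # and any other environment uses the remote-execution target.
    if env == PROFILE_NAME_LOCAL_ENVIRONMENT:
        return PROFILE_NAME_LOCAL_ENVIRONMENT
    return PROFILE_NAME_ENV_EXECUTION
```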

data_pipelines_cli.cli_utils module

echo_error(text: str, **kwargs: Any) → None

Print an error message to stderr using click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

echo_info(text: str, **kwargs: Any) → None

Print a message to stdout using click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

echo_subinfo(text: str, **kwargs: Any) → None

Print a subinfo message to stdout using click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

echo_warning(text: str, **kwargs: Any) → None

Print a warning message to stderr using click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

get_argument_or_environment_variable(argument: Optional[str], argument_name: str, environment_variable_name: str) → str

If argument is not None, returns its value. Otherwise, looks up environment_variable_name among the environment variables and returns its value. If that variable is not set, raises DataPipelinesError.

Parameters
  • argument (Optional[str]) – Optional value passed to the CLI as the argument_name

  • argument_name (str) – Name of the CLI’s argument

  • environment_variable_name (str) – Name of the environment variable to search for

Returns

Value of the argument or specified environment variable

Raises

DataPipelinesError – argument is None and environment_variable_name is not set
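The lookup order described above can be sketched as follows. This is an illustration under assumptions, not the package's code: the DataPipelinesError class here is a local stand-in, the function name is hypothetical, and the wording of the error message is invented.

```python
import os
from typing import Optional


class DataPipelinesError(Exception):
    """Local stand-in for data_pipelines_cli.errors.DataPipelinesError."""

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.message = message


def get_argument_or_environment_variable_sketch(
    argument: Optional[str], argument_name: str, environment_variable_name: str
) -> str:
    # An explicitly passed CLI argument always wins.
    if argument is not None:
        return argument
    # Otherwise fall back to the environment variable.
    value = os.environ.get(environment_variable_name)
    if value is None:
        # Message text is an assumption made for this sketch.
        raise DataPipelinesError(
            f"Provide --{argument_name} or set {environment_variable_name}"
        )
    return value
```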

subprocess_run(args: List[str]) → subprocess.CompletedProcess[bytes]

Runs a subprocess and returns its state if it completes successfully. Otherwise, raises SubprocessNonZeroExitError.

Parameters

args (List[str]) – List of strings representing subprocess and its arguments

Returns

State of the completed process

Return type

subprocess.CompletedProcess[bytes]

Raises

SubprocessNonZeroExitError – subprocess exited with non-zero exit code
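One way to obtain this behavior is to wrap subprocess.run with check=True and translate the standard CalledProcessError. The sketch below assumes that approach; the exception class is a local stand-in with an invented message format, and the function name is hypothetical.

```python
import subprocess
import sys
from typing import List


class SubprocessNonZeroExitError(Exception):
    """Local stand-in for data_pipelines_cli.errors.SubprocessNonZeroExitError."""

    def __init__(self, subprocess_name: str, exit_code: int) -> None:
        # Message format is an assumption made for this sketch.
        super().__init__(f"{subprocess_name} exited with code {exit_code}")
        self.subprocess_name = subprocess_name
        self.exit_code = exit_code


def subprocess_run_sketch(args: List[str]) -> "subprocess.CompletedProcess[bytes]":
    try:
        # check=True makes subprocess.run raise on a non-zero exit code.
        return subprocess.run(args, check=True)
    except subprocess.CalledProcessError as err:
        raise SubprocessNonZeroExitError(args[0], err.returncode) from err


# Usage: run the current Python interpreter with a no-op program.
completed = subprocess_run_sketch([sys.executable, "-c", "pass"])
```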

data_pipelines_cli.config_generation module

class DbtProfile(**kwargs)

Bases: dict

POD representing dbt’s profiles.yml file

outputs: Dict[str, Dict[str, Any]]

Dictionary of warehouse data and credentials, keyed by target name

target: str

Name of the target for dbt to run

copy_config_dir_to_build_dir() → None

Recursively copies config directory to build/dag/config working directory

copy_dag_dir_to_build_dir() → None

Recursively copies dag directory to build/dag working directory

generate_profiles_dict(env: str, copy_config_dir: bool) → Dict[str, data_pipelines_cli.config_generation.DbtProfile]

Generates and saves profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on env argument.

Parameters
  • env (str) – Name of the environment

  • copy_config_dir (bool) – Whether to copy config directory to build working directory

Returns

Dictionary representing data to be saved in profiles.yml

Return type

Dict[str, DbtProfile]

generate_profiles_yml(env: str, copy_config_dir: bool = True) → pathlib.Path

Generates and saves profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on env argument.

Parameters
  • env (str) – Name of the environment

  • copy_config_dir (bool) – Whether to copy config directory to build working directory

Returns

Path to build/profiles/{env}

Return type

pathlib.Path

get_profiles_yml_build_path(env: str) → pathlib.Path

Returns path to build/profiles/<profile_name>/profiles.yml, depending on env argument.

Parameters

env (str) – Name of the environment

Returns

Path to build/profiles/<profile_name>/profiles.yml

Return type

pathlib.Path
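As a rough illustration of the path layout, the sketch below builds the path with pathlib. It assumes the profile name is 'local' for the local environment and 'env_execution' otherwise, and that the path is relative to the current working directory; both points and the function name are assumptions.

```python
import pathlib


def get_profiles_yml_build_path_sketch(env: str) -> pathlib.Path:
    # Assumed mapping from environment name to profile (target) name.
    profile_name = "local" if env == "local" else "env_execution"
    # Relative path: build/profiles/<profile_name>/profiles.yml
    return pathlib.Path("build", "profiles", profile_name, "profiles.yml")
```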

read_dictionary_from_config_directory(config_path: Union[str, os.PathLike[str]], env: str, file_name: str) → Dict[str, Any]

Reads dictionaries from file_name in both the base and env directories and compiles them into one. Values from the env directory take precedence over those from base.

Parameters
  • config_path (Union[str, os.PathLike[str]]) – Path to the config directory

  • env (str) – Name of the environment

  • file_name (str) – Name of the YAML file to parse dictionary from

Returns

Compiled dictionary

Return type

Dict[str, Any]
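The precedence rule can be illustrated on dictionaries that have already been parsed from YAML. The sketch assumes a shallow, top-level merge; whether the package merges nested keys is not stated here, and the function name and sample keys are hypothetical.

```python
from typing import Any, Dict


def compile_config_sketch(
    base: Dict[str, Any], env_specific: Dict[str, Any]
) -> Dict[str, Any]:
    # Start from the base values, then let env-specific values override them.
    # Assumption: shallow merge of top-level keys only.
    compiled = dict(base)
    compiled.update(env_specific)
    return compiled


# Hypothetical contents of config/base/<file_name> and config/<env>/<file_name>.
base_values = {"target_type": "bigquery", "threads": 1}
env_values = {"threads": 4}
compiled_values = compile_config_sketch(base_values, env_values)
```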

data_pipelines_cli.data_structures module

class DataPipelinesConfig(**kwargs)

Bases: dict

POD representing .dp.yml config file

templates: Dict[str, data_pipelines_cli.data_structures.TemplateConfig]

Dictionary of saved templates to use in dp create command

vars: Dict[str, str]

Variables to be passed to dbt as the --vars argument

class DockerArgs(env: str)

Bases: object

Arguments required by Docker to push an image to the repository

Raises

DataPipelinesError – repository variable not set or git hash not found

commit_sha: str

Long hash of the current Git revision. Used as an image tag

docker_build_tag() → str

Returns

Tag for Docker Python API build command.

Return type

str

repository: str

URI of the Docker images repository

class TemplateConfig(**kwargs)

Bases: dict

POD representing value referenced in the templates section of the .dp.yml config file

template_name: str

Name of the template

template_path: str

Local path or Git URI to the template repository
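The shape of these two dictionaries can be pictured with typing.TypedDict. This is only a sketch of the documented fields; whether the package declares them this way is an assumption, and the template name and URL below are placeholders.

```python
from typing import Dict, TypedDict


class TemplateConfig(TypedDict):
    template_name: str
    template_path: str


class DataPipelinesConfig(TypedDict):
    templates: Dict[str, TemplateConfig]
    vars: Dict[str, str]


# Hypothetical content of a .dp.yml file after parsing.
example_config: DataPipelinesConfig = {
    "templates": {
        "my-template": {
            "template_name": "my-template",
            "template_path": "https://example.com/org/my-template.git",
        }
    },
    "vars": {"username": "jdoe"},
}
```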

read_config() → data_pipelines_cli.data_structures.DataPipelinesConfig

Parses .dp.yml config file, if it exists. Otherwise, raises NoConfigFileError

Returns

POD representing .dp.yml config file, if it exists

Return type

DataPipelinesConfig

Raises

NoConfigFileError – .dp.yml file not found

data_pipelines_cli.dbt_utils module

read_dbt_vars_from_configs(env: str) → Dict[str, Any]

Reads the vars field from the dp configuration file ($HOME/.dp.yml), the base dbt.yml config (config/base/dbt.yml), and the environment-specific config (config/{env}/dbt.yml), and compiles them into one dictionary.

Parameters

env (str) – Name of the environment

Returns

Dictionary with vars and their keys

Return type

Dict[str, Any]

run_dbt_command(command: Tuple[str, ...], env: str, profiles_path: pathlib.Path) → None

Runs a dbt subprocess in the context of the specified env

Parameters
  • command (Tuple[str, ...]) – Tuple representing dbt command and its optional arguments

  • env (str) – Name of the environment

  • profiles_path (pathlib.Path) – Path to the directory containing profiles.yml file

Raises

data_pipelines_cli.errors module

exception AirflowDagsPathKeyError

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if there is no dags_path in airflow.yml file.

message: str

explanation of the error

exception DataPipelinesError(message: str)

Bases: Exception

Base class for all exceptions in data_pipelines_cli module

message: str

explanation of the error

exception DependencyNotInstalledError(program_name: str)

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if a certain dependency is not installed

message: str

explanation of the error

exception DockerNotInstalledError

Bases: data_pipelines_cli.errors.DependencyNotInstalledError

Exception raised if ‘docker’ is not installed

message: str

explanation of the error

exception JinjaVarKeyError(key: str)

Bases: data_pipelines_cli.errors.DataPipelinesError

message: str

explanation of the error

exception NoConfigFileError

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if .dp.yml does not exist

message: str

explanation of the error

exception SubprocessNonZeroExitError(subprocess_name: str, exit_code: int)

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if subprocess exits with non-zero exit code

message: str

explanation of the error

exception SubprocessNotFound(subprocess_name: str)

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if subprocess cannot be found

message: str

explanation of the error

data_pipelines_cli.filesystem_utils module

class LocalRemoteSync(local_path: Union[str, os.PathLike[str]], remote_path: str, remote_kwargs: Dict[str, str])

Bases: object

Synchronizes a local directory with a cloud storage directory

local_fs: fsspec.spec.AbstractFileSystem

FS representing local directory

local_path_str: str

Path to local directory

remote_path_str: str

Path/URI of the cloud storage directory

sync(delete: bool = True) → None

Sends local files to the remote directory and (optionally) deletes unnecessary ones.

Parameters

delete (bool) – Whether to delete remote files that are no longer present in local directory
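The effect of the delete flag can be illustrated without any storage backend by planning the operations on plain lists of relative paths. The sketch below is not the class's implementation (which works through fsspec file systems); the function name and the file names are hypothetical.

```python
from typing import Iterable, List, Tuple


def plan_sync_sketch(
    local_files: Iterable[str], remote_files: Iterable[str], delete: bool = True
) -> Tuple[List[str], List[str]]:
    """Return (files to upload, remote files to delete)."""
    local = set(local_files)
    remote = set(remote_files)
    # Every local file is sent to the remote directory.
    to_upload = sorted(local)
    # Remote files missing locally are removed only when delete is True.
    to_delete = sorted(remote - local) if delete else []
    return to_upload, to_delete


uploads, deletions = plan_sync_sketch(
    ["dag.py", "manifest.json"], ["manifest.json", "stale.json"]
)
```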

data_pipelines_cli.io_utils module

git_revision_hash() → Optional[str]

Tries to get current Git revision hash, if Git is installed and any revision exists.

Returns

Git revision hash, if possible.

Return type

Optional[str]
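A comparable result can be obtained by calling git rev-parse HEAD and treating any failure as "no hash available". The sketch below assumes that approach; it is not taken from the package source, and the function name is hypothetical.

```python
import subprocess
from typing import Optional


def git_revision_hash_sketch() -> Optional[str]:
    try:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.CalledProcessError):
        # Git is not installed, or there is no repository or revision yet.
        return None
    return result.stdout.strip()


revision = git_revision_hash_sketch()
```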

replace(filename: Union[str, os.PathLike[str]], pattern: str, replacement: str) → None

Perform the pure-Python equivalent of in-place sed substitution: e.g., sed -i -e 's/'${pattern}'/'${replacement}'/' "${filename}".

Beware, however, that it uses Python's regex dialect instead of sed's, which can introduce regex-related bugs.
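A minimal sketch of such a substitution reads the file, applies re.sub, and writes the result back. It assumes UTF-8 text and replaces every match, whereas sed without the g flag replaces only the first match on each line; the function name and sample file are hypothetical. Passing the pattern through re.escape sidesteps the dialect difference when a literal match is intended.

```python
import os
import re
import tempfile
from typing import Union


def replace_sketch(
    filename: Union[str, "os.PathLike[str]"], pattern: str, replacement: str
) -> None:
    # Read the whole file, substitute, and write it back in place.
    with open(filename, "r", encoding="utf-8") as file:
        content = file.read()
    with open(filename, "w", encoding="utf-8") as file:
        file.write(re.sub(pattern, replacement, content))


# Usage: swap the image tag placeholder in a temporary file.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "deployment.yml")
    with open(path, "w", encoding="utf-8") as file:
        file.write("image: repo:<IMAGE_TAG>\n")
    # re.escape treats the pattern as a literal string, not a regex.
    replace_sketch(path, re.escape("<IMAGE_TAG>"), "abc123")
    with open(path, "r", encoding="utf-8") as file:
        replaced_content = file.read()
```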

data_pipelines_cli.vcs_utils module

add_suffix_to_git_template_path(template_path: str) → str

Adds .git suffix to template_path, if necessary.

Checks if template_path starts with Git-specific prefix (e.g. git://), or http:// or https:// protocol. If so, then adds .git suffix if not present. Otherwise, it does not (as template_path probably points to a local directory).

Parameters

template_path (str) – Path or URI to Git-based repository

Returns

template_path with .git as suffix, if necessary

Return type

str
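The rule described above can be sketched as a prefix check followed by a suffix check. The prefix list here contains only the three prefixes named in the description; the real function may recognize more Git-specific prefixes, and the function name and URLs are hypothetical.

```python
# Assumption: only the prefixes mentioned in the description are checked.
REMOTE_PREFIXES = ("git://", "http://", "https://")


def add_suffix_to_git_template_path_sketch(template_path: str) -> str:
    # Remote repositories get a .git suffix unless they already have one.
    if template_path.startswith(REMOTE_PREFIXES) and not template_path.endswith(".git"):
        return template_path + ".git"
    # Anything else is treated as a local directory and left unchanged.
    return template_path
```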