data_pipelines_cli package

data-pipelines-cli (dp) is a CLI tool designed for data platforms.

dp helps data analysts create, maintain, and make full use of their data pipelines.

Subpackages

Submodules

data_pipelines_cli.cli module

data_pipelines_cli.cli_constants module

DEFAULT_GLOBAL_CONFIG: data_pipelines_cli.data_structures.DataPipelinesConfig = {'templates': {}, 'vars': {}}

Content of the config file created by the dp init command if no template path is provided

IMAGE_TAG_TO_REPLACE: str = '<IMAGE_TAG>'
PROFILE_NAME_ENV_EXECUTION = 'env_execution'

Name of the dbt target to use for a remote machine

PROFILE_NAME_LOCAL_ENVIRONMENT = 'local'

Name of the environment and dbt target to use for a local machine

get_dbt_profiles_env_name(env: str) → str[source]

Given the name of the environment, return one of the target names expected by the profiles.yml file.

Parameters

env (str) – Name of the environment

Returns

Name of the target to be used in profiles.yml
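Given the two profile-name constants above, the mapping can be sketched as follows (an assumed reading of the documented behaviour, not the verbatim implementation):

```python
# Sketch of the env-to-target mapping implied by the constants above;
# the actual implementation may differ.
PROFILE_NAME_LOCAL_ENVIRONMENT = "local"
PROFILE_NAME_ENV_EXECUTION = "env_execution"


def get_dbt_profiles_env_name(env: str) -> str:
    # The "local" environment maps to the "local" dbt target; every other
    # environment uses the shared "env_execution" target.
    if env == PROFILE_NAME_LOCAL_ENVIRONMENT:
        return PROFILE_NAME_LOCAL_ENVIRONMENT
    return PROFILE_NAME_ENV_EXECUTION
```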

data_pipelines_cli.cli_utils module

echo_error(text: str, **kwargs: Any) → None[source]

Print an error message to stderr using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs – Additional keyword arguments passed to the click print function

echo_info(text: str, **kwargs: Any) → None[source]

Print a message to stdout using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs – Additional keyword arguments passed to the click print function

echo_subinfo(text: str, **kwargs: Any) → None[source]

Print a subinfo message to stdout using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs – Additional keyword arguments passed to the click print function

echo_warning(text: str, **kwargs: Any) → None[source]

Print a warning message to stderr using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs – Additional keyword arguments passed to the click print function

get_argument_or_environment_variable(argument: Optional[str], argument_name: str, environment_variable_name: str) → str[source]

If argument is not None, return its value. Otherwise, search for environment_variable_name among the environment variables and return its value. If such a variable is not set, raise DataPipelinesError.

Parameters
  • argument (Optional[str]) – Optional value passed to the CLI as the argument_name

  • argument_name (str) – Name of the CLI’s argument

  • environment_variable_name (str) – Name of the environment variable to search for

Returns

Value of the argument or the specified environment variable

Raises

DataPipelinesError – argument is None and environment_variable_name is not set
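A minimal sketch of the described lookup order, with a stand-in for the library's DataPipelinesError:

```python
import os
from typing import Optional


class DataPipelinesError(Exception):
    """Stand-in mirroring data_pipelines_cli.errors.DataPipelinesError."""

    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(message)


def get_argument_or_environment_variable(
    argument: Optional[str], argument_name: str, environment_variable_name: str
) -> str:
    # Prefer the explicit CLI argument; fall back to the environment.
    if argument is not None:
        return argument
    value = os.environ.get(environment_variable_name)
    if value is None:
        raise DataPipelinesError(
            f"Could not get {argument_name}. Set it as an argument or via "
            f"the {environment_variable_name} environment variable."
        )
    return value
```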

subprocess_run(args: List[str]) → subprocess.CompletedProcess[bytes][source]

Run a subprocess and return its state if it completed successfully. If not, raise SubprocessNonZeroExitError.

Parameters

args (List[str]) – List of strings representing subprocess and its arguments

Returns

State of the completed process

Return type

subprocess.CompletedProcess[bytes]

Raises

SubprocessNonZeroExitError – subprocess exited with non-zero exit code
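A minimal sketch of the described behaviour, with a stand-in error class:

```python
import subprocess
from typing import List


class SubprocessNonZeroExitError(Exception):
    """Stand-in mirroring data_pipelines_cli.errors.SubprocessNonZeroExitError."""

    def __init__(self, subprocess_name: str, exit_code: int) -> None:
        super().__init__(f"{subprocess_name} exited with code {exit_code}")


def subprocess_run(args: List[str]) -> "subprocess.CompletedProcess[bytes]":
    # Run the subprocess and translate a non-zero exit code into the
    # library-style error instead of returning silently.
    completed = subprocess.run(args)
    if completed.returncode != 0:
        raise SubprocessNonZeroExitError(args[0], completed.returncode)
    return completed
```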

data_pipelines_cli.config_generation module

class DbtProfile(**kwargs)[source]

Bases: dict

POD representing dbt’s profiles.yml file.

outputs: Dict[str, Dict[str, Any]]

Dictionary of a warehouse data and credentials, referenced by target name

target: str

Name of the target for dbt to run
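For illustration only, a DbtProfile-shaped dictionary could look like the following; the warehouse type and credential fields are hypothetical and depend entirely on the target platform:

```python
# Hypothetical DbtProfile-shaped dictionary: "target" names one of the
# entries in "outputs", which holds warehouse data and credentials.
profile = {
    "target": "local",
    "outputs": {
        "local": {
            "type": "bigquery",          # hypothetical warehouse type
            "method": "oauth",
            "project": "my-gcp-project",  # hypothetical project id
            "dataset": "analytics",
            "threads": 1,
        }
    },
}
```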

copy_config_dir_to_build_dir() → None[source]

Recursively copy config directory to build/dag/config working directory.

copy_dag_dir_to_build_dir() → None[source]

Recursively copy dag directory to build/dag working directory.

generate_profiles_dict(env: str, copy_config_dir: bool) → Dict[str, data_pipelines_cli.config_generation.DbtProfile][source]

Generate and save profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on env argument.

Parameters
  • env (str) – Name of the environment

  • copy_config_dir (bool) – Whether to copy config directory to build working directory

Returns

Dictionary representing data to be saved in profiles.yml

Return type

Dict[str, DbtProfile]

generate_profiles_yml(env: str, copy_config_dir: bool = True) → pathlib.Path[source]

Generate and save profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on env argument.

Parameters
  • env (str) – Name of the environment

  • copy_config_dir (bool) – Whether to copy config directory to build working directory

Returns

Path to build/profiles/{env}

Return type

pathlib.Path

get_profiles_dir_build_path(env: str) → pathlib.Path[source]

Returns path to build/profiles/<profile_name>/, depending on env argument.

Parameters

env (str) – Name of the environment

Returns

Path to the build/profiles/<profile_name>/ directory

Return type

read_dictionary_from_config_directory(config_path: Union[str, os.PathLike[str]], env: str, file_name: str) → Dict[str, Any][source]

Read dictionaries out of file_name in both base and env directories, and compile them into one. Values from the env directory take precedence over base ones.

Parameters
  • config_path (Union[str, os.PathLike[str]]) – Path to the config directory

  • env (str) – Name of the environment

  • file_name (str) – Name of the YAML file to parse dictionary from

Returns

Compiled dictionary

Return type

Dict[str, Any]
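The precedence rule can be illustrated with a shallow merge; file reading and YAML parsing are omitted, compile_config_dicts is a hypothetical helper, and the real implementation may merge nested keys differently:

```python
from typing import Any, Dict


def compile_config_dicts(
    base: Dict[str, Any], env_specific: Dict[str, Any]
) -> Dict[str, Any]:
    # Start from the base directory's dictionary, then overwrite with the
    # env directory's values so that env takes precedence.
    result = dict(base)
    result.update(env_specific)
    return result
```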

data_pipelines_cli.data_structures module

class DataPipelinesConfig(**kwargs)[source]

Bases: dict

POD representing .dp.yml config file.

templates: Dict[str, data_pipelines_cli.data_structures.TemplateConfig]

Dictionary of saved templates to use in dp create command

vars: Dict[str, str]

Variables to be passed to dbt as the --vars argument
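An illustrative .dp.yml matching this structure; the template name and paths below are hypothetical:

```yaml
templates:
  my-template:                 # hypothetical saved template
    template_name: my-template
    template_path: https://github.com/example-org/example-template  # hypothetical URI
vars:
  username: jdoe
```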

class DbtModel(**kwargs)[source]

Bases: dict

POD representing a single model from ‘schema.yml’ file.

columns: List[data_pipelines_cli.data_structures.DbtTableColumn]
description: str
identifier: str
meta: Dict[str, Any]
name: str
tags: List[str]
tests: List[str]
class DbtSource(**kwargs)[source]

Bases: dict

POD representing a single source from ‘schema.yml’ file.

database: str
description: str
meta: Dict[str, Any]
name: str
schema: str
tables: List[data_pipelines_cli.data_structures.DbtModel]
tags: List[str]
class DbtTableColumn(**kwargs)[source]

Bases: dict

POD representing a single column from ‘schema.yml’ file.

description: str
meta: Dict[str, Any]
name: str
quote: bool
tags: List[str]
tests: List[str]
class DockerArgs(env: str)[source]

Bases: object

Arguments required by Docker to push an image to the repository.

Raises

DataPipelinesError – repository variable not set or Git hash not found

commit_sha: str

Long hash of the current Git revision. Used as an image tag

docker_build_tag() → str[source]

Prepare a tag for Docker Python API build command.

Returns

Tag for Docker Python API build command

Return type

str

repository: str

URI of the Docker images repository
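A simplified sketch of how the build tag could be assembled; the real class takes an env argument and derives repository and commit_sha from the environment configuration and Git, so the constructor below is a stand-in:

```python
# Sketch of DockerArgs' tag construction; the constructor signature is a
# stand-in for illustration, not the real API.
class DockerArgs:
    def __init__(self, repository: str, commit_sha: str) -> None:
        self.repository = repository  # URI of the Docker images repository
        self.commit_sha = commit_sha  # long Git hash used as the image tag

    def docker_build_tag(self) -> str:
        # Combine repository URI and commit hash into a <repo>:<tag> string.
        return f"{self.repository}:{self.commit_sha}"
```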

class TemplateConfig(**kwargs)[source]

Bases: dict

POD representing value referenced in the templates section of the .dp.yml config file.

template_name: str

Name of the template

template_path: str

Local path or Git URI to the template repository

read_env_config() → data_pipelines_cli.data_structures.DataPipelinesConfig[source]

Parse the .dp.yml config file, if it exists. Otherwise, raise NoConfigFileError.

Returns

POD representing .dp.yml config file, if it exists

Return type

DataPipelinesConfig

Raises

NoConfigFileError – .dp.yml file not found

data_pipelines_cli.dbt_utils module

read_dbt_vars_from_configs(env: str) → Dict[str, Any][source]

Read vars field from dp configuration file ($HOME/.dp.yml), base dbt.yml config (config/base/dbt.yml) and environment-specific config (config/{env}/dbt.yml) and compile into one dictionary.

Parameters

env (str) – Name of the environment

Returns

Dictionary with vars and their values

Return type

Dict[str, Any]

run_dbt_command(command: Tuple[str, ...], env: str, profiles_path: pathlib.Path) → None[source]

Run dbt subprocess in the context of the specified env.

Parameters
  • command (Tuple[str, ...]) – Tuple representing dbt command and its optional arguments

  • env (str) – Name of the environment

  • profiles_path (pathlib.Path) – Path to the directory containing profiles.yml file

Raises

SubprocessNonZeroExitError – dbt exited with a non-zero exit code

data_pipelines_cli.docker_response_reader module

class DockerReadResponse(msg: str, is_error: bool)[source]

Bases: object

POD representing Docker response processed by DockerResponseReader.

is_error: bool

Whether the response is an error

msg: str

Read and processed message

class DockerResponseReader(logs_generator: Iterable[Union[str, Dict[str, Union[str, Dict[str, str]]]]])[source]

Bases: object

Read and process Docker response.

The Docker response is turned into processed strings instead of plain dictionaries.

cached_read_response: Optional[List[data_pipelines_cli.docker_response_reader.DockerReadResponse]]

Internal cache of already processed response

click_echo_ok_responses() → None[source]

Read, process and print positive Docker updates.

Raises

DockerErrorResponseError – An error update was encountered in the Docker response.

logs_generator: Iterable[Union[str, Dict[str, Union[str, Dict[str, str]]]]]

Iterable representing Docker response

read_response() → List[data_pipelines_cli.docker_response_reader.DockerReadResponse][source]

Read and process Docker response.

Returns

List of processed lines of response

Return type

List[DockerReadResponse]
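The processing can be sketched as follows; the update keys ("stream", "status", "error") come from the Docker Engine API response format, and the real reader handles more message fields than shown here:

```python
from typing import Dict, Iterable, List, Tuple, Union


def read_docker_response(
    logs_generator: Iterable[Union[str, Dict[str, object]]]
) -> List[Tuple[str, bool]]:
    # Turn raw Docker update dictionaries into (message, is_error) pairs,
    # mirroring the (msg, is_error) shape of DockerReadResponse.
    responses: List[Tuple[str, bool]] = []
    for update in logs_generator:
        if isinstance(update, str):
            responses.append((update.strip(), False))
        elif "error" in update:
            responses.append((str(update["error"]).strip(), True))
        elif "stream" in update:
            responses.append((str(update["stream"]).strip(), False))
        elif "status" in update:
            responses.append((str(update["status"]).strip(), False))
    return responses
```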

data_pipelines_cli.errors module

exception AirflowDagsPathKeyError[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if there is no dags_path key in the airflow.yml file.

message: str

explanation of the error

exception DataPipelinesError(message: str)[source]

Bases: Exception

Base class for all exceptions in the data_pipelines_cli module

message: str

explanation of the error

exception DependencyNotInstalledError(program_name: str)[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if a required dependency is not installed

message: str

explanation of the error

exception DockerErrorResponseError(error_msg: str)[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if there is an error response from Docker client.

message: str

explanation of the error

exception DockerNotInstalledError[source]

Bases: data_pipelines_cli.errors.DependencyNotInstalledError

Exception raised if ‘docker’ is not installed

message: str

explanation of the error

exception JinjaVarKeyError(key: str)[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if a variable referenced in a Jinja template does not exist

message: str

explanation of the error

exception NoConfigFileError[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if .dp.yml does not exist

message: str

explanation of the error

exception NotAProjectDirectoryError(project_path: str)[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if the .copier-answers.yml file does not exist in the given directory

message: str

explanation of the error

exception SubprocessNonZeroExitError(subprocess_name: str, exit_code: int)[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if subprocess exits with non-zero exit code

message: str

explanation of the error

exception SubprocessNotFound(subprocess_name: str)[source]

Bases: data_pipelines_cli.errors.DataPipelinesError

Exception raised if the subprocess executable cannot be found

message: str

explanation of the error

data_pipelines_cli.filesystem_utils module

class LocalRemoteSync(local_path: Union[str, os.PathLike[str]], remote_path: str, remote_kwargs: Dict[str, str])[source]

Bases: object

Synchronizes a local directory with a cloud storage directory.

local_fs: fsspec.spec.AbstractFileSystem

FS representing local directory

local_path_str: str

Path to local directory

remote_path_str: str

Path/URI of the cloud storage directory

sync(delete: bool = True) → None[source]

Send local files to the remote directory and (optionally) delete unnecessary ones.

Parameters

delete (bool) – Whether to delete remote files that are no longer present in local directory
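The sync-with-delete semantics can be sketched at the level of relative paths; the real class performs the transfers through fsspec, and plan_sync is a hypothetical helper for illustration only:

```python
from typing import Set, Tuple


def plan_sync(
    local_files: Set[str], remote_files: Set[str], delete: bool = True
) -> Tuple[Set[str], Set[str]]:
    # Decide which relative paths to upload and which to delete remotely.
    # Every local file is sent; with delete=True, remote files that are no
    # longer present locally are removed.
    to_upload = set(local_files)
    to_delete = remote_files - local_files if delete else set()
    return to_upload, to_delete
```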

data_pipelines_cli.io_utils module

git_revision_hash() → Optional[str][source]

Get current Git revision hash, if Git is installed and any revision exists.

Returns

Git revision hash, if possible.

Return type

Optional[str]
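A minimal sketch of such a lookup, assuming git rev-parse HEAD is the underlying call:

```python
import subprocess
from typing import Optional


def git_revision_hash() -> Optional[str]:
    # Ask Git for the full hash of HEAD; return None when Git is missing
    # or the current directory is not a repository with any commits.
    try:
        completed = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            check=True,
        )
    except (FileNotFoundError, subprocess.CalledProcessError):
        return None
    return completed.stdout.decode("ascii").strip()
```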

replace(filename: Union[str, os.PathLike[str]], pattern: str, replacement: str) → None[source]

Perform the pure-Python equivalent of in-place sed substitution, e.g. sed -i -e 's/${pattern}/${replacement}/' "${filename}".

Beware, however, that it uses the Python regex dialect instead of sed’s, which can introduce regex-related bugs.
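A minimal sketch of such an in-place substitution (whole-file read and write; the real implementation may stream the file instead):

```python
import os
import re
from pathlib import Path
from typing import Union


def replace(
    filename: Union[str, "os.PathLike[str]"], pattern: str, replacement: str
) -> None:
    # Read the whole file, apply the Python-dialect regex substitution,
    # and write the result back in place.
    path = Path(filename)
    text = path.read_text()
    path.write_text(re.sub(pattern, replacement, text))
```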

data_pipelines_cli.jinja module

replace_vars_with_values(templated_dictionary: Dict[str, Any], dbt_vars: Dict[str, Any]) → Dict[str, Any][source]

Replace variables in the given dictionary by rendering the Jinja templates in its values.

Parameters
  • templated_dictionary (Dict[str, Any]) – Dictionary with Jinja-templated values

  • dbt_vars (Dict[str, Any]) – Variables to replace

Returns

Dictionary with replaced variables

Return type

Dict[str, Any]

Raises

JinjaVarKeyError – Variable referenced in Jinja template does not exist
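To illustrate the substitution and the error case, the sketch below uses a regex stand-in for the real Jinja engine and only handles the simple {{ var('name') }} form:

```python
import re
from typing import Any, Dict


class JinjaVarKeyError(Exception):
    """Stand-in mirroring data_pipelines_cli.errors.JinjaVarKeyError."""

    def __init__(self, key: str) -> None:
        super().__init__(f"Variable {key!r} is not defined")


def replace_vars_with_values(
    templated_dictionary: Dict[str, Any], dbt_vars: Dict[str, Any]
) -> Dict[str, Any]:
    # Match only the simple "{{ var('name') }}" form; real Jinja rendering
    # supports far more than this.
    pattern = re.compile(r"\{\{\s*var\('([^']+)'\)\s*\}\}")

    def render(value: Any) -> Any:
        if isinstance(value, dict):
            return {k: render(v) for k, v in value.items()}
        if isinstance(value, str):
            def substitute(match: "re.Match[str]") -> str:
                key = match.group(1)
                if key not in dbt_vars:
                    raise JinjaVarKeyError(key)
                return str(dbt_vars[key])
            return pattern.sub(substitute, value)
        return value

    return render(templated_dictionary)
```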

data_pipelines_cli.vcs_utils module

Utilities related to VCS.

add_suffix_to_git_template_path(template_path: str) → str[source]

Add the .git suffix to template_path, if necessary.

Check whether template_path starts with a Git-specific prefix (e.g. git://) or the http:// or https:// protocol. If so, add the .git suffix if it is not already present. Otherwise, do nothing (as template_path probably points to a local directory).

Parameters

template_path (str) – Path or URI to Git-based repository

Returns

template_path with .git as suffix, if necessary

Return type

str
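The described check can be sketched as follows; the exact set of Git-specific prefixes is an assumption (the docstring only says "e.g. git://"):

```python
def add_suffix_to_git_template_path(template_path: str) -> str:
    # Treat git://, http:// and https:// paths as remote repositories and
    # make sure they carry the .git suffix; leave local paths untouched.
    remote_prefixes = ("git://", "http://", "https://")
    if template_path.startswith(remote_prefixes) and not template_path.endswith(".git"):
        return template_path + ".git"
    return template_path
```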