data_pipelines_cli package

data-pipelines-cli (dp) is a CLI tool designed for data platforms.

dp helps data analysts create, maintain, and make full use of their data pipelines.

Submodules

data_pipelines_cli.airbyte_utils module

exception AirbyteError[source]

Bases: Exception

class AirbyteFactory(airbyte_config_path: Path, auth_token: Optional[str])[source]

Bases: object

A class used to create and update Airbyte connections defined in the config YAML file

airbyte_config_path: Path

Path to the config YAML file containing connection definitions

auth_token: Optional[str]

Authorization OIDC ID token for a service account, used to communicate with the Airbyte instance

create_update_connection(connection_config: Dict[str, Any], workspace_id: str) Any[source]
create_update_connections() None[source]

Create and update Airbyte connections defined in the config YAML file

static env_replacer(config: Dict[str, Any]) Dict[str, Any][source]
static find_config_file(env: str, config_name: str = 'airbyte') Path[source]
get_default_workspace_id() str[source]
request_handler(endpoint: str, data: Optional[Dict[str, Any]] = None) Union[Dict[str, Any], Any][source]
update_file(updated_config: Dict[str, Any]) None[source]
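
Example usage (a sketch based on the signatures above; the environment name is illustrative):

    from data_pipelines_cli.airbyte_utils import AirbyteFactory

    # Locate the Airbyte config YAML file for the chosen environment
    config_path = AirbyteFactory.find_config_file(env="dev")

    # auth_token may be None if the Airbyte instance requires no OIDC token
    factory = AirbyteFactory(airbyte_config_path=config_path, auth_token=None)
    factory.create_update_connections()
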
exception AirbyteNoWorkspaceConfiguredError[source]

Bases: AirbyteError

data_pipelines_cli.bi_utils module

class BiAction(value)[source]

Bases: Enum

An enumeration.

COMPILE = 1
DEPLOY = 2
bi(env: str, bi_action: BiAction, key_path: Optional[str] = None) None[source]

Generate and deploy BI code using dbt-compiled data.

Parameters
  • env (str) – Name of the environment

  • bi_action – Action to be run [COMPILE, DEPLOY]

  • key_path – Path to the key with write access to git repository

Raises

NotSuppertedBIError – The BI tool specified in the bi.yml configuration is not supported

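Example usage (a sketch; the environment name and key path are illustrative):

    from data_pipelines_cli.bi_utils import BiAction, bi

    # Compile BI code from the dbt-compiled project
    bi(env="dev", bi_action=BiAction.COMPILE)

    # Deploy it, given a key with write access to the BI Git repository
    bi(env="dev", bi_action=BiAction.DEPLOY, key_path="/path/to/deploy_key")
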
read_bi_config(env: str) Dict[str, Any][source]

Read BI configuration.

Parameters

env (str) – Name of the environment

Returns

Compiled dictionary

Return type

Dict[str, Any]

data_pipelines_cli.cli module

cli() None[source]

data_pipelines_cli.cli_configs module

find_datahub_config_file(env: str) Path[source]

data_pipelines_cli.cli_constants module

DEFAULT_GLOBAL_CONFIG: DataPipelinesConfig = {'templates': {}, 'vars': {}}

Content of the config file created by the dp init command if no template path is provided

IMAGE_TAG_TO_REPLACE: str = '<IMAGE_TAG>'
PROFILE_NAME_ENV_EXECUTION = 'env_execution'

Name of the dbt target to use for a remote machine

PROFILE_NAME_LOCAL_ENVIRONMENT = 'local'

Name of the environment and dbt target to use for a local machine

get_dbt_profiles_env_name(env: str) str[source]

Given the name of the environment, returns one of the target names expected by the profiles.yml file.

Parameters

env (str) – Name of the environment

Returns

Name of the target to be used in profiles.yml

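Example (based on the two constants above, "local" presumably maps to the local target and any other environment to the remote-execution one):

    from data_pipelines_cli.cli_constants import get_dbt_profiles_env_name

    get_dbt_profiles_env_name("local")  # -> "local"
    get_dbt_profiles_env_name("dev")    # -> "env_execution"
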
data_pipelines_cli.cli_utils module

echo_error(text: str, **kwargs: Any) None[source]

Print an error message to stderr using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

echo_info(text: str, **kwargs: Any) None[source]

Print a message to stdout using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

echo_suberror(text: str, **kwargs: Any) None[source]

Print a suberror message to stderr using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

echo_subinfo(text: str, **kwargs: Any) None[source]

Print a subinfo message to stdout using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

echo_warning(text: str, **kwargs: Any) None[source]

Print a warning message to stderr using the click-specific print function.

Parameters
  • text (str) – Message to print

  • kwargs

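Example usage of the echo helpers (a sketch; the messages are illustrative):

    from data_pipelines_cli.cli_utils import echo_error, echo_info, echo_warning

    echo_info("Compiling project")            # printed to stdout
    echo_warning("Falling back to defaults")  # printed to stderr
    echo_error("Compilation failed")          # printed to stderr
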
get_argument_or_environment_variable(argument: Optional[str], argument_name: str, environment_variable_name: str) str[source]

If argument is not None, return its value. Otherwise, search for environment_variable_name among the environment variables and return its value. If no such variable is set, raise DataPipelinesError.

Parameters
  • argument (Optional[str]) – Optional value passed to the CLI as the argument_name

  • argument_name (str) – Name of the CLI’s argument

  • environment_variable_name (str) – Name of the environment variable to search for

Returns

Value of the argument or specified environment variable

Raises

DataPipelinesError – argument is None and environment_variable_name is not set

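Example (a sketch; the argument and variable names are illustrative):

    import os

    from data_pipelines_cli.cli_utils import get_argument_or_environment_variable

    os.environ["DOCKER_REPOSITORY_URL"] = "gcr.io/example/project"

    # No CLI argument given, so the environment variable is used instead
    repo = get_argument_or_environment_variable(
        None, "repository", "DOCKER_REPOSITORY_URL"
    )  # -> "gcr.io/example/project"
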
subprocess_run(args: List[str], capture_output: bool = False) CompletedProcess[bytes][source]

Run a subprocess and return its state if it completed successfully. If not, raise SubprocessNonZeroExitError.

Parameters
  • args (List[str]) – List of strings representing subprocess and its arguments

  • capture_output (bool) – Whether to capture output of subprocess.

Returns

State of the completed process

Return type

subprocess.CompletedProcess[bytes]

Raises

SubprocessNonZeroExitError – subprocess exited with non-zero exit code

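Example (a sketch):

    from data_pipelines_cli.cli_utils import subprocess_run

    # Raises SubprocessNonZeroExitError if `git status` exits non-zero
    result = subprocess_run(["git", "status"], capture_output=True)
    print(result.stdout.decode())
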
data_pipelines_cli.config_generation module

class DbtProfile[source]

Bases: TypedDict

POD representing dbt’s profiles.yml file.

outputs: Dict[str, Dict[str, Any]]

Dictionary of warehouse data and credentials, referenced by target name

target: str

Name of the target for dbt to run

copy_config_dir_to_build_dir() None[source]

Recursively copy config directory to build/dag/config working directory.

copy_dag_dir_to_build_dir() None[source]

Recursively copy dag directory to build/dag working directory.

generate_profiles_dict(env: str, copy_config_dir: bool) Dict[str, DbtProfile][source]

Generate the dictionary to be saved as the profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on the env argument.

Parameters
  • env (str) – Name of the environment

  • copy_config_dir (bool) – Whether to copy config directory to build working directory

Returns

Dictionary representing data to be saved in profiles.yml

Return type

Dict[str, DbtProfile]

generate_profiles_yml(env: str, copy_config_dir: bool = True) Path[source]

Generate and save the profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on the env argument.

Parameters
  • env (str) – Name of the environment

  • copy_config_dir (bool) – Whether to copy config directory to build working directory

Returns

Path to build/profiles/{env}

Return type

pathlib.Path

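Example (a sketch; the environment name is illustrative):

    from data_pipelines_cli.config_generation import generate_profiles_yml

    # Writes build/profiles/<target>/profiles.yml and returns its directory
    profiles_path = generate_profiles_yml(env="dev")
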
get_profiles_dir_build_path(env: str) Path[source]

Returns the path to build/profiles/<profile_name>/, depending on the env argument.

Parameters

env (str) – Name of the environment

Returns

Path to build/profiles/<profile_name>/

Return type

pathlib.Path

read_dictionary_from_config_directory(config_path: Union[str, PathLike[str]], env: str, file_name: str) Dict[str, Any][source]

Read dictionaries out of file_name in both base and env directories, and compile them into one. Values from env directory get precedence over base ones.

Parameters
  • config_path (Union[str, os.PathLike[str]]) – Path to the config directory

  • env (str) – Name of the environment

  • file_name (str) – Name of the YAML file to parse dictionary from

Returns

Compiled dictionary

Return type

Dict[str, Any]

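Example (a sketch; paths are illustrative):

    from data_pipelines_cli.config_generation import (
        read_dictionary_from_config_directory,
    )

    # Values from config/dev/dbt.yml override those from config/base/dbt.yml
    dbt_config = read_dictionary_from_config_directory("config", "dev", "dbt.yml")
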
data_pipelines_cli.data_structures module

class DataPipelinesConfig[source]

Bases: TypedDict

POD representing .dp.yml config file.

templates: Dict[str, TemplateConfig]

Dictionary of saved templates to use in dp create command

vars: Dict[str, str]

Variables to be passed to dbt as --vars argument

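A value of this type might look as follows (a hedged illustration; the template name, URI, and vars are hypothetical):

    from data_pipelines_cli.data_structures import (
        DataPipelinesConfig,
        TemplateConfig,
    )

    template: TemplateConfig = {
        "template_name": "my-template",
        "template_path": "https://github.com/example/pipeline-template.git",
    }
    config: DataPipelinesConfig = {
        "templates": {"my-template": template},
        "vars": {"username": "example_user"},
    }
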
class DbtModel[source]

Bases: TypedDict

POD representing a single model from ‘schema.yml’ file.

columns: List[DbtTableColumn]
description: str
identifier: str
meta: Dict[str, Any]
name: str
tags: List[str]
tests: List[str]
class DbtSource[source]

Bases: TypedDict

POD representing a single source from ‘schema.yml’ file.

database: str
description: str
meta: Dict[str, Any]
name: str
schema: str
tables: List[DbtModel]
tags: List[str]
class DbtTableColumn[source]

Bases: TypedDict

POD representing a single column from ‘schema.yml’ file.

description: str
meta: Dict[str, Any]
name: str
quote: bool
tags: List[str]
tests: List[str]
class DockerArgs(env: str, image_tag: Optional[str], build_args: Dict[str, str])[source]

Bases: object

Arguments required by Docker to push an image to the repository.

Raises

DataPipelinesError – repository variable not set or git hash not found

build_args: Dict[str, str]
docker_build_tag() str[source]

Prepare a tag for Docker Python API build command.

Returns

Tag for Docker Python API build command

Return type

str

image_tag: str

An image tag

repository: str

URI of the Docker image repository

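Example usage (a sketch; it assumes the repository variable is configured and a Git revision exists, otherwise DataPipelinesError is raised):

    from data_pipelines_cli.data_structures import DockerArgs

    docker_args = DockerArgs(env="dev", image_tag=None, build_args={})

    # Presumably "<repository>:<image_tag>"
    tag = docker_args.docker_build_tag()
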
class TemplateConfig[source]

Bases: TypedDict

POD representing value referenced in the templates section of the .dp.yml config file.

template_name: str

Name of the template

template_path: str

Local path or Git URI to the template repository

read_env_config() DataPipelinesConfig[source]

Parse the .dp.yml config file, if it exists. Otherwise, raise NoConfigFileError.

Returns

POD representing .dp.yml config file, if it exists

Return type

DataPipelinesConfig

Raises

NoConfigFileError – .dp.yml file not found

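Example (a sketch):

    from data_pipelines_cli.data_structures import read_env_config
    from data_pipelines_cli.errors import NoConfigFileError

    try:
        config = read_env_config()
    except NoConfigFileError:
        pass  # no .dp.yml yet, e.g. `dp init` has not been run
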
data_pipelines_cli.dbt_utils module

read_dbt_vars_from_configs(env: str) Dict[str, Any][source]

Read the vars field from the dp configuration file ($HOME/.dp.yml), the base dbt.yml config (config/base/dbt.yml), and the environment-specific config (config/{env}/dbt.yml), and compile them into one dictionary.

Parameters

env (str) – Name of the environment

Returns

Dictionary with vars and their values

Return type

Dict[str, Any]

run_dbt_command(command: Tuple[str, ...], env: str, profiles_path: Path, log_format_json: bool = False, capture_output: bool = False) CompletedProcess[bytes][source]

Run the dbt subprocess in the context of the specified env.

Parameters
  • command (Tuple[str, ...]) – Tuple representing dbt command and its optional arguments

  • env (str) – Name of the environment

  • profiles_path (pathlib.Path) – Path to the directory containing profiles.yml file

  • log_format_json (bool) – Whether to run dbt command with --log-format=json flag

  • capture_output (bool) – Whether to capture stdout of subprocess.

Returns

State of the completed process

Return type

subprocess.CompletedProcess[bytes]

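Example (a sketch; the environment name is illustrative):

    from data_pipelines_cli.config_generation import generate_profiles_yml
    from data_pipelines_cli.dbt_utils import run_dbt_command

    # Generate profiles.yml first, then run `dbt run` against that environment
    profiles_path = generate_profiles_yml(env="dev")
    run_dbt_command(("run",), env="dev", profiles_path=profiles_path)
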
data_pipelines_cli.docker_response_reader module

class DockerReadResponse(msg: str, is_error: bool)[source]

Bases: object

POD representing Docker response processed by DockerResponseReader.

is_error: bool

Whether the response is an error

msg: str

Read and processed message

class DockerResponseReader(logs_generator: Iterable[Union[str, Dict[str, Union[str, Dict[str, str]]]]])[source]

Bases: object

Read and process Docker response.

Turns Docker responses into processed strings instead of plain dictionaries.

cached_read_response: Optional[List[DockerReadResponse]]

Internal cache of already processed response

click_echo_ok_responses() None[source]

Read, process and print positive Docker updates.

Raises

DockerErrorResponseError – An error update was encountered in the Docker response.

logs_generator: Iterable[Union[str, Dict[str, Union[str, Dict[str, str]]]]]

Iterable representing Docker response

read_response() List[DockerReadResponse][source]

Read and process Docker response.

Returns

List of processed lines of response

Return type

List[DockerReadResponse]

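Example usage (a sketch assuming the docker Python package; the image tag is illustrative):

    import docker

    from data_pipelines_cli.docker_response_reader import DockerResponseReader

    client = docker.APIClient()
    logs = client.build(path=".", tag="example:latest", decode=True)

    # Prints positive updates; raises DockerErrorResponseError on an error one
    DockerResponseReader(logs).click_echo_ok_responses()
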
data_pipelines_cli.errors module

exception AirflowDagsPathKeyError[source]

Bases: DataPipelinesError

Exception raised if there is no dags_path in the airflow.yml file.

exception DataPipelinesError(message: str, submessage: Optional[str] = None)[source]

Bases: Exception

Base class for all exceptions in the data_pipelines_cli module

message: str

Explanation of the error

submessage: Optional[str]

Additional information about the error

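As the base class, it can be used to catch any error raised by the tool (a sketch):

    from data_pipelines_cli.errors import DataPipelinesError

    try:
        ...  # any dp operation
    except DataPipelinesError as err:
        print(err.message)
        if err.submessage:
            print(err.submessage)
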
exception DependencyNotInstalledError(program_name: str)[source]

Bases: DataPipelinesError

Exception raised if a required dependency is not installed

exception DockerErrorResponseError(error_msg: str)[source]

Bases: DataPipelinesError

Exception raised if there is an error response from Docker client.

exception DockerNotInstalledError[source]

Bases: DependencyNotInstalledError

Exception raised if ‘docker’ is not installed

exception JinjaVarKeyError(key: str)[source]

Bases: DataPipelinesError

exception NoConfigFileError[source]

Bases: DataPipelinesError

Exception raised if .dp.yml does not exist

exception NotAProjectDirectoryError(project_path: str)[source]

Bases: DataPipelinesError

Exception raised if the .copier-answers.yml file does not exist in the given directory

exception NotSuppertedBIError[source]

Bases: DataPipelinesError

Exception raised if there is no target_id in bi.yml

exception SubprocessNonZeroExitError(subprocess_name: str, exit_code: int, subprocess_output: Optional[str] = None)[source]

Bases: DataPipelinesError

Exception raised if subprocess exits with non-zero exit code

exception SubprocessNotFound(subprocess_name: str)[source]

Bases: DataPipelinesError

Exception raised if subprocess cannot be found

data_pipelines_cli.filesystem_utils module

class LocalRemoteSync(local_path: Union[str, PathLike[str]], remote_path: str, remote_kwargs: Dict[str, str])[source]

Bases: object

Synchronizes a local directory with a cloud storage directory.

local_fs: AbstractFileSystem

FS representing local directory

local_path_str: str

Path to local directory

remote_path_str: str

Path/URI of the cloud storage directory

sync(delete: bool = True) None[source]

Send local files to the remote directory and (optionally) delete unnecessary ones.

Parameters

delete (bool) – Whether to delete remote files that are no longer present in local directory

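Example usage (a sketch; the bucket URI is hypothetical, and remote_kwargs are presumably passed through to the underlying filesystem):

    from data_pipelines_cli.filesystem_utils import LocalRemoteSync

    sync = LocalRemoteSync("build/dag", "gs://example-bucket/dags", {})
    sync.sync(delete=True)
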
data_pipelines_cli.io_utils module

git_revision_hash() Optional[str][source]

Get current Git revision hash, if Git is installed and any revision exists.

Returns

Git revision hash, if possible.

Return type

Optional[str]

replace(filename: Union[str, PathLike[str]], pattern: str, replacement: str) None[source]

Perform the pure-Python equivalent of an in-place sed substitution, e.g., sed -i -e "s/${pattern}/${replacement}/" "${filename}".

Beware, however: it uses the Python regex dialect instead of sed's, which can introduce regex-related bugs.

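Example (a sketch; the manifest path is hypothetical, while IMAGE_TAG_TO_REPLACE comes from cli_constants above):

    from data_pipelines_cli.cli_constants import IMAGE_TAG_TO_REPLACE
    from data_pipelines_cli.io_utils import replace

    # Substitute the <IMAGE_TAG> placeholder with a concrete image tag
    replace("k8s/deployment.yml", IMAGE_TAG_TO_REPLACE, "1.2.3")
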
data_pipelines_cli.jinja module

replace_vars_with_values(templated_dictionary: Dict[str, Any], dbt_vars: Dict[str, Any]) Dict[str, Any][source]

Replace variables in the given dictionary by rendering the Jinja templates in its values.

Parameters
  • templated_dictionary (Dict[str, Any]) – Dictionary with Jinja-templated values

  • dbt_vars (Dict[str, Any]) – Variables to replace

Returns

Dictionary with replaced variables

Return type

Dict[str, Any]

Raises

JinjaVarKeyError – Variable referenced in Jinja template does not exist

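Example (a sketch; it assumes dbt-style var() references in the templated values):

    from data_pipelines_cli.jinja import replace_vars_with_values

    # Raises JinjaVarKeyError if a referenced variable is missing from dbt_vars
    templated = {"schema": "{{ var('schema_name') }}"}
    compiled = replace_vars_with_values(templated, {"schema_name": "analytics"})
    # compiled == {"schema": "analytics"}
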
data_pipelines_cli.looker_utils module

deploy_lookML_model(key_path: str, env: str) None[source]

Write compiled lookML to Looker’s repository and deploy project to production

Parameters
  • key_path (str) – Path to the key with write access to git repository

  • env (str) – Name of the environment

generate_lookML_model() None[source]

Generate LookML code based on the compiled dbt project.

read_looker_config(env: str) Dict[str, Any][source]

Read Looker configuration.

Parameters

env (str) – Name of the environment

Returns

Compiled dictionary

Return type

Dict[str, Any]

data_pipelines_cli.vcs_utils module

Utilities related to VCS.

add_suffix_to_git_template_path(template_path: str) str[source]

Add .git suffix to template_path, if necessary.

Check if template_path starts with a Git-specific prefix (e.g. git://) or the http:// or https:// protocol. If so, add the .git suffix if it is not already present. Otherwise, do nothing (as template_path probably points to a local directory).

Parameters

template_path (str) – Path or URI to Git-based repository

Returns

template_path with .git as suffix, if necessary

Return type

str
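
Example (a sketch):

    from data_pipelines_cli.vcs_utils import add_suffix_to_git_template_path

    add_suffix_to_git_template_path("https://github.com/example/template")
    # -> "https://github.com/example/template.git"

    add_suffix_to_git_template_path("/local/path/to/template")
    # -> "/local/path/to/template" (unchanged)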