data_pipelines_cli package
data-pipelines-cli (dp) is a CLI tool designed for data platforms.
dp helps data analysts create, maintain, and make full use of their data pipelines.
Subpackages
- data_pipelines_cli.cli_commands package
- Subpackages
- Submodules
- data_pipelines_cli.cli_commands.clean module
- data_pipelines_cli.cli_commands.compile module
- data_pipelines_cli.cli_commands.create module
- data_pipelines_cli.cli_commands.deploy module
- data_pipelines_cli.cli_commands.docs module
- data_pipelines_cli.cli_commands.init module
- data_pipelines_cli.cli_commands.prepare_env module
- data_pipelines_cli.cli_commands.publish module
- data_pipelines_cli.cli_commands.run module
- data_pipelines_cli.cli_commands.seed module
- data_pipelines_cli.cli_commands.template module
- data_pipelines_cli.cli_commands.test module
- data_pipelines_cli.cli_commands.update module
Submodules
data_pipelines_cli.airbyte_utils module
- class AirbyteFactory(airbyte_config_path: pathlib.Path, auth_token: Optional[str])[source]
Bases:
object
A class used to create and update Airbyte connections defined in a config YAML file
- airbyte_config_path: pathlib.Path
Path to the config YAML file containing connection definitions
- auth_token: Optional[str]
Authorization OIDC ID token for a service account used to communicate with the Airbyte instance
data_pipelines_cli.bi_utils module
- class BiAction(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
enum.Enum
- COMPILE = 1
- DEPLOY = 2
- bi(env: str, bi_action: data_pipelines_cli.bi_utils.BiAction, key_path: Optional[str] = None) None [source]
Generate and deploy BI code using dbt-compiled data.
- Parameters
env (str) – Name of the environment
bi_action – Action to be run [COMPILE, DEPLOY]
key_path – Path to the key with write access to git repository
- Raises
NotSuppertedBIError – The BI tool specified in the bi.yml configuration is not supported
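The dispatch on BiAction described above can be sketched as follows. The handler bodies and the _HANDLERS mapping are hypothetical stand-ins for the real compile/deploy logic, not the library's actual implementation:

```python
from enum import Enum
from typing import Callable, Dict


class BiAction(Enum):
    COMPILE = 1
    DEPLOY = 2


# Hypothetical handlers standing in for the real compile/deploy logic.
def _compile_bi(env: str) -> str:
    return f"compiled BI code for {env}"


def _deploy_bi(env: str) -> str:
    return f"deployed BI code for {env}"


_HANDLERS: Dict[BiAction, Callable[[str], str]] = {
    BiAction.COMPILE: _compile_bi,
    BiAction.DEPLOY: _deploy_bi,
}


def bi(env: str, bi_action: BiAction) -> str:
    # Look up the handler for the requested action and run it for the given env.
    try:
        return _HANDLERS[bi_action](env)
    except KeyError:
        raise ValueError(f"Unsupported BI action: {bi_action}")
```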
data_pipelines_cli.cli module
data_pipelines_cli.cli_configs module
data_pipelines_cli.cli_constants module
- DEFAULT_GLOBAL_CONFIG: data_pipelines_cli.data_structures.DataPipelinesConfig = {'templates': {}, 'vars': {}}
Content of the config file created by dp init command if no template path is provided
- IMAGE_TAG_TO_REPLACE: str = '<IMAGE_TAG>'
- PROFILE_NAME_ENV_EXECUTION = 'env_execution'
Name of the dbt target to use for a remote machine
- PROFILE_NAME_LOCAL_ENVIRONMENT = 'local'
Name of the environment and dbt target to use for a local machine
data_pipelines_cli.cli_utils module
- echo_error(text: str, **kwargs: Any) None [source]
Print an error message to stderr using click-specific print function.
- Parameters
text (str) – Message to print
kwargs – Keyword arguments passed to the click print function
- echo_info(text: str, **kwargs: Any) None [source]
Print a message to stdout using click-specific print function.
- Parameters
text (str) – Message to print
kwargs – Keyword arguments passed to the click print function
- echo_suberror(text: str, **kwargs: Any) None [source]
Print a suberror message to stderr using click-specific print function.
- Parameters
text (str) – Message to print
kwargs – Keyword arguments passed to the click print function
- echo_subinfo(text: str, **kwargs: Any) None [source]
Print a subinfo message to stdout using click-specific print function.
- Parameters
text (str) – Message to print
kwargs – Keyword arguments passed to the click print function
- echo_warning(text: str, **kwargs: Any) None [source]
Print a warning message to stderr using click-specific print function.
- Parameters
text (str) – Message to print
kwargs – Keyword arguments passed to the click print function
- get_argument_or_environment_variable(argument: Optional[str], argument_name: str, environment_variable_name: str) str [source]
If the given argument is not None, return its value. Otherwise, search for environment_variable_name among environment variables and return it. If such a variable is not set, raise DataPipelinesError.
- Parameters
argument (Optional[str]) – Optional value passed to the CLI as the argument_name
argument_name (str) – Name of the CLI’s argument
environment_variable_name (str) – Name of the environment variable to search for
- Returns
Value of the argument or specified environment variable
- Raises
DataPipelinesError – argument is None and environment_variable_name is not set
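The fallback behavior described above can be sketched as follows; the DataPipelinesError class here is a minimal stand-in for the library's own error type, and the error message is illustrative:

```python
import os
from typing import Optional


class DataPipelinesError(Exception):
    """Minimal stand-in for the library's base error class."""


def get_argument_or_environment_variable(
    argument: Optional[str], argument_name: str, environment_variable_name: str
) -> str:
    # Prefer the explicitly passed CLI argument.
    if argument is not None:
        return argument
    # Otherwise, fall back to the environment variable.
    value = os.environ.get(environment_variable_name)
    if value is None:
        raise DataPipelinesError(
            f"Could not determine {argument_name}: pass it as an argument "
            f"or set the {environment_variable_name} environment variable."
        )
    return value
```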
- subprocess_run(args: List[str], capture_output: bool = False) subprocess.CompletedProcess[bytes] [source]
Run the subprocess and return its state if it completed successfully. If not, raise SubprocessNonZeroExitError.
- Parameters
args (List[str]) – List of strings representing subprocess and its arguments
capture_output (bool) – Whether to capture output of subprocess.
- Returns
State of the completed process
- Return type
subprocess.CompletedProcess[bytes]
- Raises
SubprocessNonZeroExitError – subprocess exited with non-zero exit code
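The behavior described above can be sketched with the standard library's subprocess module; the SubprocessNonZeroExitError class here is a minimal stand-in for the library's own exception:

```python
import subprocess
from typing import List


class SubprocessNonZeroExitError(Exception):
    """Minimal stand-in for the library's error class."""


def subprocess_run(
    args: List[str], capture_output: bool = False
) -> "subprocess.CompletedProcess[bytes]":
    # Run the subprocess; optionally capture its stdout/stderr as bytes.
    result = subprocess.run(args, capture_output=capture_output)
    if result.returncode != 0:
        raise SubprocessNonZeroExitError(
            f"{args[0]} exited with code {result.returncode}"
        )
    return result
```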
data_pipelines_cli.config_generation module
- class DbtProfile[source]
Bases:
TypedDict
POD representing dbt’s profiles.yml file.
- outputs: Dict[str, Dict[str, Any]]
Dictionary of warehouse data and credentials, referenced by target name
- target: str
Name of the target for dbt to run
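A DbtProfile value mirrors the shape of dbt's profiles.yml. A minimal sketch (Python 3.8+, typing.TypedDict), using a hypothetical BigQuery-style output whose field names depend on the warehouse adapter in use:

```python
from typing import Any, Dict, TypedDict


class DbtProfile(TypedDict):
    target: str
    outputs: Dict[str, Dict[str, Any]]


# Hypothetical profile: the keys inside "outputs" vary per dbt adapter.
profile: DbtProfile = {
    "target": "local",
    "outputs": {
        "local": {
            "type": "bigquery",
            "method": "oauth",
            "project": "my-gcp-project",
            "dataset": "analytics",
            "threads": 1,
        }
    },
}
```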
- copy_config_dir_to_build_dir() None [source]
Recursively copy config directory to build/dag/config working directory.
- copy_dag_dir_to_build_dir() None [source]
Recursively copy dag directory to build/dag working directory.
- generate_profiles_dict(env: str, copy_config_dir: bool) Dict[str, data_pipelines_cli.config_generation.DbtProfile] [source]
Generate and save the profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on the env argument.
- Parameters
env (str) – Name of the environment
copy_config_dir (bool) – Whether to copy the config directory to the build working directory
- Returns
Dictionary representing data to be saved in profiles.yml
- Return type
Dict[str, DbtProfile]
- generate_profiles_yml(env: str, copy_config_dir: bool = True) pathlib.Path [source]
Generate and save the profiles.yml file at build/profiles/local or build/profiles/env_execution, depending on the env argument.
- Parameters
env (str) – Name of the environment
copy_config_dir (bool) – Whether to copy the config directory to the build working directory
- Returns
Path to build/profiles/{env}
- Return type
pathlib.Path
- get_profiles_dir_build_path(env: str) pathlib.Path [source]
Returns the path to build/profiles/<profile_name>/, depending on the env argument.
- Parameters
env (str) – Name of the environment
- Returns
Path to build/profiles/<profile_name>/
- Return type
pathlib.Path
- read_dictionary_from_config_directory(config_path: Union[str, os.PathLike[str]], env: str, file_name: str) Dict[str, Any] [source]
Read dictionaries out of file_name in both base and env directories, and compile them into one. Values from env directory get precedence over base ones.
- Parameters
config_path (Union[str, os.PathLike[str]]) – Path to the config directory
env (str) – Name of the environment
file_name (str) – Name of the YAML file to parse dictionary from
- Returns
Compiled dictionary
- Return type
Dict[str, Any]
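The precedence rule described above (env values override base values) can be sketched with an in-memory merge. Whether the real implementation merges deeply or shallowly is not stated here; this sketch uses a shallow merge, and the sample dictionaries are hypothetical:

```python
from typing import Any, Dict


def merge_base_and_env(base: Dict[str, Any], env: Dict[str, Any]) -> Dict[str, Any]:
    # Shallow merge: keys from the env-specific file override base ones.
    return {**base, **env}


# Hypothetical contents of config/base/dbt.yml and config/dev/dbt.yml.
base_dbt = {"target": "env_execution", "vars": {"project": "base-project"}}
dev_dbt = {"vars": {"project": "dev-project"}}

merged = merge_base_and_env(base_dbt, dev_dbt)
```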
data_pipelines_cli.data_structures module
- class DataPipelinesConfig[source]
Bases:
TypedDict
POD representing .dp.yml config file.
- templates: Dict[str, data_pipelines_cli.data_structures.TemplateConfig]
Dictionary of saved templates to use in dp create command
- vars: Dict[str, str]
Variables to be passed to dbt as --vars argument
- class DbtModel[source]
Bases:
TypedDict
POD representing a single model from ‘schema.yml’ file.
- columns: List[data_pipelines_cli.data_structures.DbtTableColumn]
- description: str
- identifier: str
- meta: Dict[str, Any]
- name: str
- tags: List[str]
- tests: List[str]
- class DbtSource[source]
Bases:
TypedDict
POD representing a single source from ‘schema.yml’ file.
- database: str
- description: str
- meta: Dict[str, Any]
- name: str
- schema: str
- tables: List[data_pipelines_cli.data_structures.DbtModel]
- tags: List[str]
- class DbtTableColumn[source]
Bases:
TypedDict
POD representing a single column from ‘schema.yml’ file.
- description: str
- meta: Dict[str, Any]
- name: str
- quote: bool
- tags: List[str]
- tests: List[str]
- class DockerArgs(env: str, image_tag: Optional[str], build_args: Dict[str, str])[source]
Bases:
object
Arguments required by Docker to push an image to the repository.
- Raises
DataPipelinesError – repository variable not set or git hash not found
- build_args: Dict[str, str]
- docker_build_tag() str [source]
Prepare a tag for Docker Python API build command.
- Returns
Tag for Docker Python API build command
- Return type
str
- image_tag: str
An image tag
- repository: str
URI of the Docker images repository
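A minimal sketch of how docker_build_tag combines the attributes above, assuming the conventional repository:tag format; the real class derives repository and image_tag from the environment config and the Git hash, which this stand-in skips:

```python
class DockerArgs:
    """Simplified stand-in: repository and image_tag are passed in directly."""

    def __init__(self, repository: str, image_tag: str) -> None:
        self.repository = repository
        self.image_tag = image_tag

    def docker_build_tag(self) -> str:
        # Combine repository URI and tag the way `docker build -t` expects.
        return f"{self.repository}:{self.image_tag}"
```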
- class TemplateConfig[source]
Bases:
TypedDict
POD representing value referenced in the templates section of the .dp.yml config file.
- template_name: str
Name of the template
- template_path: str
Local path or Git URI to the template repository
- read_env_config() data_pipelines_cli.data_structures.DataPipelinesConfig [source]
Parse the .dp.yml config file, if it exists. Otherwise, raises NoConfigFileError.
- Returns
POD representing .dp.yml config file, if it exists
- Return type
data_pipelines_cli.data_structures.DataPipelinesConfig
- Raises
NoConfigFileError – .dp.yml file not found
data_pipelines_cli.dbt_utils module
- read_dbt_vars_from_configs(env: str) Dict[str, Any] [source]
Read the vars field from the dp configuration file ($HOME/.dp.yml), the base dbt.yml config (config/base/dbt.yml), and the environment-specific config (config/{env}/dbt.yml), and compile them into one dictionary.
- Parameters
env (str) – Name of the environment
- Returns
Dictionary with vars and their values
- Return type
Dict[str, Any]
- run_dbt_command(command: Tuple[str, ...], env: str, profiles_path: pathlib.Path, log_format_json: bool = False, capture_output: bool = False) subprocess.CompletedProcess[bytes] [source]
Run a dbt subprocess in the context of the specified env.
- Parameters
command (Tuple[str, ...]) – Tuple representing dbt command and its optional arguments
env (str) – Name of the environment
profiles_path (pathlib.Path) – Path to the directory containing profiles.yml file
log_format_json (bool) – Whether to run dbt command with --log-format=json flag
capture_output (bool) – Whether to capture stdout of subprocess.
- Returns
State of the completed process
- Return type
subprocess.CompletedProcess[bytes]
- Raises
SubprocessNotFound – dbt not installed
SubprocessNonZeroExitError – dbt exited with error
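The argument assembly behind such a call can be sketched as below. The helper name build_dbt_args and the exact flag ordering are assumptions for illustration, not the library's actual internals; the flags themselves (--log-format, --profiles-dir, --target) are standard dbt CLI options:

```python
from typing import List, Tuple


def build_dbt_args(
    command: Tuple[str, ...],
    env: str,
    profiles_dir: str,
    log_format_json: bool = False,
) -> List[str]:
    # Assemble the dbt invocation: global flags, the command, then target flags.
    args: List[str] = ["dbt"]
    if log_format_json:
        args += ["--log-format", "json"]
    args += list(command)
    args += ["--profiles-dir", profiles_dir, "--target", env]
    return args
```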
data_pipelines_cli.docker_response_reader module
- class DockerReadResponse(msg: str, is_error: bool)[source]
Bases:
object
POD representing a Docker response processed by DockerResponseReader.
- is_error: bool
Whether the response is an error
- msg: str
Read and processed message
- class DockerResponseReader(logs_generator: Iterable[Union[str, Dict[str, Union[str, Dict[str, str]]]]])[source]
Bases:
object
Read and process Docker response.
Docker response turns into processed strings instead of plain dictionaries.
- cached_read_response: Optional[List[data_pipelines_cli.docker_response_reader.DockerReadResponse]]
Internal cache of already processed response
- click_echo_ok_responses() None [source]
Read, process and print positive Docker updates.
- Raises
DockerErrorResponseError – An error update was encountered in the Docker response.
- logs_generator: Iterable[Union[str, Dict[str, Union[str, Dict[str, str]]]]]
Iterable representing Docker response
- read_response() List[data_pipelines_cli.docker_response_reader.DockerReadResponse] [source]
Read and process Docker response.
- Returns
List of processed lines of response
- Return type
List[DockerReadResponse]
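The processing described above can be sketched as follows; this stand-in returns plain (message, is_error) tuples rather than DockerReadResponse objects, and the handled update keys ("stream", "error") follow the Docker Engine API's streaming build/push response format:

```python
from typing import Dict, Iterable, List, Tuple, Union


def read_docker_response(
    logs_generator: Iterable[Union[str, Dict]],
) -> List[Tuple[str, bool]]:
    """Turn raw Docker API updates into (message, is_error) pairs."""
    out: List[Tuple[str, bool]] = []
    for update in logs_generator:
        if isinstance(update, str):
            # Plain string updates are passed through as-is.
            out.append((update.strip(), False))
        elif "stream" in update:
            # Progress output from a build/push.
            out.append((str(update["stream"]).strip(), False))
        elif "error" in update:
            # Error updates get flagged so callers can raise.
            out.append((str(update["error"]).strip(), True))
    return out
```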
data_pipelines_cli.errors module
- exception AirflowDagsPathKeyError[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if there is no dags_path in the airflow.yml file.
- exception DataPipelinesError(message: str, submessage: Optional[str] = None)[source]
Bases:
Exception
Base class for all exceptions in data_pipelines_cli module
- message: str
Explanation of the error
- submessage: Optional[str]
Additional information about the error
- exception DependencyNotInstalledError(program_name: str)[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if a certain dependency is not installed
- exception DockerErrorResponseError(error_msg: str)[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if there is an error response from Docker client.
- exception DockerNotInstalledError[source]
Bases:
data_pipelines_cli.errors.DependencyNotInstalledError
Exception raised if ‘docker’ is not installed
- exception NoConfigFileError[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if .dp.yml does not exist
- exception NotAProjectDirectoryError(project_path: str)[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if .copier-answers.yml file does not exist in the given directory
- exception NotSuppertedBIError[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if there is no target_id in bi.yml
- exception SubprocessNonZeroExitError(subprocess_name: str, exit_code: int, subprocess_output: Optional[str] = None)[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if subprocess exits with non-zero exit code
- exception SubprocessNotFound(subprocess_name: str)[source]
Bases:
data_pipelines_cli.errors.DataPipelinesError
Exception raised if subprocess cannot be found
data_pipelines_cli.filesystem_utils module
data_pipelines_cli.io_utils module
- git_revision_hash() Optional[str] [source]
Get current Git revision hash, if Git is installed and any revision exists.
- Returns
Git revision hash, if possible.
- Return type
Optional[str]
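The graceful fallback described above can be sketched with a subprocess call to git rev-parse; this is an assumed implementation, not necessarily the library's exact one:

```python
import subprocess
from typing import Optional


def git_revision_hash() -> Optional[str]:
    try:
        # Ask Git for the current HEAD commit hash.
        return (
            subprocess.check_output(
                ["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL
            )
            .decode("ascii")
            .strip()
        )
    except (FileNotFoundError, subprocess.CalledProcessError):
        # Git is not installed, or we are not inside a repository with commits.
        return None
```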
- replace(filename: Union[str, os.PathLike[str]], pattern: str, replacement: str) None [source]
Perform the pure-Python equivalent of an in-place sed substitution, e.g. sed -i -e "s/${pattern}/${replacement}/" filename. Beware, however: it uses Python's regex dialect instead of sed's, which can introduce regex-related bugs.
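A minimal sketch of such an in-place substitution using Python's re module (an assumed implementation consistent with the description above):

```python
import os
import re
from pathlib import Path
from typing import Union


def replace(
    filename: Union[str, "os.PathLike[str]"], pattern: str, replacement: str
) -> None:
    # Read the file, substitute using Python's regex dialect, and write back.
    path = Path(filename)
    path.write_text(re.sub(pattern, replacement, path.read_text()))
```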
data_pipelines_cli.jinja module
- replace_vars_with_values(templated_dictionary: Dict[str, Any], dbt_vars: Dict[str, Any]) Dict[str, Any] [source]
Replace variables in the given dictionary by rendering the Jinja templates in its values.
- Parameters
templated_dictionary (Dict[str, Any]) – Dictionary with Jinja-templated values
dbt_vars (Dict[str, Any]) – Variables to replace
- Returns
Dictionary with replaced variables
- Return type
Dict[str, Any]
- Raises
JinjaVarKeyError – Variable referenced in Jinja template does not exist
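The recursive value-rendering described above can be approximated with the standard library. This sketch uses string.Template placeholders ($key) in place of the real function's Jinja expressions, purely to illustrate the walk-and-substitute shape:

```python
from string import Template
from typing import Any, Dict


def replace_vars_with_values(
    templated_dictionary: Dict[str, Any], dbt_vars: Dict[str, Any]
) -> Dict[str, Any]:
    # Walk the dictionary, rendering string values and recursing into dicts.
    rendered: Dict[str, Any] = {}
    for key, value in templated_dictionary.items():
        if isinstance(value, str):
            rendered[key] = Template(value).safe_substitute(dbt_vars)
        elif isinstance(value, dict):
            rendered[key] = replace_vars_with_values(value, dbt_vars)
        else:
            rendered[key] = value
    return rendered
```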