Source code for data_pipelines_cli.errors

[docs]class DataPipelinesError(Exception): """Base class for all exceptions in data_pipelines_cli module""" message: str """explanation of the error""" def __init__(self, message: str) -> None: self.message = message
[docs]class DependencyNotInstalledError(DataPipelinesError): """Exception raised if certain dependency is not installed""" def __init__(self, program_name: str) -> None: self.message = ( f"'{program_name}' not installed. Run 'pip install " f"data-pipelines-cli[{program_name}]'" )
[docs]class NoConfigFileError(DataPipelinesError): """Exception raised if `.dp.yml` does not exist""" def __init__(self) -> None: self.message = "`.dp.yml` config file does not exists. Run 'dp init' to create it."
[docs]class NotAProjectDirectoryError(DataPipelinesError): """Exception raised if `.copier-answers.yml` file does not exist in given dir""" def __init__(self, project_path: str) -> None: self.message = ( f"Given path {project_path} is not a data-pipelines project directory." " Run 'dp create' first to create a project" )
[docs]class SubprocessNonZeroExitError(DataPipelinesError): """Exception raised if subprocess exits with non-zero exit code""" def __init__(self, subprocess_name: str, exit_code: int) -> None: self.message = f"{subprocess_name} has exited with non-zero exit code: {exit_code}"
[docs]class SubprocessNotFound(DataPipelinesError): """Exception raised if subprocess cannot be found""" def __init__(self, subprocess_name: str) -> None: self.message = ( f"{subprocess_name} cannot be found. " "Ensure it is installed and listed in your $PATH." )
[docs]class DockerNotInstalledError(DependencyNotInstalledError): """Exception raised if 'docker' is not installed""" def __init__(self) -> None: super().__init__("docker")
[docs]class JinjaVarKeyError(DataPipelinesError): def __init__(self, key: str) -> None: self.message = ( f"Variable '{key}' cannot be found neither in 'dbt.yml' and " "'$HOME/.dp.yml' vars nor in environment variables, causing Jinja " "template rendering to fail." )
[docs]class AirflowDagsPathKeyError(DataPipelinesError): """Exception raised if there is no ``dags_path`` in `airflow.yml` file.""" def __init__(self) -> None: self.message = "Variable 'dags_path' cannot be found in 'airflow.yml' config file."
[docs]class DockerErrorResponseError(DataPipelinesError): """Exception raised if there is an error response from Docker client.""" def __init__(self, error_msg: str) -> None: self.message = "Error raised when using Docker.\n" + error_msg