import io
import json
from typing import Any, Dict, Optional, cast
import click
import yaml
from ..cli_constants import BUILD_DIR
from ..cli_utils import echo_error, echo_info, subprocess_run
from ..config_generation import read_dictionary_from_config_directory
from ..data_structures import DockerArgs
from ..docker_response_reader import DockerResponseReader
from ..errors import (
AirflowDagsPathKeyError,
DataPipelinesError,
DependencyNotInstalledError,
DockerErrorResponseError,
DockerNotInstalledError,
)
from ..filesystem_utils import LocalRemoteSync
[docs]class DeployCommand:
"""A class used to push and deploy the project to the remote machine."""
docker_args: Optional[DockerArgs]
"""Arguments required by the Docker to make a push to the repository.
If set to `None`, :meth:`deploy` will not make a push"""
datahub_ingest: bool
"""Whether to ingest DataHub metadata"""
blob_address_path: str
"""URI of the cloud storage to send build artifacts to"""
provider_kwargs_dict: Dict[str, Any]
"""Dictionary of arguments required by a specific cloud storage provider,
e.g. path to a token, username, password, etc."""
def __init__(
self,
env: str,
docker_push: bool,
dags_path: Optional[str],
provider_kwargs_dict: Optional[Dict[str, Any]],
datahub_ingest: bool,
) -> None:
self.docker_args = DockerArgs(env) if docker_push else None
self.datahub_ingest = datahub_ingest
self.provider_kwargs_dict = provider_kwargs_dict or {}
try:
self.blob_address_path = dags_path or read_dictionary_from_config_directory(
BUILD_DIR.joinpath("dag"),
env,
"airflow.yml",
)["dags_path"]
except KeyError as key_error:
raise AirflowDagsPathKeyError from key_error
[docs] def deploy(self) -> None:
"""Push and deploy the project to the remote machine.
:raises DependencyNotInstalledError: DataHub or Docker not installed
:raises DataPipelinesError: Error while pushing Docker image
"""
if self.docker_args:
self._docker_push()
if self.datahub_ingest:
self._datahub_ingest()
self._sync_bucket()
def _docker_push(self) -> None:
"""
:raises DockerNotInstalledError: Docker not installed
:raises DataPipelinesError: Error while pushing Docker image
"""
try:
import docker
except ModuleNotFoundError:
raise DockerNotInstalledError()
echo_info("Pushing Docker image")
docker_client = docker.from_env()
docker_args = cast(DockerArgs, self.docker_args)
try:
DockerResponseReader(
docker_client.images.push(
repository=docker_args.repository,
tag=docker_args.commit_sha,
stream=True,
decode=True,
)
).click_echo_ok_responses()
except DockerErrorResponseError as err:
echo_error(err.message)
raise DataPipelinesError(
"Error raised when pushing Docker image. Ensure that "
"Docker image you try to push exists. Maybe try running "
"'dp compile' first?"
)
@staticmethod
def _datahub_ingest() -> None:
""":raises DependencyNotInstalledError: DataHub not installed"""
try:
import datahub # noqa: F401
except ModuleNotFoundError:
raise DependencyNotInstalledError("datahub")
echo_info("Ingesting datahub metadata")
subprocess_run(
[
"datahub",
"ingest",
"-c",
str(BUILD_DIR.joinpath("dag", "config", "base", "datahub.yml")),
]
)
def _sync_bucket(self) -> None:
echo_info("Syncing Bucket")
LocalRemoteSync(
BUILD_DIR.joinpath("dag"), self.blob_address_path, self.provider_kwargs_dict
).sync(delete=True)
@click.command(
name="deploy",
help="Push and deploy the project to the remote machine",
)
@click.option(
"--env", default="base", show_default=True, type=str, help="Name of the environment"
)
@click.option("--dags-path", required=False, help="Remote storage URI")
@click.option(
"--blob-args",
required=False,
type=click.File("r"),
help="Path to JSON or YAML file with arguments that should be passed to "
"your Bucket/blob provider",
)
@click.option(
"--docker-push",
type=bool,
is_flag=True,
default=False,
help="Whether to push image to the Docker repository",
)
@click.option(
"--datahub-ingest",
is_flag=True,
default=False,
help="Whether to ingest DataHub metadata",
)
def deploy_command(
env: str,
dags_path: Optional[str],
blob_args: Optional[io.TextIOWrapper],
docker_push: bool,
datahub_ingest: bool,
) -> None:
if blob_args:
try:
provider_kwargs_dict = json.load(blob_args)
except json.JSONDecodeError:
blob_args.seek(0)
provider_kwargs_dict = yaml.safe_load(blob_args)
else:
provider_kwargs_dict = None
DeployCommand(
env,
docker_push,
dags_path,
provider_kwargs_dict,
datahub_ingest,
).deploy()