"""Source code for data_pipelines_cli.airbyte_utils: create and update Airbyte connections from a config YAML file."""

import ast
import copy
import json
import os
import pathlib
from typing import Any, Dict, Optional, Union

import requests
import yaml

from .cli_constants import BUILD_DIR
from .cli_utils import echo_error, echo_info


class AirbyteFactory:
    """A class used to create and update Airbyte connections defined in config yaml file"""

    airbyte_config_path: pathlib.Path
    """Path to config yaml file containing connections definitions"""
    auth_token: Optional[str]
    """Authorization OIDC ID token for a service account to communicate with Airbyte instance"""

    def __init__(self, airbyte_config_path: pathlib.Path, auth_token: Optional[str]) -> None:
        """Load the Airbyte config file.

        :param airbyte_config_path: path to the config YAML file; it must contain an
            ``airbyte_url`` key
        :param auth_token: bearer token sent with every API request, or ``None`` to
            send no ``Authorization`` header
        :raises KeyError: if the config has no ``airbyte_url`` key
        """
        self.airbyte_config_path = airbyte_config_path
        self.auth_token = auth_token
        # YAML files are UTF-8; do not depend on the platform's locale encoding.
        with open(self.airbyte_config_path, "r", encoding="utf-8") as airbyte_config_file:
            self.airbyte_config = yaml.safe_load(airbyte_config_file)
        self.airbyte_url = self.airbyte_config["airbyte_url"]

    @staticmethod
    def find_config_file(env: str, config_name: str = "airbyte") -> pathlib.Path:
        """Return the path of ``<config_name>.yml`` for ``env``.

        The environment-specific file under ``BUILD_DIR/dag/config/<env>/`` is
        preferred when it exists; otherwise the ``base`` path is returned (without
        checking that it exists).
        """
        config_dir = BUILD_DIR.joinpath("dag", "config")
        env_config = config_dir.joinpath(env, f"{config_name}.yml")
        if env_config.is_file():
            return env_config
        return config_dir.joinpath("base", f"{config_name}.yml")

    @staticmethod
    def env_replacer(config: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``config`` with environment variables expanded.

        Every string found anywhere in ``config`` (dict keys and values, list,
        tuple and set items, at any depth) has ``$VAR`` / ``${VAR}`` references
        expanded with :func:`os.path.expandvars`. References to unset variables
        are left as-is. Non-string leaves are returned unchanged.

        Only the strings themselves are expanded. Env values containing quotes or
        backslashes are therefore inserted verbatim, and non-literal values such
        as dates loaded from YAML are passed through.
        """

        def expand(value: Any) -> Any:
            if isinstance(value, str):
                return os.path.expandvars(value)
            if isinstance(value, dict):
                return {expand(key): expand(item) for key, item in value.items()}
            if isinstance(value, list):
                return [expand(item) for item in value]
            if isinstance(value, tuple):
                return tuple(expand(item) for item in value)
            if isinstance(value, set):
                return {expand(item) for item in value}
            return value

        return expand(config)

    def create_update_connections(self) -> None:
        """Create and update Airbyte connections defined in config yaml file.

        Each entry under ``connections`` is created or updated through the API.
        Environment variables are then expanded inside every entry under
        ``tasks``, and the resulting config is written back to
        ``airbyte_config_path``. This does nothing when ``connections`` is missing
        or empty.
        """
        connections = self.airbyte_config.get("connections")
        if not connections:
            return
        for connection_config in connections.values():
            self.create_update_connection(connection_config)
        # create_update_connection exports each connection's id as an env var named
        # after the connection; presumably tasks reference those ids via ${...}.
        for task in self.airbyte_config.get("tasks") or []:
            task.update(self.env_replacer(task))
        self.update_file(self.airbyte_config)

    def create_update_connection(self, connection_config: Dict[str, Any]) -> Any:
        """Create a connection, or update the existing one with the same endpoints.

        An existing connection is looked up by source, destination and namespace
        settings. After the create/update call, the returned ``connectionId`` is
        stored in ``os.environ`` under the returned connection ``name``.

        :param connection_config: connection definition; it is not modified
        :return: the Airbyte API response of the create/update call
        :raises RuntimeError: if an Airbyte API request returns an HTTP error (its
            body has already been reported by :meth:`request_handler`)
        """
        config = copy.deepcopy(connection_config)
        search_response = self._post_or_raise(
            "connections/search",
            {
                "sourceId": config["sourceId"],
                "destinationId": config["destinationId"],
                "namespaceDefinition": config["namespaceDefinition"],
                "namespaceFormat": config["namespaceFormat"],
            },
        )
        if not search_response["connections"]:
            echo_info(f"Creating connection config for {config['name']}")
            endpoint = "connections/create"
        else:
            echo_info(f"Updating connection config for {config['name']}")
            # The update payload identifies the connection by its id instead of
            # by source/destination ids.
            config.pop("sourceId", None)
            config.pop("destinationId", None)
            config["connectionId"] = search_response["connections"][0]["connectionId"]
            endpoint = "connections/update"
        response = self._post_or_raise(endpoint, config)
        os.environ[response["name"]] = response["connectionId"]
        return response

    def _post_or_raise(self, endpoint: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Call :meth:`request_handler`, turning its ``None`` failure result into an error."""
        response = self.request_handler(endpoint, config)
        if response is None:
            raise RuntimeError(f"Airbyte API request to '{endpoint}' failed")
        return response

    def update_file(self, updated_config: Dict[str, Any]) -> None:
        """Overwrite ``airbyte_config_path`` with ``updated_config`` dumped as YAML."""
        with open(self.airbyte_config_path, "w", encoding="utf-8") as airbyte_config_file:
            yaml.safe_dump(updated_config, airbyte_config_file)

    def request_handler(self, endpoint: str, config: Dict[str, Any]) -> Union[Dict[str, Any], Any]:
        """POST ``config`` as JSON to ``{airbyte_url}/api/v1/{endpoint}``.

        :return: the decoded JSON response body. On an HTTP error status, the
            response body is reported via ``echo_error`` and ``None`` is returned.
            Other ``requests`` errors (e.g. connection failures) propagate.
        """
        url = f"{self.airbyte_url}/api/v1/{endpoint}"
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        if self.auth_token is not None:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        try:
            response = requests.post(url=url, headers=headers, data=json.dumps(config))
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            echo_error(e.response.text)
            return None
        return response.json()