Guillaume Tauzin
02/26/2025, 7:20 AMHall
02/26/2025, 7:20 AMdatajoely
02/26/2025, 8:53 AMclass PipelineGuardHooks:
def __init__(self) -> None:
self._logger = logging.getLogger(__name__)
@hook_impl
def before_pipeline_run(self, run_params: dict[str, str]) -> None:
"""Kill execution if someone is trying to run `prod` or `stg` outside of GitOps workflow"""
is_ci = os.environ.get("CI") == "true"
is_github_repo = os.environ.get("GITHUB_REPOSITORY") is not None
is_github_actions = is_ci and is_github_repo
kedro_env = run_params.get("env", os.environ.get("KEDRO_ENV"))
is_protected_environment = kedro_env in ["prod", "stg"]
if is_github_actions is False and is_protected_environment is False:
<http://self._logger.info|self._logger.info>(f'Running pipeline "{kedro_env}" on CI/CD')
else:
self._logger.error(
f'Cannot run pipeline for "{kedro_env}" outside of CI/CD. Killing process.'
)
sys.exit(1)
Guillaume Tauzin
02/26/2025, 5:04 PM