Hello, we are using kedro-mlflow (which is great f...
# plugins-integrations
l
Hello, we are using kedro-mlflow (which is great for logging model, artifacts, metrics, parameters when we do kedro run in local) but we are also using kedro-airflow where we run each node in a DockerOperator. With kedro-airflow, each DAG step is executed and creates a new run ID (i.e. we have one run for the model training node, one for model evaluation etc.) : the pipeline is totally fragmented. This is a real issue and we would like to have everything in one run (even if multi-step DAG) How can we achieve this? Thank you very much
A 1
m
Take a look at this example: https://github.com/getindata/kedro-airflow-gke-example/blob/main/templates/gke_operator.pytpl From https://medium.com/@getindatatechteam/deploying-efficient-kedro-pipelines-on-gcp-composer-airflow-with-node-grouping-mlflow-a45e68d9f42f General idea is: 1. Initialize the mlflow run, save run id as an initial, separate step 2. Inject mlflow run id into all other Kedro nodes (so they will log into the same run id) You can e.g. use Airflow's Xcom for that as in the linked example
👍 1
l
Thanks @marrrcin for the resource: after a careful read, the env variable given via airflow xcoms is the mlflow run name (not the ID). Hence there will still be 1 task = 1 run each time a dockerOperator is used. They will just all have the same name. This does not unite all the DAG tasks in airflow under one unique mlflow run id (allowing to have all logged elements under one run, as is intended by mlflow) In the blog, there is no screenshot of their MLflow runs, but I do believe they would have the same number as the number of DAG tasks in spaceflight_grouped Google Cloud screenshot. The only way to have a unique run is to have a unique task in airflow, that defeats the docker containerization of the DockerOperator. And multiple runs for each task defeats the purpose of mlflow for monitoring runs. Do you think it's feasible to have an mlflow_init step where: 1. we execute a cli command / python script launching a run and then querying it for its ID 2. we make it an env variable and passing it via xcom to the conf of all subsequent tasks Thanks for your input 😊
👎 1
m
Read the whole thing and the code carefully, it does exactly the thing I've said it does. Everything in the DAG will be logged under the same mlflow run id. https://github.com/getindata/kedro-airflow-gke-example/blob/e301ed5e75a6c770b4a9bbcd472f89088dff468f/templates/gke_operator.pytpl#L137
1
l
thanks for highlighting it 😊 will give it a second read 👍
a
thanks for linking the materials @marrrcin 🙂
😎 2
l
Nice to know the author is in the slack channel @Artur Dobrogowski 😊 we're figuring things out atm 🚀
I was wondering @Artur Dobrogowski, with the kedro logs for task runs, how did you manage to have the function return of the run id as your last stdout for xcom? We have implemented something similar to your project using DockerOperator but our limitation is that the run id is not the last element of stdout being picked up by xcom. We are trying to navigate this with xcom_all but have not yet solved it. Thanks a bunch for your insight 😊🙏
a
as far as I remember xcom exports are specific to the operator that you're using. In the case of kubernetes pod operator this mechanism was slow with extra sidecar container that was taking output of the pod as an output to push to xcom. See https://github.com/getindata/kedro-airflow-gke-example/blob/main/templates/gke_operator.pytpl#L107 I did it this way to avoid including airflow API in any way to keep the code independent from airflow. However if that was not an option I'd have to use airflow api to talk to it and use xcom there. https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html Looking at this documentation, I see it's more complicated with DockerOperator, as it pushes all that's in stdout, maybe a good idea is to redirect all output to other stream like stderr and keep only what you want to return in stdout as xcom output? Another option is to use another mechanism for sharing values like mounted files. Maybe
private_environment
could be used for that purpose in dockerOperator?
l
that explains the difference: thanks a bunch for the ressources 😊 we're going to explore it and report back here once we have something working 😉
a
another solution that would be fool proof is to include code that starts a run in mlflow and gets its run id to pass in python code defining it in the dag code and using pythonOperator and airflow api, then you still isolate airflow from your code but need to handle mlflow in airflow's dependencies - either using virtualenv or including it for airflow. I wanted to include that solution as it was universal, but didn't get down to it.
l
We solved this by having : step 1: a Docker Operator creating the mlflow run id and pushing to xcom all the std output (this step follows conventions of our other Docker Operator and has the env with credentials to access the mlflow server) step 2: PythonOperator to parse the stdout from step 1 and isolate the mlflow run id, then pushing it to xcom as a clean MLFLOW_RUN_ID variable step 3: other Docker Operator steps with our complex code, which pulls the MLFLOW_RUN_ID from step 2 😊 our end-goal is an open-source template. Will share it here once we have it thank you for all your inputs!
K 1
👍 1