# questions
Is it possible to import an already packaged Kedro pipeline in a separate script and assign node return values to new variables for use later in the script? I've been trying to get people on our team on board with Kedro, and a couple of us would be really interested in being able to use the values returned by nodes as pieces of larger scripts. Up until now, I've only needed to import `main`, and that has worked for our purposes so far.
A Kedro pipeline run should return a dictionary of datasets, and you can consume it.
So I've only been able to call `main` in whatever script I put it in, which then exits the script. Not sure what I'm doing wrong, but I saw old documentation and examples of others doing it. That was around version ~0.17, though, when some since-removed APIs were still around (even though that case wouldn't be a package).
Do you still have control over that script? If it isn't returning anything, there isn't much you can do short of changing the existing program.
Yeah, what I've been experimenting with is a simple anomaly detection pipeline that I wrote up. Originally I only needed CSV dumps of dataframes, and that's been working great. But putting it in a separate script, I want to avoid the file IO, so I added a `MemoryDataset` return of the same dataframe. But I'm assuming I'm calling the packaged pipeline incorrectly, because the print here won't even execute:
```python
from my_pipeline.__main__ import main

main()
print("done")  # never reached
```
I haven't addressed the issue of getting the return value from the pipeline
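For what it's worth, the usual reason the script dies at that point is that the packaged `main` dispatches to a click command, and click calls `sys.exit()` when the command finishes. A stdlib-only sketch of that behaviour (`cli_main` here is a stand-in, not Kedro code):

```python
import sys


def cli_main():
    # stand-in for a click-based entry point: it does its work,
    # then terminates the interpreter via sys.exit()
    print("pipeline ran")
    sys.exit(0)


try:
    cli_main()
except SystemExit as exc:
    exit_code = exc.code  # 0 means the command succeeded

print(f"script continues; exit code was {exit_code}")
```

Wrapping the call in `try/except SystemExit` lets the rest of the script run, but it still doesn't give you the node return values, which is why the session approach below is the better fit.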
Do you have the definition of your `main`, and can you post it here? Also, which version of Kedro are you on? I think I roughly know what's happening, and this is something I'm eager to fix to make integrating Kedro easier. The GitHub issue I linked may shed some light on what's going on; I'll try to find more time to look at this tomorrow.
That issue is pretty much spot on, I think. Something like that would be awesome. This pipeline was written using Kedro 0.18.8, but I've since upgraded to 0.18.9 with no issues.
`__main__.py` is the default main that is generated when you create a new Kedro project with `kedro new`, and it hasn't been modified in any way. Unless it gets changed when you do a `kedro package`? I'll check, though.
Although, I don't necessarily need to run the pipeline in a script using `main`. It looks like the session solution you mentioned in that GitHub issue will solve my problem, though? Just import `KedroSession` instead.
Also, a diff of my "post-packaged" `__main__.py` and the one generated when the project is created yields no differences:
```python
import importlib
from pathlib import Path

from kedro.framework.cli.utils import KedroCliError, load_entry_points
from kedro.framework.project import configure_project


def _find_run_command(package_name):
    try:
        project_cli = importlib.import_module(f"{package_name}.cli")
        # fail gracefully if cli.py does not exist
    except ModuleNotFoundError as exc:
        if f"{package_name}.cli" not in str(exc):
            raise
        plugins = load_entry_points("project")
        run = _find_run_command_in_plugins(plugins) if plugins else None
        if run:
            # use run command from installed plugin if it exists
            return run
        # use run command from `kedro.framework.cli.project`
        from kedro.framework.cli.project import run

        return run
    # fail badly if cli.py exists, but has no `cli` in it
    if not hasattr(project_cli, "cli"):
        raise KedroCliError(f"Cannot load commands from {package_name}.cli")
    return project_cli.run


def _find_run_command_in_plugins(plugins):
    for group in plugins:
        if "run" in group.commands:
            return group.commands["run"]


def main(*args, **kwargs):
    package_name = Path(__file__).parent.name
    configure_project(package_name)
    run = _find_run_command(package_name)
    run(*args, **kwargs)


if __name__ == "__main__":
    main()
```
I think for now you need to do it the `KedroSession` way, which is similar to the Databricks workflow, because Databricks doesn't like the CLI-style entry point either:
```python
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession

configure_project("my_pipeline")  # your package name
with KedroSession.create(env=env, conf_source=conf_source) as session:
    result = session.run()  # result is a dict of the outputs you are interested in
```