# questions
j
Can I create a pipeline with a hook or does this always need to be done via the registry?
d
so anything is possible, but we recommend avoiding dynamic pipelines if you can. They introduce a lot of complexity that is hard to maintain.
j
If I were to do it, how would I do it? I have an abstract pipeline that I want to use to process many datasets. I’ve already dynamically created the datasets, so just need to create instances of the abstract pipeline that uses them.
d
so you would do it with an `after_context_created` hook, or you would do it in the registry itself
if you have an abstract pipeline, make sure you're using the modular pipeline and namespace concept
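for instance, something like this (a minimal sketch, not from your project, with made-up node and dataset names):
```python
# Minimal sketch of the modular pipeline / namespace concept.
# The node function and dataset names are illustrative only.
from kedro.pipeline import Pipeline, node, pipeline


def process(timeseries):
    return timeseries  # placeholder processing step


def create_pipeline() -> Pipeline:
    # The "abstract" template: generic dataset names, no dates
    return pipeline([node(process, inputs="timeseries", outputs="targets")])


# Two concrete instances of the template; the namespace prefixes
# the inputs/outputs, e.g. "timeseries" -> "20140130.timeseries"
combined = pipeline(create_pipeline(), namespace="20140130") + pipeline(
    create_pipeline(), namespace="20140212"
)
```
with a namespace, every input and output gets prefixed, so the same template can be wired to as many concrete catalog entries as you need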
j
Oh, interesting. Here are my hooks at the moment, if that makes what I'm trying to do clearer:
```python
import re
import logging
from kedro.config import ConfigLoader
from kedro.framework.project import settings
from kedro.framework.hooks import hook_impl
from kedro.io import PartitionedDataSet
from sunspots.extras.datasets.sunpy_datasets import SunPyMapDataSet
from kedro.extras.datasets.pickle.pickle_dataset import PickleDataSet


class ProjectHooks:
    @property
    def _logger(self):
        return logging.getLogger(__name__)

    @hook_impl
    def after_context_created(self, context):
        self.project_path = context.project_path
        self._logger.info(f"Project path: {self.project_path}")

    def _get_credentials(self, key):
        conf_path = f"{self.project_path}/{settings.CONF_SOURCE}"
        conf_loader = ConfigLoader(conf_source=conf_path, env="local")
        return conf_loader.get("credentials*")[key]

    def add_sunpy_dataset(self, name, folder, layer=None):
        self.catalog.add(
            data_set_name=name,
            data_set=PartitionedDataSet(
                path=f"<s3://sunspots/data/{folder}/{name}>",
                dataset=SunPyMapDataSet,
                filename_suffix=".fits",
                credentials=self._get_credentials("dev_s3"),
                overwrite=True,
            ),
            replace=True,
        )
        if layer:
            self.catalog.layers[layer].add(name)
        self._logger.info(f"Added dataset '{name}' to the data catalog.")

    def add_pickle_dataset(self, name, folder, layer=None):
        self.catalog.add(
            data_set_name=name,
            data_set=PickleDataSet(
                filepath=f"<s3://sunspots/data/{folder}/{name}>",
                credentials=self._get_credentials("dev_s3"),
            ),
            replace=True,
        )
        if layer:
            self.catalog.layers[layer].add(name)
        self._logger.info(f"Added dataset '{name}' to the data catalog.")

    @hook_impl
    def after_catalog_created(self, catalog):
        self.catalog = catalog
        datasets = self.catalog.load("params:datasets")
        for dataset in datasets:
            start_date = re.findall(r"(\d+)", dataset)[0]
            self.add_sunpy_dataset(name=dataset, folder="01_raw", layer="Raw")
            self.add_pickle_dataset(
                name=f"{start_date}_targets",
                folder="07_model_output",
                layer="Model Output",
            )
            self.add_sunpy_dataset(
                name=f"{start_date}_region_submaps",
                folder="07_model_output",
                layer="Model Output",
            )
```
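(And for completeness, the hooks are registered in `settings.py` in the standard Kedro way; the module path here is just where mine happen to live:)
```python
# src/sunspots/settings.py -- standard Kedro hook registration.
# Assumes ProjectHooks lives in src/sunspots/hooks.py.
from sunspots.hooks import ProjectHooks

HOOKS = (ProjectHooks(),)
```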
d
yeah it will work
j
Can I load parameters to the registry? Currently I’m doing this:
```python
import re
from typing import Dict

from kedro.pipeline import Pipeline, pipeline

# NOTE: assumed import paths; the chat doesn't show where these modules live
from sunspots.pipelines import execution_pipeline as ep
from sunspots.pipelines import training_pipeline as tp


def register_pipelines() -> Dict[str, Pipeline]:
    """
    Register the project's pipelines.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.
    """
    training_pipeline = tp.create_pipeline()
    execution_pipeline = ep.create_pipeline()
    pipelines = {
        "__default__": training_pipeline,
        "training_pipeline": training_pipeline,
        "execution_pipeline": execution_pipeline,
    }

    datasets = [
        "20140130Timeseries",
        "20140212Timeseries",
        "20140413Timeseries",
        "20140509Timeseries",
        "20140704Timeseries",
    ]

    execution_pipes = {}
    for dataset in datasets:
        start_date = re.findall(r"(\d+)", dataset)[0]
        execution_pipes[f"{start_date}_execution"] = pipeline(
            pipe=ep.create_pipeline(),
            inputs={"timeseries": dataset},
            outputs={
                "region_submaps": f"{start_date}_region_submaps",
                "targets": f"{start_date}_targets",
            },
        )
    return pipelines | execution_pipes
```
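Each generated pipeline can then be run by name, e.g. `kedro run --pipeline 20140130_execution`.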
d
so no, parameters aren't available in the registry, because it's not supposed to be used like that 😂
j
I already have my dataset names listed as params, it’s a bit messy to have them re-listed in the registry
d
but what you're doing is fine for your purposes
but it goes beyond the scope of how we can best support you
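that said, if you really want to avoid re-listing the datasets, you could instantiate a `ConfigLoader` yourself inside the registry, the same way your `_get_credentials` helper does. A rough sketch (the `datasets` parameters key and conf layout are assumed from your snippets):
```python
# Rough sketch: Kedro doesn't inject params into the registry, but you
# can load them yourself. The "datasets" key and conf layout are assumed
# from the snippets above, and this assumes kedro runs from the project root.
from pathlib import Path

from kedro.config import ConfigLoader
from kedro.framework.project import settings


def _load_datasets_param() -> list:
    conf_path = str(Path.cwd() / settings.CONF_SOURCE)
    conf_loader = ConfigLoader(conf_source=conf_path, env="local")
    return conf_loader.get("parameters*")["datasets"]
```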
j
Lol, I knew I was going a bit off-piste with this strategy
@datajoely So if I were to do this by using an `after_context_created` hook, how do I actually access/append pipelines through the context? It's not obvious to me from the docs. And apologies for the question barrage; I appreciate this use case is quite esoteric. If by chance there is an example somewhere, it would definitely be useful
d
it will have to be a mix of the registry and the hook I think
if you use a predictable naming convention it should be relatively manageable
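e.g. keep the convention in one small module that both `register_pipelines()` and the hooks import, so the pipeline names and the catalog entries can't drift apart. A sketch (`sunspots/naming.py` is a hypothetical helper, not something Kedro provides):
```python
# src/sunspots/naming.py -- hypothetical shared naming convention,
# imported by both the pipeline registry and the hooks.
import re


def start_date(dataset: str) -> str:
    # "20140130Timeseries" -> "20140130"
    return re.findall(r"(\d+)", dataset)[0]


def execution_pipeline_name(dataset: str) -> str:
    return f"{start_date(dataset)}_execution"


def targets_dataset_name(dataset: str) -> str:
    return f"{start_date(dataset)}_targets"
```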
j
Great, I’ll have a go