Jordan
11/08/2022, 12:26 PM

datajoely
11/08/2022, 2:21 PM

Jordan
11/08/2022, 2:51 PM

datajoely
11/08/2022, 2:52 PM
after_context_created hook

Jordan
11/08/2022, 2:54 PM
import re
import logging

from kedro.config import ConfigLoader
from kedro.framework.project import settings
from kedro.framework.hooks import hook_impl
from kedro.io import PartitionedDataSet
from kedro.extras.datasets.pickle import PickleDataSet

from sunspots.extras.datasets.sunpy_datasets import SunPyMapDataSet


class ProjectHooks:
    @property
    def _logger(self):
        return logging.getLogger(__name__)

    @hook_impl
    def after_context_created(self, context):
        # Stash the project path so config can be loaded later on.
        self.project_path = context.project_path
        self._logger.info(f"Project path: {self.project_path}")

    def _get_credentials(self, key):
        conf_path = f"{self.project_path}/{settings.CONF_SOURCE}"
        conf_loader = ConfigLoader(conf_source=conf_path, env="local")
        return conf_loader.get("credentials*")[key]

    def add_sunpy_dataset(self, name, folder, layer=None):
        self.catalog.add(
            data_set_name=name,
            data_set=PartitionedDataSet(
                path=f"s3://sunspots/data/{folder}/{name}",
                dataset=SunPyMapDataSet,
                filename_suffix=".fits",
                credentials=self._get_credentials("dev_s3"),
                overwrite=True,
            ),
            replace=True,
        )
        if layer:
            self.catalog.layers[layer].add(name)
        self._logger.info(f"Added dataset '{name}' to the data catalog.")

    def add_pickle_dataset(self, name, folder, layer=None):
        self.catalog.add(
            data_set_name=name,
            data_set=PickleDataSet(
                filepath=f"s3://sunspots/data/{folder}/{name}",
                credentials=self._get_credentials("dev_s3"),
            ),
            replace=True,
        )
        if layer:
            self.catalog.layers[layer].add(name)
        self._logger.info(f"Added dataset '{name}' to the data catalog.")

    @hook_impl
    def after_catalog_created(self, catalog):
        self.catalog = catalog
        # "params:datasets" is the datasets list from parameters.yml.
        datasets = self.catalog.load("params:datasets")
        for dataset in datasets:
            # Dataset names start with a YYYYMMDD date, e.g. "20140130Timeseries".
            start_date = re.findall(r"(\d+)", dataset)[0]
            self.add_sunpy_dataset(name=dataset, folder="01_raw", layer="Raw")
            self.add_pickle_dataset(
                name=f"{start_date}_targets",
                folder="07_model_output",
                layer="Model Output",
            )
            self.add_sunpy_dataset(
                name=f"{start_date}_region_submaps",
                folder="07_model_output",
                layer="Model Output",
            )
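
For reference, the hook above keys off a datasets list in the project parameters and a dev_s3 entry in the local credentials. A minimal sketch of the config it expects, with the dataset names taken from the registry code further down and placeholder credential values:

# conf/base/parameters.yml (sketch)
datasets:
  - 20140130Timeseries
  - 20140212Timeseries
  - 20140413Timeseries
  - 20140509Timeseries
  - 20140704Timeseries

# conf/local/credentials.yml (sketch -- values are placeholders)
dev_s3:
  client_kwargs:
    aws_access_key_id: YOUR_KEY
    aws_secret_access_key: YOUR_SECRET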
datajoely
11/08/2022, 2:55 PM

Jordan
11/08/2022, 2:57 PM
import re
from typing import Dict

from kedro.pipeline import Pipeline, pipeline

# Project-specific pipeline packages, aliased as tp and ep
# (their exact import paths weren't shown in the thread).


def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.
    """
    training_pipeline = tp.create_pipeline()
    execution_pipeline = ep.create_pipeline()

    pipelines = {
        "__default__": training_pipeline,
        "training_pipeline": training_pipeline,
        "execution_pipeline": execution_pipeline,
    }

    datasets = [
        "20140130Timeseries",
        "20140212Timeseries",
        "20140413Timeseries",
        "20140509Timeseries",
        "20140704Timeseries",
    ]

    # Build one modular execution pipeline per dataset, remapping its
    # free inputs/outputs to the date-prefixed catalog entries.
    execution_pipes = {}
    for dataset in datasets:
        start_date = re.findall(r"(\d+)", dataset)[0]
        execution_pipes[f"{start_date}_execution"] = pipeline(
            pipe=ep.create_pipeline(),
            inputs={"timeseries": dataset},
            outputs={
                "region_submaps": f"{start_date}_region_submaps",
                "targets": f"{start_date}_targets",
            },
        )

    return pipelines | execution_pipes
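
A side note on the last line: the dict union operator needs Python 3.9+; on older versions, {**pipelines, **execution_pipes} does the same merge. Each generated entry is then runnable by name, e.g.:

kedro run --pipeline 20140130_execution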
datajoely
11/08/2022, 2:57 PM

Jordan
11/08/2022, 2:57 PM

datajoely
11/08/2022, 2:57 PM

Jordan
11/08/2022, 2:58 PM
after_context_created hook, how do I actually access/append pipelines through the context? It's not obvious to me from the docs. And apologies for the question barrage, I appreciate this use case is quite esoteric. If by chance there is an example somewhere it would definitely be useful
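
One possible workaround (a sketch only, not necessarily what was suggested in this thread): register_pipelines() never sees the context, so instead of reaching for it, the registry can read the same datasets list straight from config with ConfigLoader, keeping the pipeline registry and the catalog hook driven by one parameters entry. The sunspots import path below is assumed:

# pipeline_registry.py (hypothetical sketch)
import re
from typing import Dict

from kedro.config import ConfigLoader
from kedro.pipeline import Pipeline, pipeline

from sunspots.pipelines import execution_pipeline as ep  # assumed import path


def register_pipelines() -> Dict[str, Pipeline]:
    # Load parameters directly; conf_source="conf" assumes the working
    # directory is the project root when Kedro imports this module.
    conf_loader = ConfigLoader(conf_source="conf", env="local")
    datasets = conf_loader.get("parameters*")["datasets"]

    execution_pipes = {}
    for dataset in datasets:
        start_date = re.findall(r"(\d+)", dataset)[0]
        execution_pipes[f"{start_date}_execution"] = pipeline(
            pipe=ep.create_pipeline(),
            inputs={"timeseries": dataset},
            outputs={
                "region_submaps": f"{start_date}_region_submaps",
                "targets": f"{start_date}_targets",
            },
        )
    return execution_pipes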
datajoely
11/08/2022, 3:30 PM

Jordan
11/08/2022, 3:36 PM