# questions
r
Hey y'all. A tough one, for me at least: say my data is a monthly time series. I want to train one model per month. I can do it easily with a for loop, but that won't allow me to run in parallel. Is there a kedro-esque way to do this, maybe using modular pipelines? I think I know how to do it if there were a fixed number of months, but that is not the case.
i
You can parallelize the for loop with something like joblib. If you wanted to use Kedro to parallelize (with ParallelRunner) you’d probably have to use some black magic, which wouldn’t be too nice. Maybe this is helpful: https://stackoverflow.com/questions/29589327/train-multiple-models-in-parallel-with-sklearn
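For instance, a minimal joblib version of the loop might look like this (train_month and monthly_frames are placeholders for your own function and data):
Copy code
# Fit one model per month across processes with joblib.
from joblib import Parallel, delayed

models = Parallel(n_jobs=-1)(
    delayed(train_month)(frame) for frame in monthly_frames
)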
r
I'd rather use Kedro itself. I am already doing parallel training of different models, but on the same data. The difficulty is training each model with different data (a grid of parallel training).
I don't think this is supported at all. I could load the data inside the pipeline and create many different pipelines in there, but it completely breaks Kedro's semantics.
Thanks for the pointers though, @Ian Whalen!
d
The pipeline structure needs to be determined before you run it. So, if you want to run something based on the structure of the data, you'll need something that reads the data (e.g. to determine the set of months in some file) and uses that to dynamically construct a Kedro pipeline, and then execute it.
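A minimal sketch of that approach, assuming the set of months can be read from a file on disk before Kedro assembles the pipeline (the file path, column name, and train_model stub are all made up):
Copy code
# pipeline_registry.py (sketch): discover the months up front, then build one
# namespaced copy of the training pipeline per month.
import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline


def train_model(monthly_data):
    """Hypothetical stand-in for the real per-month training node."""
    ...


def register_pipelines():
    months = pd.read_parquet("data/01_raw/monthly.parquet")["month"].unique()
    per_month = [
        pipeline(
            [node(train_model, inputs="monthly_data", outputs="model")],
            namespace=f"month_{m}",
            inputs={"monthly_data": f"monthly_data_{m}"},  # one slice per month
            outputs={"model": f"model_{m}"},
        )
        for m in months
    ]
    return {"__default__": sum(per_month, Pipeline([]))}
Because the per-month branches share no intermediate datasets, ParallelRunner can then execute them concurrently.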
r
Yep: results don't look good, and maintenance becomes a pain.
Maybe that's a nice feature, if someone can fully specify it. Basically, there would have to be a way for pipelines to be dynamically instantiated based on the output of a node. Say we have a pipeline consumer that gets a single number and does something with it. We also have generator, which produces a list of numbers of unknown size. It should be possible to connect the two by declaring that generator creates a sequence of outputs that consumer can use, and Kedro under the hood would take care of it by instantiating multiple consumer pipelines, each being passed one element provided by generator.
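To make the request concrete, an imagined API might read like this. Nothing below exists in Kedro; fan_out is invented purely for illustration:
Copy code
# Hypothetical only -- Kedro has no such fan-out connector today.
from kedro.pipeline import Pipeline, node


def make_numbers():
    return [1, 2, 3]  # in practice, a list of unknown size


def handle_number(number):
    return number * 2


generator = Pipeline([node(make_numbers, inputs=None, outputs="numbers")])
consumer = Pipeline([node(handle_number, inputs="number", outputs="result")])

# Imagined connector: at run time, instantiate one consumer per element of
# "numbers", binding each instance's "number" input to one element.
dynamic = generator.fan_out("numbers", consumer, into="number")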
d
Do you know of other pipelining frameworks, by chance, that provide this kind of behavior? I know a lot of orchestrators allow loops, conditional branches, etc., but I'm not familiar with dynamic generation of pipelines partway through.
r
I'm not familiar with any other framework that provides this.
As a fun note, I asked ChatGPT how to solve this and it hallucinated a kedro.runner.TaskRunner that kinda would do the trick. It allowed for a pipeline name to be passed in and instantiated inside a node, with remapped inputs.
This would solve the problem of not being able to import a pipeline inside the nodes module because of circular references.
But now I wonder if it would be possible to import a separate pipeline module altogether. hm...
Tried that; it turns out that importing a separate pipeline module inside a node of another pipeline breaks something, and Kedro is unable to figure out the graph.
That would have done it, I think -- I'd instantiate many modular pipelines inside a node.
I'm thinking a solution would be to read the catalog dataset from within the pipeline definition, so that I can instantiate as many pipelines as the data needs. Is that possible at all? @Deepyaman Datta?
d
That's similar to what I said in https://kedro-org.slack.com/archives/C03RKP2LW64/p1678489685247699?thread_ts=1678476079.170489&cid=C03RKP2LW64. However, as mentioned there, you need to know the data before you construct the pipeline.
r
Yes. But how do I read the data in the pipeline.py file?
For this to be minimally organized, the data would have to come from the catalog.
I'm trying to use KedroSession, but so far no luck.
d
What you're saying should work. What problem are you running into?
r
kedro.framework.session.get_current_session seems to have been removed in 0.18.
Not sure how to load the current session.
d
I'll look into it tomorrow morning if I have some time, else I'll loop somebody in. I don't know off the top of my head without trying it quickly.
r
I think I managed to do it!
Copy code
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession

configure_project("<project_name>")
with KedroSession.create(package_name="<project_name>") as session:
    context = session.load_context()
    catalog = context.catalog
🙌 1
Then catalog.load('data') will work.
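With that working, the pipeline definition itself can peek at the data before deciding how many pipelines to build (a sketch; the 'data' dataset name and 'month' column are assumptions):
Copy code
# pipeline.py (sketch): open a session inside create_pipeline just to read
# the catalog, then build as many modular pipelines as the data requires.
from kedro.framework.session import KedroSession
from kedro.pipeline import Pipeline


def create_pipeline(**kwargs) -> Pipeline:
    with KedroSession.create(package_name="<project_name>") as session:
        catalog = session.load_context().catalog
    months = catalog.load("data")["month"].unique()
    # ...build one namespaced modular pipeline per month, as in the
    # register_pipelines() sketch earlier in the thread...
    return Pipeline([])  # placeholder for the assembled per-month pipelines
Keep in mind this runs whenever Kedro imports the pipeline module, which is also where the swallowed import error shown further down comes from.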
d
Ah, I was literally about to post this:
Copy code
with KedroSession.create() as session:
    context = session.load_context()
    catalog = context.catalog
So it seems we wrote pretty much the exact same thing independently 🙂
r
Yes, lol. Thanks for the help! Still a bit hacky, but it will do for the moment.
d
(I don't think you need to pass package_name or import configure_project, or at least I didn't)
r
That makes sense. I don't think it is needed.
I still have to register catalog items programmatically as well; otherwise it won't store the outputs of those many pipelines.
But I think that can be done using the catalog API -- see the sketch below.
👍 1
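A sketch of that registration step, assuming the per-month dataset names from earlier (DataCatalog.add and PickleDataSet are existing Kedro APIs; the paths are made up):
Copy code
# Register one persistent output dataset per month, so each pipeline copy
# saves its model instead of falling back to an ephemeral MemoryDataSet.
from kedro.extras.datasets.pickle import PickleDataSet

for m in months:
    catalog.add(
        f"model_{m}",
        PickleDataSet(filepath=f"data/06_models/model_{m}.pkl"),
    )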
d
Yeah. Sometimes I think PartitionedDataSet can be a good way to define the structure once for each dataset, but that requires writing the pipeline in a different way and has its own limitations, so I think what you say is good for now.
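For reference, the PartitionedDataSet alternative looks roughly like this (the paths and the fit helper are assumptions):
Copy code
# One catalog entry covers every monthly file; the node receives a dict
# mapping partition id -> a zero-argument load function for that partition.
from kedro.extras.datasets.pandas import CSVDataSet
from kedro.io import PartitionedDataSet

monthly = PartitionedDataSet(path="data/01_raw/monthly", dataset=CSVDataSet)


def train_all(partitions):
    return {month: fit(load()) for month, load in partitions.items()}  # fit() is hypothetical
The limitation mentioned above is visible here: all months are handled inside a single node, so ParallelRunner sees one task and the per-month work is no longer parallelized for free.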
FWIW, I noticed that if I ran into an error in that catalog-loading piece, Kedro swallowed it:
Copy code
(diamonds) deepyaman@Deepyamans-MacBook-Air spaceflights % kedro run                                             
[03/13/23 07:29:03] INFO     Kedro project spaceflights                                                                                                                                           session.py:340
[03/13/23 07:29:04] WARNING  /opt/miniconda3/envs/diamonds/lib/python3.10/site-packages/kedro/framework/project/__init__.py:359: UserWarning: An error occurred while importing the              warnings.py:109
                             'spaceflights.pipelines.data_science' module. Nothing defined therein will be returned by 'find_pipelines'.                                                                        
                                                                                                                                                                                                                
                             Traceback (most recent call last):                                                                                                                                                 
                               File "/opt/miniconda3/envs/diamonds/lib/python3.10/site-packages/kedro/framework/project/__init__.py", line 357, in find_pipelines                                               
                                 pipeline_module = importlib.import_module(pipeline_module_name)                                                                                                                
                               File "/opt/miniconda3/envs/diamonds/lib/python3.10/importlib/__init__.py", line 126, in import_module                                                                            
                                 return _bootstrap._gcd_import(name[level:], package, level)                                                                                                                    
                               File "<frozen importlib._bootstrap>", line 1050, in _gcd_import                                                                                                                  
                               File "<frozen importlib._bootstrap>", line 1027, in _find_and_load                                                                                                               
                               File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked                                                                                                      
                               File "<frozen importlib._bootstrap>", line 688, in _load_unlocked                                                                                                                
                               File "<frozen importlib._bootstrap_external>", line 883, in exec_module                                                                                                          
                               File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed                                                                                                     
                               File "/Users/deepyaman/github/kedro-org/kedro/spaceflights/src/spaceflights/pipelines/data_science/__init__.py", line 3, in <module>                                             
                                 from .pipeline import create_pipeline  # NOQA                                                                                                                                  
                               File "/Users/deepyaman/github/kedro-org/kedro/spaceflights/src/spaceflights/pipelines/data_science/pipeline.py", line 10, in <module>                                            
                                 context = session.context                                                                                                                                                      
                             AttributeError: 'KedroSession' object has no attribute 'context'                                                                                                                   
                                                                                                                                                                                                                
                               warnings.warn(                                                                                                                                                                   
                                                                                                                                                                                                                
                    INFO     Loading data from 'companies' (CSVDataSet)...                                                                                                                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node: preprocess_companies([companies]) -> [preprocessed_companies]                                                                  node.py:327
[03/13/23 07:29:05] INFO     Saving data to 'preprocessed_companies' (ParquetDataSet)...                                                                                                     data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                                                                                                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataSet)...                                                                                                                  data_catalog.py:34
So maybe just something to be aware of... I can also raise an issue on this later.
👍 1