fmfreeze
08/28/2023, 3:19 PM

    def inner():
        return "World"

    def outer(func):
        return "Hello " + func()

    def create_pipeline(**kwargs) -> Pipeline:
        return pipeline(
            [
                node(
                    func=outer,
                    inputs="??? inner ???",
                    outputs="some_string",
                )
            ]
        )
So basically I am asking how to tell kedro about an input which is a Callable.

Juan Luis
08/28/2023, 3:45 PM

    node(
        func=lambda input1: outer(input1, func=inner),
        ...
    )

i.e. create a curried (??) function on the fly

fmfreeze
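The same on-the-fly currying can also be written with functools.partial instead of a lambda; a kedro-free sketch (inner/outer mirror the thread, the greeting parameter is illustrative):

```python
from functools import partial

def inner():
    return "World"

def outer(greeting, func):
    # takes one runtime input plus a Callable, like the lambda above
    return greeting + " " + func()

# Bind the Callable up front; the partial still accepts `greeting`,
# so it can be handed to node(func=...) like any plain function.
curried = partial(outer, func=inner)
print(curried("Hello"))  # -> Hello World
```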
08/28/2023, 3:54 PM

What is input1 in your example?

Juan Luis
08/28/2023, 3:58 PM

    return pipeline(
        [
            node(
                func=lambda: outer(inner),
                inputs=None,
                outputs="some_string",
            )
        ]
    )

    return pipeline(
        [
            node(
                func=lambda input1: outer(input1, func=inner),
                inputs="dataset1",
                outputs="some_string",
            )
        ]
    )
fmfreeze
08/28/2023, 4:02 PM

And with more inputs (besides the inner function), I guess the lambda arguments simply increase, right?
like:

    return pipeline(
        [
            node(
                func=lambda i1, i2, i3, i4: outer(i1, i2, i3, i4, func=inner),
                inputs=["dataset1", "ds2", "more_input", "last_one"],
                outputs="some_string",
            )
        ]
    )
Juan Luis
08/28/2023, 4:12 PM

node(inputs=) only supports dataset names and param specifications (cc @Nok Lam Chan)

fmfreeze
08/28/2023, 4:17 PM

Juan Luis
08/28/2023, 4:23 PM

node(func= can be any callable, so you can definitely have some stateful classes:
    class DataProcessor:
        def inner(self, ds1):
            ...

        def outer(self, ds1):
            self.inner(ds1)
            return ds1

    ...
    proc = DataProcessor()
    return pipeline([
        node(
            func=proc.outer,
            inputs="dataset1",
            outputs="processed_ds1",  # node() also needs an outputs argument
        )
    ])

does it make sense?

fmfreeze
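Setting kedro aside for a moment, the stateful-class trick works because a bound method such as proc.outer is itself a Callable; a minimal sketch (the calls counter is an illustrative bit of state, not from the thread):

```python
class DataProcessor:
    def __init__(self):
        self.calls = 0  # illustrative state shared between method calls

    def inner(self, ds1):
        self.calls += 1

    def outer(self, ds1):
        self.inner(ds1)
        return ds1

proc = DataProcessor()
result = proc.outer("dataset contents")

assert callable(proc.outer)       # a bound method is a plain Callable
assert result == "dataset contents"
assert proc.calls == 1            # the instance kept its state
```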
08/28/2023, 4:24 PM

So proc is also its own kedro node?
How would I run the proc.outer statement from your example?

Juan Luis
08/28/2023, 4:26 PM

Lodewic van Twillert
08/29/2023, 7:26 PM

You can output a dataset which is just the Callable object. This is perfectly fine, and you could use a pickle.PickleDataSet for objects like that if you needed to save them for some reason:
    def inner() -> str:
        return "World"

    def outer(func: Callable) -> str:
        return "Hello " + func()

    def create_inner_func() -> Callable:
        return inner  # do not call the function, so return the Callable object

    def create_pipeline(**kwargs) -> Pipeline:
        return pipeline([
            node(func=create_inner_func, inputs=None, outputs="inner_func"),
            node(func=outer, inputs="inner_func", outputs="some_string"),
        ])
Or, equivalently, you don't really need the create_inner_func() method if you prefer a lambda:
    def create_pipeline(**kwargs) -> Pipeline:
        return pipeline([
            node(func=lambda: inner, inputs=None, outputs="inner_func"),
            node(func=outer, inputs="inner_func", outputs="some_string"),
        ])
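On the pickle.PickleDataSet remark: the standard pickle module stores module-level functions by reference (their qualified name), while lambdas cannot be pickled at all; a quick kedro-free check:

```python
import pickle

def inner():
    return "World"

# A top-level function round-trips through pickle by name.
restored = pickle.loads(pickle.dumps(inner))
assert restored() == "World"

# A lambda has no importable name, so pickling it fails.
try:
    pickle.dumps(lambda: "World")
    lambda_pickled = True
except Exception:
    lambda_pickled = False
assert not lambda_pickled
```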
1. You want to instantiate a class via its __init__
2. The __init__ takes both strings and Callables
3. You want to call class methods of your instantiated class
4. These class methods may also take arguments
---
Here is a full, runnable example I made according to those points, using 2 class instances and different ways of passing your lambda methods. It gets complex quite quickly, I guess, but does it do what you want @fmfreeze?
    from typing import Callable
    from kedro.pipeline import node, Pipeline, pipeline

    class YourCustomClass:
        """This class joins strings from multiple sources."""

        def __init__(self, a_string, a_callable):
            self.a_string = a_string
            self.a_callable = a_callable

        def create_output(self, an_extra_string):
            """Returns `a_string`, `a_callable()` and `an_extra_string` joined with spaces."""
            return " ".join([self.a_string, self.a_callable(), an_extra_string])

    def create_a_string():
        return "Hello"

    def create_my_class(a_string, a_callable):
        return YourCustomClass(a_string, a_callable)

    def get_class_output(my_class: YourCustomClass, extra_string: str) -> str:
        result = my_class.create_output(an_extra_string=extra_string)
        print(f"Node output is:\t{result}")
        return result

    def create_world_callable() -> str:
        return "World"

    def kedro_community_callable() -> str:
        return "Kedro Community"

    def create_kedro_callable() -> Callable:
        """Function returns a callable"""
        return kedro_community_callable

    def create_smiley_string() -> str:
        return ":)"

    def create_wave_string() -> str:
        """Smiley that waves"""
        return "o/"

    def create_pipeline(**kwargs) -> Pipeline:
        return pipeline([
            ## Create YourCustomClass.__init__() arguments as datasets, including callables
            node(func=create_a_string, inputs=None, outputs="hello_string"),
            node(func=create_kedro_callable, inputs=None, outputs="kedro_callable"),  # callable from another function
            node(func=lambda: lambda: "World", inputs=None, outputs="world_callable"),  # callable as a lambda function
            ## Create class objects
            node(
                func=create_my_class,
                inputs=dict(a_string="hello_string", a_callable="world_callable"),
                outputs="hello_world_class"
            ),
            node(
                func=create_my_class,
                inputs=dict(a_string="hello_string", a_callable="kedro_callable"),
                outputs="hello_kedro_class"
            ),
            ## Create additional input to your class methods
            node(func=create_smiley_string, inputs=None, outputs="smiley_string"),
            node(func=create_wave_string, inputs=None, outputs="wave_string"),
            ## Call class methods
            node(
                func=get_class_output,
                inputs=dict(my_class="hello_world_class", extra_string="smiley_string"),
                outputs="hello_world_output"
            ),
            node(
                func=get_class_output,
                inputs=dict(my_class="hello_kedro_class", extra_string="wave_string"),
                outputs="hello_kedro_output"
            ),
        ])
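The data flow in this example can be checked without kedro by wiring the same class and values by hand; a sketch reusing the snippet's class (the lambdas stand in for the callable datasets):

```python
class YourCustomClass:
    """Joins strings from multiple sources."""

    def __init__(self, a_string, a_callable):
        self.a_string = a_string
        self.a_callable = a_callable

    def create_output(self, an_extra_string):
        return " ".join([self.a_string, self.a_callable(), an_extra_string])

# Mirrors the node wiring: hello_string + world_callable, hello_string + kedro_callable
hello_world = YourCustomClass("Hello", lambda: "World")
hello_kedro = YourCustomClass("Hello", lambda: "Kedro Community")

print(hello_world.create_output(":)"))  # -> Hello World :)
print(hello_kedro.create_output("o/"))  # -> Hello Kedro Community o/
```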
The pipeline is essentially 2 similar pipelines at the same time - see the kedro viz output in these screenshots:
1. partial pipeline of just 1 class
2. the whole pipeline as defined above

Nok Lam Chan
08/29/2023, 8:03 PM

fmfreeze
09/18/2023, 2:05 PM