# questions
f
Hi kedronistas, I need your help again: how do I handle a node whose function takes another function as input? To make it clearer:
```python
def inner():
    return "World"

def outer(func):
    return "Hello " + func()

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=outer,
                inputs="??? inner ???",
                outputs="some_string",
            )
        ]
    )
```
So basically I am asking how to tell kedro about an input which is a Callable.
✔️ 1
j
maybe
```python
node(
    func=lambda input1: outer(input1, func=inner),
    ...
)
```
i.e. create a curried function on the fly
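As a side note, the same currying can be done with `functools.partial` from the standard library instead of a lambda; a plain-Python sketch (no Kedro involved), using the function names from the original example:

```python
from functools import partial

def inner():
    return "World"

def outer(func):
    return "Hello " + func()

# partial(outer, func=inner) binds the callable ahead of time, producing
# a zero-argument function that could then serve as a node's func
curried = partial(outer, func=inner)
print(curried())  # -> Hello World
```

A `partial` object also keeps a readable `repr`, which can make debugging easier than an anonymous lambda.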
f
what is `input1` in your example?
j
whatever other inputs your node should have (none in your original example, sorry for the confusion)
so
```python
return pipeline(
    [
        node(
            func=lambda: outer(inner),
            inputs=None,
            outputs="some_string",
        )
    ]
)
```
or, in the case of 1 dataset input,
```python
return pipeline(
    [
        node(
            func=lambda input1: outer(input1, func=inner),
            inputs="dataset1",
            outputs="some_string",
        )
    ]
)
```
f
Thx, and in the case of many additional normal inputs (in addition to the `inner` function), I guess the lambda arguments simply increase, right? Like:
```python
return pipeline(
    [
        node(
            func=lambda i1, i2, i3, i4: outer(i1, i2, i3, i4, func=inner),
            inputs=["dataset1", "ds2", "more_input", "last_one"],
            outputs="some_string",
        )
    ]
)
```
j
correct!
does this look like a reasonable workaround? unfortunately `node(inputs=...)` only supports dataset names and param specifications (cc @Nok Lam Chan)
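Outside of Kedro, the forwarding pattern above can be sanity-checked in plain Python. The wrapper passes ordinary inputs through positionally while the callable is baked in (the extended `outer` signature here is invented for illustration):

```python
def inner():
    return "World"

def outer(greeting, punctuation, func):
    # combines two ordinary inputs with the injected callable
    return greeting + " " + func() + punctuation

# the node-style wrapper: ordinary inputs flow through, the callable is fixed
wrapper = lambda i1, i2: outer(i1, i2, func=inner)
print(wrapper("Hello", "!"))  # -> Hello World!
```

This is exactly what Kedro would do with `inputs=["ds1", "ds2"]`: resolve each dataset and pass the values positionally to the wrapper.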
f
yes, this is reasonable, thank you very much.
✔️ 1
Maybe the whole question resulted from a (possible) misconception of mine: I have a self-defined class with some init parameters (including functions, as in my original question) and a bunch of methods. I now want to use Kedro to create an object from that class (now solved with your help) and then trigger that object's methods in other nodes. It does not seem that Kedro was created for that kind of usage, or am I missing something key?
j
I'm not sure if it's exactly what you mean, but `node(func=...)` can be any callable, so you can definitely have some stateful classes:
```python
class DataProcessor:
    def inner(self, ds1):
        ...

    def outer(self, ds1):
        self.inner(ds1)
        return ds1

...
    proc = DataProcessor()

    return pipeline([
        node(
            func=proc.outer,
            inputs="dataset1",
            outputs="processed_dataset",  # node() requires an outputs argument
        )
    ])
```
does it make sense?
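The key point is that a bound method is just a callable that carries its instance with it, so state survives between calls. A minimal plain-Python illustration (no Kedro; class and method names invented):

```python
class DataProcessor:
    """Stateful processor whose bound method can serve as a node func."""
    def __init__(self):
        self.calls = 0

    def outer(self, ds1):
        self.calls += 1  # state lives on the instance between calls
        return ds1.upper()

proc = DataProcessor()
# proc.outer is an ordinary callable: `self` is already bound to proc
result = proc.outer("hello")
print(result, proc.calls)  # -> HELLO 1
```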
f
yes. but what if the creation of `proc` is also its own kedro node? How would I run the `proc.outer` statement from your example?
j
hmmm let me think about it. "custom datasets" is the first thing that comes to mind, but maybe there's a simpler way.
👍 1
👍🏼 1
l
Hi @fmfreeze, interesting case you got there. Curious if you have a less simplified example, to understand more of what you are trying to do. For your first example, I would create a dataset which is just the Callable object. This is perfectly fine, and you could use a `pickle.PickleDataSet` for objects like that if you needed to save them for some reason:
```python
def inner() -> str:
    return "World"

def outer(func: Callable) -> str:
    return "Hello " + func()

def create_inner_func() -> Callable:
    return inner  # do not call the function, so return the Callable object

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(func=create_inner_func, outputs="inner_func"),
        node(func=outer, inputs="inner_func", outputs="some_string"),
    ])
```
Or, equivalently, you don't really need the `create_inner_func()` method if you prefer a lambda:
```python
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(func=lambda: inner, outputs="inner_func"),
        node(func=outer, inputs="inner_func", outputs="some_string"),
    ])
```
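Stripped of Kedro, the two-node wiring above reduces to passing the function object through as ordinary data (a plain-Python sketch using the same function names):

```python
from typing import Callable

def inner() -> str:
    return "World"

def create_inner_func() -> Callable:
    return inner  # return the function object, do not call it

def outer(func: Callable) -> str:
    return "Hello " + func()

# simulate the two-node pipeline by hand: the first "node" produces the
# callable, the second consumes it as an ordinary input
inner_func = create_inner_func()
print(outer(inner_func))  # -> Hello World
```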
I'm trying to understand your use case, and this is what I get from it:
1. You want to create a class object using regular `__init__`
2. The `__init__` takes both strings and Callables
3. You want to call class methods of your instantiated class
4. These class methods may also take arguments

Here is a full example I made according to those points, using two class instances and different ways of passing your lambda methods. It gets complex quite quickly, I guess, but does it do what you want @fmfreeze? It is a runnable example.
```python
from typing import Callable

from kedro.pipeline import node, Pipeline, pipeline


class YourCustomClass:
    """This class joins strings from multiple sources."""
    def __init__(self, a_string, a_callable):
        self.a_string = a_string
        self.a_callable = a_callable

    def create_output(self, an_extra_string):
        """Returns `a_string`, `a_callable()` and `an_extra_string` joined with spaces."""
        return " ".join([self.a_string, self.a_callable(), an_extra_string])


def create_a_string():
    return "Hello"

def create_my_class(a_string, a_callable):
    return YourCustomClass(a_string, a_callable)

def get_class_output(my_class: YourCustomClass, extra_string: str) -> str:
    result = my_class.create_output(an_extra_string=extra_string)
    print(f"Node output is:\t{result}")
    return result

def create_world_callable() -> str:
    return "World"

def kedro_community_callable() -> str:
    return "Kedro Community"

def create_kedro_callable() -> Callable:
    """Function returns a callable"""
    return kedro_community_callable

def create_smiley_string() -> str:
    return ":)"

def create_wave_string() -> str:
    """Smiley that waves"""
    return "o/"

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        ## Create YourCustomClass.__init__() arguments as datasets, including callables
        node(func=create_a_string, inputs=None, outputs="hello_string"),
        node(func=create_kedro_callable, inputs=None, outputs="kedro_callable"),  # callable from another function
        node(func=lambda: lambda: "World", inputs=None, outputs="world_callable"),  # callable as a lambda function
        ## Create class objects
        node(
            func=create_my_class,
            inputs=dict(a_string="hello_string", a_callable="world_callable"),
            outputs="hello_world_class"
        ),
        node(
            func=create_my_class,
            inputs=dict(a_string="hello_string", a_callable="kedro_callable"),
            outputs="hello_kedro_class"
        ),
        ## Create additional inputs to your class methods
        node(func=create_smiley_string, inputs=None, outputs="smiley_string"),
        node(func=create_wave_string, inputs=None, outputs="wave_string"),
        ## Call class methods
        node(
            func=get_class_output,
            inputs=dict(my_class="hello_world_class", extra_string="smiley_string"),
            outputs="hello_world_output"
        ),
        node(
            func=get_class_output,
            inputs=dict(my_class="hello_kedro_class", extra_string="wave_string"),
            outputs="hello_kedro_output"
        ),
    ])
```
The pipeline is essentially two similar pipelines at the same time; see the `kedro viz` output in these screenshots: 1. a partial pipeline of just one class instance, 2. the whole pipeline as defined above.
n
With a stateful object you need to be careful if there are implicit dependencies on the state. If the order matters, the object has to be passed around to ensure correct execution order.
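A tiny plain-Python illustration of that point: when state accumulates, returning the object and re-passing it is what pins down the ordering, which mirrors passing it between Kedro nodes (names hypothetical):

```python
class Accumulator:
    """Stateful object: results depend on the order its methods are called."""
    def __init__(self):
        self.log = []

    def step(self, name):
        self.log.append(name)
        return self  # returning the object lets the next "node" depend on it

# threading the object through each call makes the ordering explicit;
# without this data dependency, a runner could execute steps in any order
acc = Accumulator()
acc = acc.step("load")
acc = acc.step("clean")
print(acc.log)  # -> ['load', 'clean']
```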
👌 1
f
Thank you @Lodewic van Twillert for your effort, and sorry for the super-late reply (my holidays were great :D). Your example works; I was able to apply your suggestions by wrapping (class/object) methods into node functions, and I then pass that object instance around along the pipeline. Though, it doesn't feel very "pythonic/kedronic" to me to wrap object methods into their own functions. Anyway, thanks for your help, and I'd be glad to stay tuned in case this can be solved better or Kedro supports it differently in the future.
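One way to reduce the boilerplate of writing a wrapper per method is a generic adapter that turns "call this method on this object" into a plain function; a sketch, not a Kedro feature, with all names hypothetical (the stdlib `operator.methodcaller` expresses the same idea):

```python
from operator import methodcaller

class Greeter:
    def greet(self, name):
        return "Hello " + name

# a generic adapter: turns (object, method name, args) into an ordinary
# function call, avoiding one hand-written wrapper per method
def call_method(obj, method_name, *args, **kwargs):
    return getattr(obj, method_name)(*args, **kwargs)

g = Greeter()
print(call_method(g, "greet", "World"))   # -> Hello World
print(methodcaller("greet", "World")(g))  # same result via the stdlib
```

In a pipeline, the method name could then be supplied as a parameter while the object flows between nodes as a dataset.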