https://kedro.org/ logo
#questions
Title
# questions
a

Antonio Perelló Moragues

09/04/2023, 8:58 AM
Hi Kedro community! What is the best option to skip several nodes in a pipeline when the input is empty?
j

Juan Luis

09/04/2023, 9:00 AM
hola @Antonio Perelló Moragues! could you clarify what do you mean with "empty input"?
a

Antonio Perelló Moragues

09/04/2023, 9:11 AM
I have some nodes that clean and filter my data, and it may happen that the output of this "subpipeline" results in an empty dataframe. Afterwards, the nodes that use that "clean dataframe" as input, may raise an error if is empty, because the calculations need to have some data. In order to avoid to amend every node to have an if-else clause at the beginning of the node function, I was wondering if I could write a validation node before to skip those nodes that may fail and return the final output as an empty dataframe. I think that it would be, somehow, the following: Given A --> B --> C; if the output of node A is empty, skip node B (or run B'), and go directly to node C
f

FlorianGD

09/04/2023, 9:52 AM
start your node B with :
Copy code
if your_input_df.empty:
    return pd.DataFrame()
?
a

Antonio Perelló Moragues

09/04/2023, 9:55 AM
@FlorianGD Yes, that's what I started to implement, but I wondered if there was another option to avoid amending all the nodes that may fail (as I have several nodes that use A's output)
f

FlorianGD

09/04/2023, 9:55 AM
Or wrap it in a hook maybe
before_node_run
l

Lodewic van Twillert

09/04/2023, 10:22 AM
@FlorianGD I am trying to come up with a
before_node_run
hook for this, but I don't think it's pretty. And you'll need some work to run async... It ain't pretty but it could work 🤷‍♂️ Here's what worked for me @Antonio Perelló Moragues as an example. • Hook
before_node_run
• Check if the first input dataset is None or an empty dataframe • Overwrite the node.func ◦ The new function must have the same name or Kedro will break ◦ The new function just returns the first dataset - so None or the empty dataframe
Copy code
@hook_impl
def before_node_run(self, node: Node, catalog, inputs, is_async, session_id):
    # check if node is allowed to skip
    if "skip_if_none" in node.tags:
        input_keys = list(inputs.keys())
        first_input = input_keys[0]
        first_dataset = inputs[first_input]

        # check is first dataset is empty or None
        if (
            first_dataset is None
            or (isinstance(first_dataset, pd.DataFrame) and first_dataset.empty)
        ):
            # overwrite node function to just return first dataset
            def return_first_dataset(*args, **kwargs):
                print(f"Skipping {node.name} because {first_input} is empty.")
                return first_dataset

            # the function name must be the same as before if `node.name` is not explicitly set.
            return_first_dataset.__name__ = node.func.__name__
            node.func = return_first_dataset  # overwrite the node function
A pipeline to test with the example Iris dataset from the Kedro starter in the file attached. And a Kedro-viz screenshot of it 👍
👍🏼 1
Alternatively, you don't need hooks for this. But use Decorators instead? Here's the same hook as above, but as a decorator
Copy code
def skip_if_none_decorator(func):
    def skip_if_input_is_none(*args, **kwargs):
        if args is not None:
            first_dataset = args[0]
        elif kwargs is not None:
            first_input = list(kwargs.keys())[0]
            first_dataset = kwargs[first_input]

        # check is first dataset is empty or None
        if (
            first_dataset is None
            or (isinstance(first_dataset, pd.DataFrame) and first_dataset.empty)
        ):
            return first_dataset
        else:
            return func(*args, **kwargs)

    skip_if_input_is_none.__name__ = func.__name__
    return skip_if_input_is_none
Then you can use it in two ways... Option 1: wrap your node function directly in your pipeline • Nice because you can choose if the function is skippable
Copy code
pipeline(
    [
        node(
            func=filter_dataframe_to_empty,
            inputs=["example_iris_data"],
            outputs="filtered_data_empty"
        ),
        node(
            func=skip_if_none_decorator(processing_step),  # <-- wrap `processing_step` method with decorator here
            inputs="filtered_data_empty",
            outputs="processed_data_1"
        ),
        node(
            func=print_output_shape,
            inputs=["processed_data_1"],
            outputs=None
        ),
    ]
)
Or option 2, use the decorator directly on your
processing_step
function: • Now
processing_step
is always decorated
Copy code
@skip_if_none_decorator
def processing_step(data: pd.DataFrame):
    assert data.shape[0] > 0, "Data is empty but node was run anyway..."

    # do something with the data...
    df_output = data
    return df_output
If you have many functions where you would write an if/else statement at the beginning, you can re-use the same decorator for all of them :)
👍🏼 1
🥳 1
👍 1
a

Antonio Perelló Moragues

09/04/2023, 10:40 AM
Thanks to all!
3 Views