Antonio Perelló Moragues
09/04/2023, 8:58 AM
Juan Luis
09/04/2023, 9:00 AM
Antonio Perelló Moragues
09/04/2023, 9:11 AM
FlorianGD
09/04/2023, 9:52 AM
if your_input_df.empty:
    return pd.DataFrame()
?
Antonio Perelló Moragues
09/04/2023, 9:55 AM
FlorianGD
09/04/2023, 9:55 AM
before_node_run
Lodewic van Twillert
09/04/2023, 10:22 AM
before_node_run hook for this, but I don't think it's pretty. And you'll need some work to run async... It ain't pretty but it could work 🤷‍♂️
Here's what worked for me @Antonio Perelló Moragues as an example.
• Hook before_node_run
• Check if the first input dataset is None or an empty dataframe
• Overwrite the node.func
◦ The new function must have the same name or Kedro will break
◦ The new function just returns the first dataset - so None or the empty dataframe
import pandas as pd
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node
# method of a hooks class registered in the project's settings.py
@hook_impl
def before_node_run(self, node: Node, catalog, inputs, is_async, session_id):
    # check if node is allowed to skip (and has at least one input to inspect)
    if "skip_if_none" in node.tags and inputs:
        input_keys = list(inputs.keys())
        first_input = input_keys[0]
        first_dataset = inputs[first_input]
        # check if first dataset is empty or None
        if (
            first_dataset is None
            or (isinstance(first_dataset, pd.DataFrame) and first_dataset.empty)
        ):
            # overwrite node function to just return first dataset
            def return_first_dataset(*args, **kwargs):
                print(f"Skipping {node.name} because {first_input} is empty.")
                return first_dataset
            # the function name must be the same as before if `node.name` is not explicitly set.
            return_first_dataset.__name__ = node.func.__name__
            node.func = return_first_dataset  # overwrite the node function
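To try the swap logic without starting a Kedro run, the same steps can be replayed on a stand-in object. This is only a sketch: `swap_func_if_first_input_missing`, `double`, `tagged` and `untagged` are made-up names, `SimpleNamespace` merely imitates the `tags`, `func` and `name` attributes of a real node, and the emptiness check uses `getattr(..., "empty", False)` instead of `isinstance(..., pd.DataFrame)` so it runs without pandas.

```python
from types import SimpleNamespace


def swap_func_if_first_input_missing(node, inputs):
    """Replay the hook's steps on any object with .tags and .func.

    Returns True if the function was swapped, False otherwise.
    """
    if "skip_if_none" not in node.tags or not inputs:
        return False
    first_input = next(iter(inputs))
    first_dataset = inputs[first_input]
    # pandas objects expose `.empty`; getattr keeps this sketch pandas-free
    if first_dataset is None or getattr(first_dataset, "empty", False):
        def return_first_dataset(*args, **kwargs):
            return first_dataset

        # keep the original function name, as in the hook above
        return_first_dataset.__name__ = node.func.__name__
        node.func = return_first_dataset
        return True
    return False


def double(values):
    return [v * 2 for v in values]


# SimpleNamespace stands in for a Kedro node here (assumption for illustration)
tagged = SimpleNamespace(name="double_node", func=double, tags={"skip_if_none"})
untagged = SimpleNamespace(name="double_node", func=double, tags=set())
```

Only the tagged stand-in gets its function replaced when the first input is None; the untagged one keeps running `double` as before.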
A pipeline to test with the example Iris dataset from the Kedro starter in the file attached. And a Kedro-viz screenshot of it 👍
import pandas as pd
def skip_if_none_decorator(func):
    def skip_if_input_is_none(*args, **kwargs):
        # take the first dataset, whether it was passed positionally or by keyword
        if args:
            first_dataset = args[0]
        elif kwargs:
            first_input = list(kwargs.keys())[0]
            first_dataset = kwargs[first_input]
        else:
            # no inputs at all, so there is nothing to check
            return func()
        # check if first dataset is empty or None
        if (
            first_dataset is None
            or (isinstance(first_dataset, pd.DataFrame) and first_dataset.empty)
        ):
            return first_dataset
        else:
            return func(*args, **kwargs)
    # keep the original function name on the wrapper
    skip_if_input_is_none.__name__ = func.__name__
    return skip_if_input_is_none
Then you can use it in two ways...
Option 1: wrap your node function directly in your pipeline
• Nice because you can choose if the function is skippable
pipeline(
    [
        node(
            func=filter_dataframe_to_empty,
            inputs=["example_iris_data"],
            outputs="filtered_data_empty"
        ),
        node(
            func=skip_if_none_decorator(processing_step),  # <-- wrap `processing_step` method with decorator here
            inputs="filtered_data_empty",
            outputs="processed_data_1"
        ),
        node(
            func=print_output_shape,
            inputs=["processed_data_1"],
            outputs=None
        ),
    ]
)
Or option 2, use the decorator directly on your processing_step function:
• Now processing_step is always decorated
@skip_if_none_decorator
def processing_step(data: pd.DataFrame):
    assert data.shape[0] > 0, "Data is empty but node was run anyway..."
    # do something with the data...
    df_output = data
    return df_output
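A possible variant of the decorator, sketched with `functools.wraps` instead of copying `__name__` by hand, so the wrapper also keeps the docstring of the wrapped function. The names `skip_if_first_input_missing`, `_is_missing` and `row_count` are made up for this example, and the emptiness check relies on the `.empty` attribute (which pandas DataFrames expose) via `getattr`, an assumption that lets the sketch run without pandas installed.

```python
from functools import wraps


def _is_missing(value):
    """True for None or for objects with a truthy `.empty` attribute."""
    return value is None or bool(getattr(value, "empty", False))


def skip_if_first_input_missing(func):
    """Hypothetical variant of skip_if_none_decorator using functools.wraps."""

    @wraps(func)  # copies __name__, __doc__, etc. onto the wrapper
    def wrapper(*args, **kwargs):
        if args:
            first = args[0]
        elif kwargs:
            first = next(iter(kwargs.values()))
        else:
            return func()  # no inputs, nothing to check
        if _is_missing(first):
            return first  # pass the missing input straight through
        return func(*args, **kwargs)

    return wrapper


@skip_if_first_input_missing
def row_count(data):
    """Toy node function: number of rows in `data`."""
    return len(data)
```

With this, `row_count(None)` hands back None without calling `len`, while `row_count([1, 2, 3])` runs normally, and `row_count.__name__` is still "row_count".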
If you have many functions where you would write an if/else statement at the beginning, you can re-use the same decorator for all of them :)
Antonio Perelló Moragues
09/04/2023, 10:40 AM