# questions
h
Would it be a good idea to add a “concatenate pandas pipeline” option to a pipeline, which allows one to run the nodes through a pandas .pipe chain instead of the traditional pipeline construct with separate in-memory I/O, when for example a run flag is supplied? My use case is as follows: there is a long text-preprocessing pipeline we use, which looks kind of like this:
    return pipeline(
        [
            node(
                func=rename_columns,
                inputs="pretraining_set",
                outputs="renamed_df",
                name="rename_columns",
            ),
            node(
                func=truncate_description,
                inputs="renamed_df",
                outputs="truncated_df",
                name="truncate_description",
            ),
            node(
                func=drop_duplicates,
                inputs="truncated_df",
                outputs="deduped_df",
                name="drop_duplicates",
            ),
            node(
                func=pad_zeros,
                inputs="deduped_df",
                outputs="padded_df",
                name="pad_zeros",
            ),
            node(
                func=filter_0000,
                inputs="padded_df",
                outputs="filtered_df",
                name="filter_0000",
            ),
            node(
                func=clean_description,
                inputs="filtered_df",
                outputs="cleaned_df",
                name="clean_description",
            ),
            node(
                func=concat_title_description,
                inputs="cleaned_df",
                outputs="concatenated_df",
                name="concat_title_description",
            ),
        ]
    )
However, on AWS Batch these nodes would run in separate containers. I now use the cloudpickle dataset to facilitate this, but it is actually not necessary when I use something like Dask. I could also instead run this pipeline like this:
    return (
        df.pipe(rename_columns)
        .pipe(truncate_description)
        .pipe(drop_duplicates)
        .pipe(pad_zeros)
        .pipe(filter_0000)
        .pipe(clean_description)
        .pipe(concat_title_description)
    )
The aforementioned pipeline has tags and filtering in a modular pipeline depending on pre-training, tuning, which language, etc. The flattened pipeline would be nice to use in the case of kedro run runner=… concat_pipeline=true, or something like that. Is this idea worth exploring? It is really not essential, I can work around it, but the ability to have pipelines that can “fold” like this is quite appealing.
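For illustration, folding that chain into a single node by hand would look roughly like the sketch below; the preprocess_text function and node name are just placeholders, a concat_pipeline flag does not exist today, and the step functions from the snippet above are assumed to be importable:

    from kedro.pipeline import node, pipeline

    # Sketch: the seven preprocessing steps collapsed into one node by hand,
    # i.e. what a hypothetical "concat_pipeline" option would automate.
    def preprocess_text(df):
        return (
            df.pipe(rename_columns)
            .pipe(truncate_description)
            .pipe(drop_duplicates)
            .pipe(pad_zeros)
            .pipe(filter_0000)
            .pipe(clean_description)
            .pipe(concat_title_description)
        )

    folded = pipeline(
        [
            node(
                func=preprocess_text,
                inputs="pretraining_set",
                outputs="concatenated_df",
                name="preprocess_text",
            )
        ]
    )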
d
Interesting; I've never heard of the .pipe() method, even though it's been around since pandas 0.16.2! Out of curiosity, how is the behavior of .pipe() different from if renamed_df, truncated_df, etc. were all MemoryDataSet instances?
h
For me the main difference is that the MemoryDataSet is not an option in a distributed environment like AWS Batch. So the default dataset should be changed to, for example, a PickleDataset, which requires serialisation and network I/O. Apart from that, I am not sure whether .pipe() allows one to hook into the vectorised optimisation procedures that pandas supplies; I would think not, because pipe is applied table-wise and is meant for sequential table-wise operations.
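As a rough illustration, persisting the intermediates means each of them needs a catalog entry backed by storage that every container can reach, along these lines (the import path and S3 locations are assumptions and depend on the Kedro version in use):

    from kedro.io import DataCatalog
    from kedro.extras.datasets.pickle import PickleDataSet

    # Sketch: persisted intermediate outputs so separate AWS Batch containers
    # can exchange data; without entries like these they fall back to MemoryDataSet.
    catalog = DataCatalog(
        {
            "renamed_df": PickleDataSet(filepath="s3://example-bucket/intermediate/renamed_df.pkl"),
            "truncated_df": PickleDataSet(filepath="s3://example-bucket/intermediate/truncated_df.pkl"),
            # ...one entry per intermediate dataframe in the chain
        }
    )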
d
> For me the main difference is that the MemoryDataSet is not an option in a distributed environment like AWS Batch.

In my opinion, this is an issue resulting from the way the deployment is done (and recommended in the current Kedro docs), rather than an issue with MemoryDataSet per se. If I understand correctly, what you're proposing is to collapse multiple nodes into one that uses .pipe() automatically, because you're deploying each node as a container, whereas if you could just deploy a set of nodes (or a modular pipeline) to each container it would also solve the issue (in, IMO, a cleaner way). But I understand your point.
h
I think, to generalize the use case: currently, nodes in pipelines are run separately. But if a set of those nodes are all sequential operations performed on a pandas DataFrame, then one could execute them with a different runner (which uses the .pipe method). I can just as easily make a node which uses the .pipe method, but if that node requires filtering, tagging, etc., then having control over which nodes in a pipeline to run using the PandasPipeRunner would be neat.
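To make that concrete, a hypothetical helper (not part of Kedro; fold_steps is an illustrative name) could collapse any chain of single-input, single-output DataFrame functions into one callable:

    from functools import reduce

    import pandas as pd

    # Hypothetical sketch of the "fold": apply a list of table-wise functions
    # in sequence via .pipe(). The step functions from the snippet above are
    # assumed to be importable.
    def fold_steps(funcs):
        def folded(df: pd.DataFrame) -> pd.DataFrame:
            return reduce(lambda acc, f: acc.pipe(f), funcs, df)
        return folded

    preprocess = fold_steps(
        [rename_columns, truncate_description, drop_duplicates, pad_zeros,
         filter_0000, clean_description, concat_title_description]
    )
    # preprocess(df) behaves like running the seven nodes in order, in memory.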
But it's quite niche.
d
Yeah, I think it's quite niche, but it could work for your case. Re:

> I think, to generalize the use case: currently, nodes in pipelines are run separately.

I think the general sentiment (and also my opinion) is moving towards a state where Kedro deployments don't map nodes to tasks on workflow orchestrators, but rather map something like modular pipelines, because nodes usually represent a much smaller unit of work (like what you have), and it doesn't make sense to have each of them be a container in most cases. But while the sentiment is moving, the deployment guides haven't changed and the approach isn't formalized. :D
👍 1
h
One could use something like a tag to indicate which nodes should run together in a container; in principle the original Kedro syntax already supports this through the run command options --to-nodes and --from-nodes.
So basically, I could accomplish this myself by changing the run command that is issued through AWS Batch.