Qiuyi Chen
11/30/2022, 6:35 PM

from typing import Dict

import pandas as pd
from pyspark.sql import DataFrame

def function_a(params: Dict, *df_lst: DataFrame):
    report = pd.DataFrame()
    for df in df_lst:
        # `function` is an existing helper elsewhere in the project
        temp = function(df, params)
        report = pd.concat([report, temp])
    return report
I can run the function like this:

function_a(params, df1, df2, df3)
But in the pipeline, how can I define the node and the catalog in this situation? Here is what I did; please let me know where I went wrong:
from kedro.pipeline import Pipeline, node

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=function_a,
                inputs=["params", "df_lst"],
                outputs="report",
            ),
        ]
    )
from kedro.io import DataCatalog, MemoryDataSet

catalog = DataCatalog(
    data_sets={"df_lst": MemoryDataSet(df1)},
    feed_dict={"params": params},
)
I can only run the pipeline when df_lst is just one dataframe, but I want it to be something like "df_lst": df_1, df_2, df_3, …, df_n (n > 3).

datajoely
11/30/2022, 6:45 PM

["params", "input1", "input2", "input3", …]
it should be passed to a node whose function has a signature like *df_list
This is an older project, but I did something similar in this function and this node
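A minimal runnable sketch of that wiring, assuming Kedro 0.18-style imports (DataCatalog, MemoryDataSet, SequentialRunner), the DataCatalog(feed_dict=...) form from the question, and toy pandas dataframes standing in for the real df1, df2, df3; the dataset names df_1, df_2, df_3 and the trivial function body are placeholders:

from typing import Any, Dict

import pandas as pd
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner

def function_a(params: Dict[str, Any], *df_lst: pd.DataFrame) -> pd.DataFrame:
    # "params" is bound to the first node input; every remaining input
    # listed on the node is passed positionally and collected into df_lst
    return pd.concat(df_lst, ignore_index=True)

pipeline = Pipeline(
    [
        node(
            func=function_a,
            inputs=["params", "df_1", "df_2", "df_3"],  # one entry per dataframe
            outputs="report",
        ),
    ]
)

# toy dataframes standing in for the real inputs
df1 = pd.DataFrame({"x": [1]})
df2 = pd.DataFrame({"x": [2]})
df3 = pd.DataFrame({"x": [3]})

catalog = DataCatalog(
    data_sets={
        "df_1": MemoryDataSet(df1),
        "df_2": MemoryDataSet(df2),
        "df_3": MemoryDataSet(df3),
    },
    feed_dict={"params": {"some_option": True}},
)

# "report" is not registered in the catalog, so the runner returns it
report = SequentialRunner().run(pipeline, catalog)["report"]

Adding a fourth dataframe is then just one more name in inputs and one more MemoryDataSet entry in the catalog; nothing in function_a changes.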
Qiuyi Chen
11/30/2022, 6:54 PM

Fabian
12/01/2022, 10:55 AM