Abdullah Alsqoor
02/18/2024, 12:36 PM
filepath: <gcs://bucket/table-name/year={}/month={}/day={}>
The year, month, and day placeholders will be replaced with either
* or a specific value, i.e. year=2024, or a list of values, e.g. year={2023, 2024} day={1, 2, 3, 4}
<gcs://bucket/table-name/year=2024/month=1/day=*> # get all the day folders under year=2024/month=1
<gcs://bucket/table-name/year=2023/month={12,11}/day=*> # get all parquet files in year=2023 under month=12 and month=11, across all day folders (day=1 ... day=31)
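As a sketch of how such templates could be expanded into concrete paths, here is a hypothetical helper (not part of any library; the `gs://` scheme and the `expand_paths` name are my assumptions) that turns lists of years/months/days, or `"*"` wildcards, into one glob path per combination:

```python
from itertools import product

def expand_paths(bucket, table, years, months, days):
    """Build concrete GCS glob paths from lists of years/months/days.

    Each argument is a list of values; use ["*"] for a wildcard field.
    """
    template = "gs://{bucket}/{table}/year={y}/month={m}/day={d}"
    return [
        template.format(bucket=bucket, table=table, y=y, m=m, d=d)
        for y, m, d in product(years, months, days)
    ]

paths = expand_paths("bucket", "table-name", [2023], [12, 11], ["*"])
# -> ['gs://bucket/table-name/year=2023/month=12/day=*',
#     'gs://bucket/table-name/year=2023/month=11/day=*']
```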
Dmitry Sorokin
02/19/2024, 10:47 AM
Abdullah Alsqoor
02/19/2024, 12:09 PM
list_of_dates = ['2024/2/19', '2024/2/18', '2024/2/17', '2024/2/16', '2024/2/15']
path_template = '<gcs://bucket/table-name/year={}/month={}/day={}>'

def format_date_template(date_str):
    """Converts a date string (YYYY/MM/DD) into the desired path template format.

    Args:
        date_str (str): The date string in YYYY/MM/DD format.

    Returns:
        str: The formatted path template string.
    """
    try:
        # Split into year, month, and day components and convert to int
        year, month, day = date_str.split('/')
        year = int(year)
        month = int(month)
        day = int(day)
        # Format the template with the extracted values
        return path_template.format(year, month, day)
    except ValueError:
        print(f"Invalid date format for '{date_str}': Please use YYYY/MM/DD format.")
        return None  # Indicate error for invalid date strings

# Map the function over the dates to get the formatted templates
formatted_paths = list(map(format_date_template, list_of_dates))

# Print the resulting list of formatted paths
print(formatted_paths)
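A possible variant of the same idea, using `datetime.strptime` instead of manual `int()` conversion so that impossible dates such as 2024/2/31 are rejected as well (the function name `format_date_template_strict` is illustrative, not from the thread):

```python
from datetime import datetime

def format_date_template_strict(
    date_str,
    template='<gcs://bucket/table-name/year={}/month={}/day={}>',
):
    """Validate a YYYY/MM/DD string and format it into the path template.

    Returns None for strings that are malformed or name impossible dates.
    """
    try:
        d = datetime.strptime(date_str, '%Y/%m/%d')
    except ValueError:
        return None  # wrong format, or day out of range for the month
    return template.format(d.year, d.month, d.day)
```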
Now we will read the data stored at the formatted paths (spark.read.parquet accepts multiple paths):
df = spark.read.parquet(*formatted_paths)
I wonder if there is something like this built-in with the catalog
Dmitry Sorokin
02/19/2024, 6:20 PM
You can use the before_pipeline_run hook to dynamically modify the catalog. Inside this hook, you can use the catalog.add method to add datasets: catalog.add("dynamic_ds", SparkDataset(filepath=formatted_path)).
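The suggestion above could be sketched as a Kedro hook class like the following. This assumes Kedro's `before_pipeline_run` hook specification and the `SparkDataset` class from `kedro_datasets`; the dataset name "dynamic_ds" and the example path are illustrative, and the hook class would still need to be registered in the project's settings.

```python
from kedro.framework.hooks import hook_impl
from kedro_datasets.spark import SparkDataset

class DynamicCatalogHooks:
    """Adds datasets to the catalog just before the pipeline runs."""

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # formatted_path would be computed from the date list, as in the
        # format_date_template snippet earlier in the thread (example value)
        formatted_path = "gcs://bucket/table-name/year=2024/month=2/day=19"
        catalog.add("dynamic_ds", SparkDataset(filepath=formatted_path))
```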
Abdullah Alsqoor
02/20/2024, 5:31 AM