Hervé Lauwerier
11/10/2022, 4:51 PM not found in the DataCatalog
def create_pipeline(**kwargs) -> Pipeline:
    """Assemble the pipeline from the table list derived from local parameters.

    Parameters are read through a ``TemplatedConfigLoader`` pointed at
    ``settings.CONF_SOURCE`` with the ``"local"`` environment, then used to
    build a ``LocalDbConnector`` and the list of tables. One set of nodes is
    generated from that table list and wrapped with ``pipeline``.

    Args:
        **kwargs: Accepted but not used by this function.

    Returns:
        The result of ``pipeline(generate_nodes(tables=...))``.
    """
    loader = TemplatedConfigLoader(
        conf_source=str(settings.CONF_SOURCE),
        env="local",
    )
    params = loader.get("parameters*", "parameters*/**", "**/parameters*")
    connector = LocalDbConnector(params)
    table_list = generate_table_list(parameters=params, db_connector=connector)

    # NOTE(review): this catalog is only bound to a local name and is never
    # handed to pipeline() or anything else in this function. That is
    # presumably why its entries are reported as "not found in the
    # DataCatalog" at run time -- confirm how the catalog should be
    # registered with the run.
    catalog = generate_catalog_from_table_list(
        tables=table_list,
        db_connector=connector,
    )

    nodes = generate_nodes(tables=table_list)
    return pipeline(nodes)
def generate_catalog_from_table_list(tables, db_connector):
    """Build a ``DataCatalog`` holding one SQL table dataset per table.

    Each table is registered under the name ``sql_<raw table name>`` as a
    ``SQLTableDataSet`` whose connection string is the connector's base
    connection string with ``;DATABASE=<local database name>`` appended.

    Args:
        tables: Iterable of table objects exposing ``get_raw_table_name()``,
            ``name`` and ``database.get_local_database_name()``.
        db_connector: Object exposing ``connection_string()``.

    Returns:
        The populated ``DataCatalog``. An empty ``tables`` iterable yields a
        catalog with no entries added.
    """
    io = DataCatalog()
    for table in tables:
        # Point the base connection string at this table's local database.
        connection = (
            f"{db_connector.connection_string()}"
            f";DATABASE={table.database.get_local_database_name()}"
        )
        # Add sql catalog entry; replace=True is passed so that a repeated
        # table name does not fail on an already-registered entry.
        io.add(
            f"sql_{table.get_raw_table_name()}",
            SQLTableDataSet(
                credentials=dict(con=connection),
                table_name=f"{table.name}",
            ),
            replace=True,
        )
    # The caller assigns this function's result, so hand the catalog back
    # instead of implicitly returning None.
    return io
Allen Ma
11/11/2022, 7:20 AM