Juan Luis
02/23/2023, 3:45 AM
The typical workflow with the kedro CLI:
1. Install the kedro CLI
2. kedro new creates a new directory
3. cd {newdir} && python -m venv .venv
4. One needs to install Kedro inside the .venv again
Seems like this is not a problem if one uses out-of-tree environments, like conda does. Is there a workaround for the other case? Something like:
1. mkdir {newdir} && cd {newdir} && python3 -m venv .venv
2. Install Kedro in the new .venv
3. (From {newdir}) kedro new --here
Vici
02/23/2023, 12:48 PM
I've been trying to use kedro micropkg, but failing so far 😕. So I've built that custom Dataset class MyDataset. I'd love to share it as a micro-package with a colleague of mine. This should be possible, as the micro-packaging docs say:
A micro-package can be any part of Python code in a Kedro project [...]
But running
kedro micropkg package src/my_project/extras/datasets/my_folder/my_dataset.py
only yields me the following error message:
kedro.framework.cli.utils.KedroCliError: The micro-package location you provided is not a valid Python module path
Run with --verbose to see the full exception
Error: The micro-package location you provided is not a valid Python module path
I tried running with --verbose, but it's not a valid argument for any of kedro, micropkg or package. So that didn't help. Neither do I understand how my_dataset.py is not a valid Python module 😢. Any of you have an idea how one would go about resolving this issue? Thanks in advance!
Rafał Nowak
02/24/2023, 8:20 AM
I would like to override a parameter from the CLI with --params "section1.section2.name:value", where section1.section2 is defined in parameters.yml, so there is some tree section1.section2 with some parameters. I would like to change only one of them.
I think I know that kedro is not able to override only one parameter in the tree; I have to override the full root, which is not user friendly in the CLI.
I see that since kedro 0.18.5 one can use OmegaConf now. Does it change this limitation?
If so, is it possible to use global_patterns like it was in TemplatedConfigLoader?
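For reference, plain OmegaConf merges are deep, so an override built from a dotted key replaces only that single leaf and keeps its siblings; whether kedro run --params wires this up the same way in 0.18.5 would need to be confirmed. A minimal sketch using only OmegaConf, with hypothetical keys:

from omegaconf import OmegaConf

# Hypothetical parameter tree mirroring section1.section2 from parameters.yml.
base = OmegaConf.create({"section1": {"section2": {"name": "old", "other": 42}}})
# Build an override from a dotted key, as one would pass on a CLI.
override = OmegaConf.from_dotlist(["section1.section2.name=new"])
merged = OmegaConf.merge(base, override)
print(merged.section1.section2.name)   # "new"
print(merged.section1.section2.other)  # 42 -- the sibling key is kept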
Robertqs
02/24/2023, 9:34 AM
Bailey
02/24/2023, 10:26 AM
Balachandran Ponnusamy
02/25/2023, 12:58 AM
Sebastian Cardona Lozano
02/25/2023, 1:21 AM
Zoran
02/26/2023, 8:27 PM
Xinghong Fang
02/27/2023, 3:14 AM
I'm hitting a _multiprocessing.SemLock is not implemented issue when launching the pipeline. A quick google search brings me to this issue: https://stackoverflow.com/questions/34005930/multiprocessing-semlock-is-not-implemented-when-running-on-aws-lambda Looks like AWS Lambda's Python runtime is missing /dev/shm, which seems to be needed by the KedroSession.
Has anyone successfully run a kedro pipeline on AWS Lambda? Thanks in advance!
Hugo Evers
02/27/2023, 10:17 AM
I have defined a parameter grid in parameters.yml:
ParameterGrid:
  name_of_parameter:
    version_1:
      - value1
      - value2
    version_2:
      - value1
    etc.
and now I could run through these options with the namespace.
However, now I need to have dataset entries in catalog.yml which match these version_1 and version_2 names, since I don't want these to be stored in memory and then destroyed; instead I want to use the kedro_mlflow datasets.
So, for example, for the parquet files I would use something like:
X_test_{{ split_crit }}:
  type: kedro_mlflow.io.artifacts.MlflowArtifactDataSet
  data_set:
    type: pandas.ParquetDataSet
    filepath: s3://sagemaker-vertex/data/05_model_input/X_test_{{ split_crit }}.parquet
and for the metrics:
my_model_metrics_{{ split_crit }}:
  type: kedro_mlflow.io.metrics.MlflowMetricDataSet
  key: accuracy
and for the models:
multi_modal_model:
  type: kedro_mlflow.io.models.MlflowModelLoggerDataSet
  flavor: mlflow.pyfunc
  pyfunc_workflow: python_model
  save_args:
    conda_env:
      python: "3.9.10"
      dependencies:
        - "mlflow==1.27.0"
However, in kedro these output datasets cannot be shared (even though in mlflow this would be fine).
Tomás Rojas
02/27/2023, 7:25 PM
I made a CustomDataSet for loading my experiment data. The problem is that I want to make a PartitionedDataSet from it but I get complications. Here is the class I made:
import glob
import os
from typing import Any, Dict, Tuple

import pandas as pd
from kedro.io import AbstractDataSet


class LedExperiment(AbstractDataSet):
    def __init__(self, filepath: str):
        breakpoint()
        self.path = filepath
        self.files = glob.glob(os.path.join(filepath, "*"))
        self.files.sort()
        self.gate_voltage = self.get_gate_voltage(self.path)
        self.info_path, self.voltages_path, self.data_path = self.files

    @staticmethod
    def get_gate_voltage(path: str) -> float:
        """
        This is a function that is able to get the gate voltage from the folder name
        that is the root of the data.
        :param path: path of the data, usually but not restricted to self.path
        :return: the voltage from the Dirac Point used as gate voltage
        """
        # note: sometimes there is more than one measurement for one voltage from the DP;
        # it should always be separated by an underscore "_".
        breakpoint()
        folder_name = os.path.split(path)[-1]
        gate_voltage = float(folder_name)
        return gate_voltage

    @staticmethod
    def get_info(path: str, gate_voltage: float) -> pd.DataFrame:
        """
        This method takes a path to the info file and returns a pandas
        dataframe of one row and the info in each column.
        :param path: path to the info file of the experiment
        :param gate_voltage: this is the gate voltage with respect to the Dirac Point
        :return: a pandas dataframe with the parsed information
        """
        with open(path, "r") as f:
            r = f.read()
        r = r.split("\n")[1:-2]
        r = [i.split(",") for i in r]
        r = [item for sublist in r for item in sublist]
        r = [i.replace(" ", "") for i in r]
        r = {i.split("=")[0]: i.split("=")[1] for i in r}
        r["Vmin"] = float(r["Vmin"][:-1])
        r["Vmax"] = float(r["Vmax"][:-1])
        r["Vstep"] = float(r["Vstep"][:-1])
        r["Cycles"] = int(r["Cycles"])
        r["waitingtime"] = float(r["waitingtime"][:-1])
        r["timeatlight"] = float(r["timeatlight"][:-1])
        r["timeatdark"] = float(r["timeatdark"][:-1])
        r["wavelength"] = float(r["wavelength"][:-2])
        r["gate_voltage"] = gate_voltage
        info = pd.DataFrame(r, index=["value"])
        return info

    @staticmethod
    def get_led_voltage_list(voltage_list_path: str) -> pd.DataFrame:
        """
        This function takes the path to the file containing the list of the voltages to the led driver
        and returns a pandas dataframe containing all the voltages in the order they appear in the file,
        which is the same order as they were used.
        :param voltage_list_path: path to the file containing the voltage list.
        :return: a pandas dataframe with all the information.
        """
        with open(voltage_list_path, "r") as f:
            r = f.read()
        r = r.split("\n")[:-1][::2]
        voltages = [float(i) for i in r]
        voltages = pd.DataFrame(voltages, columns=["LED driver voltages"])
        return voltages

    @staticmethod
    def get_data(data_path: str) -> pd.DataFrame:
        """
        This function reads the data from the experiment.
        :param data_path: path to the file containing the time series data
        :return: a pandas dataframe with the time series data of the currents
        """
        return pd.read_csv(data_path, sep="\t")

    def _load(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        This function loads the data using the information provided in the init.
        :return: A tuple with the information, LED voltages and data DataFrames in
            that order.
        """
        breakpoint()
        info = self.get_info(self.info_path, self.gate_voltage)
        led_voltages = self.get_led_voltage_list(self.voltages_path)
        data = self.get_data(self.data_path)
        return info, led_voltages, data

    def _save(self, data) -> None:
        # TODO: finish saving method
        pass

    def _describe(self) -> Dict[str, Any]:
        """
        Returns a dict that describes the attributes of the dataset.
        :return: Returns a dict that describes the attributes of the dataset.
        """
        return dict(
            information_path=self.info_path,
            voltages_path=self.voltages_path,
            data_path=self.data_path,
            gate_voltage=self.gate_voltage,  # note that this is w respect to the DP
        )
The thing is that when I make a PartitionedDataSet from it, the paths get all messed up, which is not ideal; it results in the class having errors.
Can anyone help me with this?
EDIT: I added 3 replies to the thread explaining the issue further.
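For context on why the paths look wrong: PartitionedDataSet builds one instance of the underlying dataset per file it discovers under its path and passes that single file's path as filepath. A minimal sketch, assuming the LedExperiment class above and a hypothetical data location:

from kedro.io import PartitionedDataSet

# Each discovered file becomes its own partition; LedExperiment then receives
# filepath=<one file>, not the experiment folder it expects to glob itself.
partitioned = PartitionedDataSet(
    path="data/01_raw/led_experiments",  # hypothetical folder of experiment folders
    dataset=LedExperiment,
)
partitions = partitioned.load()  # {partition_id: load_callable}, loading is lazy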
Zoran
02/28/2023, 4:53 PM
tomohiko kato
03/01/2023, 9:24 AM
I am having trouble at the "Access run data and compare runs" step.
(Data fetching does not seem to be working and nothing is displayed.)
error message (kedro_viz\api\graphql\serializers.py, line 46):
run_command=run_blob.get("cli", {}).get("command_path"),
AttributeError: 'str' object has no attribute 'get'
The environment and versions are as follows:
OS: windows
IDE: pycharm
venv: pyenv
python: 3.9.10
kedro: 0.18.5
kedro-viz: 0.5.3
The error log seems to indicate that parsing of the run_blob is not working.
Actually I checked with a debugger and run_blob.get("cli", {}) was recognized as a str, not a dict.
Is this a problem specific to my environment?
Matheus Pinto
03/01/2023, 12:23 PM
import importlib
import typing as tp

import pandas as pd
from sklearn.base import BaseEstimator

# Matrix, Vector, check_is_fitted and ensure_data_quality are assumed to come
# from the project's own code base (not shown in the snippet).


class ModelPipeline(BaseEstimator):
    """A pipeline to train and make predictions using a machine learning model.

    Args:
        params (dict): A dictionary containing the parameters required to build the pipeline.

    Attributes:
        pipeline (list): A list of transformers and an estimator built using the given parameters.
        data_prepocessing_pipe (list): A list of transformers in the pipeline
            used for data pre-processing.
        estimator: The estimator in the pipeline used for making predictions.
        target_names (list): A list of column names in the target variable.
        is_fitted (bool): A flag indicating if the estimator is fitted or not.
    """

    def __init__(self, params: dict):
        """Initializes the ModelPipeline object.

        The object is initialized with the given parameters to create a pipeline.

        Args:
            params (dict): A dictionary containing the parameters required to build the pipeline.
        """
        self.params = params
        self.target_params = list(
            params["model"]["data_preparation"]["target_builder"]["kwargs"].values()
        )[0]
        func_path = self.params["builder_function"]
        module_name, func_name = func_path.rsplit(".", 1)
        module = importlib.import_module(module_name)
        func = getattr(module, func_name)
        self.pipeline = func(self.params)

    def fit(self, X, y):
        """Fits the pipeline to the given data.

        Fits each transformer in the pre-processing pipeline
        to the data and then fits the estimator to the transformed data.

        Args:
            X (array-like or sparse matrix): Input data of shape (n_samples, n_features)
            y (array-like or sparse matrix): Target values of shape (n_samples,) or
                (n_samples, n_targets)

        Returns:
            self: Returns an instance of self.
        """
        X_ = self.pipeline[:-1].fit_transform(X, y)
        self.pipeline[-1:].fit(X_, y)
        self.is_fitted = True
        return self

    @check_is_fitted
    @ensure_data_quality
    def predict(self, X: tp.Union[Matrix, Vector]) -> tp.Union[Matrix, Vector]:
        """Predicts the target variable using the fitted pipeline.

        Transforms the input data using the pre-processing pipeline and then makes predictions
        using the fitted estimator.

        Args:
            X (array-like or sparse matrix): Input data of shape (n_samples, n_features)

        Returns:
            pandas.DataFrame: A dataframe containing the predicted values of
            shape (n_samples, n_targets).
        """
        X_ = self.pipeline[:-1].transform(X)
        y_pred = self.pipeline[-1].predict(X_)
        return y_pred

    @check_is_fitted
    @ensure_data_quality
    def inference(
        self, X: tp.Union[Matrix, Vector], y: tp.Union[Matrix, Vector] = None
    ) -> pd.DataFrame:
        """Custom inference using the fitted pipeline.

        Transforms the input data using the pre-processing pipeline and then makes inferences
        using the fitted estimator for the specific problem, e.g. classification or regression.

        Args:
            X (array-like or sparse matrix): Input data of shape (n_samples, n_features)

        Returns:
            pandas.DataFrame: A dataframe containing the predicted values of
            shape (n_samples, n_targets).
        """
        X_ = self.pipeline[:-1].transform(X)
        inference = self.pipeline[-1].inference(X_, y)
        return inference
nawaz ahmad
03/02/2023, 6:26 AM
Sergei Benkovich
03/02/2023, 12:48 PM
Nicolas Rosso
03/02/2023, 1:27 PM
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import medium_posts_extract_file, medium_posts_transform_file, medium_posts_upload_transformed_file_to_gcp, medium_posts_persist_file_in_gcp, delete_files
from datetime import datetime

timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


# Define the nodes inside the pipeline and the execution order. Each node can
# have 1 or more functions (defined in nodes.py).
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=medium_posts_extract_file,
                inputs=None,
                outputs="medium_posts_raw_file",
                name="medium_posts_extract_file_node",
                tags=["extract"],
            ),
            node(
                func=medium_posts_transform_file,
                inputs="medium_posts_raw_file",
                outputs="medium_posts_transformed_file",
                name="medium_posts_transform_file_node",
                tags=["transform"],
            ),
            node(
                func=medium_posts_upload_transformed_file_to_gcp,
                inputs="medium_posts_transformed_file",
                outputs=None,
                name="medium_posts_upload_transformed_file_to_gcp_node",
                tags=["upload"],
            ),
            node(
                func=medium_posts_persist_file_in_gcp,
                inputs="medium_posts_raw_file",
                outputs=None,
                name="medium_posts_persist_file_in_gcp_node",
                tags=["persist"],
            ),
            node(
                func=delete_files,
                inputs="medium_posts_transformed_file",
                outputs=None,
                name="delete_files_node",
                tags=["delete"],
            ),
        ],
        tags_hierarchy={
            "extract": [],
            "transform": ["extract"],
            "upload": ["transform"],
            "persist": ["upload"],
            "delete": ["persist"],
        },
    )
Juan Diego
03/02/2023, 2:39 PM
I'd like to be able to write something like this in my config:
data_folder: ${CONF_SOURCE}/data
This obviously doesn't work, but you get the idea.
Many thanks! ☺️
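One possibility worth checking is OmegaConf's built-in oc.env resolver, which reads environment variables at interpolation time; whether your Kedro config loader picks this up is an assumption to verify. A minimal sketch in plain OmegaConf, with a hypothetical value:

import os
from omegaconf import OmegaConf

os.environ["CONF_SOURCE"] = "/home/user/project/conf"  # hypothetical value
conf = OmegaConf.create({"data_folder": "${oc.env:CONF_SOURCE}/data"})
print(conf.data_folder)  # /home/user/project/conf/data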
Ricardo Araújo
03/02/2023, 6:26 PM
I have a pipeline that does get-data -> train-model -> evaluate-model. Now, the model can be any of sklearn's models, all with the same interface. What I'd like to do is, from a list of models specified in parameters, run many instances of this pipeline, each with one model from the list (of course, I'd like the pipelines to run in parallel).
I can use modular pipelines to instantiate the pipeline many times, but I'm not sure how to use the model list in the parameters file. Any ideas?
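A minimal sketch of one way to wire this up with namespaced modular pipelines; base_pipeline, the model names and the shared input are hypothetical, and each namespace would then need its own entry (e.g. random_forest.model_options) in parameters.yml:

from kedro.pipeline import Pipeline, pipeline

# Hypothetical list of model names; in practice this could be read from a
# config file at pipeline-registration time.
MODEL_NAMES = ["random_forest", "svm", "linear_regression"]

def create_pipeline(**kwargs) -> Pipeline:
    # base_pipeline is the get-data -> train-model -> evaluate-model pipeline.
    pipes = [
        pipeline(
            base_pipeline,
            namespace=name,     # prefixes datasets and parameters with "<name>."
            inputs="raw_data",  # hypothetical shared input kept un-namespaced
        )
        for name in MODEL_NAMES
    ]
    return sum(pipes, Pipeline([]))

Since the namespaced branches are independent, kedro run --runner=ParallelRunner should then be able to execute them in parallel.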
Balachandran Ponnusamy
03/02/2023, 8:01 PM
Sebastian Pehle
03/03/2023, 9:40 AM
Ricardo Araújo
03/03/2023, 11:53 PM
azazel daiki
03/04/2023, 9:10 AM
David
03/05/2023, 4:44 PM
I am trying to use the register_prefect_flow.py script given by the Kedro documentation. Unfortunately, I don't seem to succeed because the Prefect 1.0 API is different from Prefect 2.0.
Thanks in advance!
Ofir
03/05/2023, 10:46 PM
Zoran
03/06/2023, 2:11 PM
Ziren Lin
03/06/2023, 8:55 PM
When I run the kedro viz query, it shows the error message in the screenshot. And when I click the experiment tracking tab, I can't see anything. I am wondering how to fix this to see the results. Can anyone please help? Thanks!
Ziren Lin
03/06/2023, 10:34 PM
Tomás Rojas
03/06/2023, 11:32 PM
I have a question about PartitionedDataSets. I noticed they return a dictionary with bound methods for loading each dataset. My question is: is there a way to write the nodes simply as a function of the object returned by the bound method, or should I write the nodes thinking about the dictionary?
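For what it's worth, a common pattern is to write the node against the dictionary and call the load functions inside it; a minimal sketch, with the DataFrame-shaped partitions and column handling purely illustrative:

from typing import Callable, Dict

import pandas as pd

def concat_partitions(partitions: Dict[str, Callable[[], pd.DataFrame]]) -> pd.DataFrame:
    """Node that receives a PartitionedDataSet's {partition_id: loader} dict."""
    frames = []
    for partition_id, load_func in sorted(partitions.items()):
        df = load_func()              # data is only read when the loader is called
        df["partition_id"] = partition_id
        frames.append(df)
    return pd.concat(frames, ignore_index=True)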
Brian Gereke
03/07/2023, 6:34 PM
Is there a way to use globals.yml to template both parameters.yml and catalog.yml with the new OmegaConfigLoader?
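To illustrate the idea in plain OmegaConf (not a statement about what OmegaConfigLoader itself supports in 0.18.5): values from a globals-style config can be referenced with ${...} interpolation once the configs are merged. A minimal sketch with hypothetical keys:

from omegaconf import OmegaConf

globals_conf = OmegaConf.create({"folders": {"raw": "data/01_raw"}})
catalog_conf = OmegaConf.create(
    {
        "companies": {
            "type": "pandas.CSVDataSet",
            "filepath": "${folders.raw}/companies.csv",
        }
    }
)
merged = OmegaConf.merge(globals_conf, catalog_conf)
print(merged.companies.filepath)  # data/01_raw/companies.csv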