Gregor Höhne (11/01/2023, 6:08 PM):
1. creates the trainer,
2. creates the lightning data module with the data loaders,
3. creates the model lightning module,
4. trains the model using the trainer, the data module and the model module (roughly as in the sketch below).
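For illustration, a node like the one described might look roughly like this minimal sketch (MyDataModule, MyLightningModule and the params keys are hypothetical names defined elsewhere in the project, not taken from the thread):

import pytorch_lightning as pl

def train_model(train_ds, val_ds, params: dict):
    # 1. create the trainer
    trainer = pl.Trainer(max_epochs=params["max_epochs"], accelerator="auto")
    # 2. create the lightning data module wrapping the data loaders
    data_module = MyDataModule(train_ds, val_ds, num_workers=params["num_workers"])
    # 3. create the model lightning module
    model = MyLightningModule(**params["model"])
    # 4. train the model using the trainer, the data module and the model module
    trainer.fit(model, datamodule=data_module)
    return model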
In the following, num_workers is the variable used in the PyTorch DataLoader to parallelize data loading:
- When using the SequentialRunner and num_workers=0, every pipeline is executed as expected.
- When using the ThreadRunner and num_workers=0, everything also works as expected.
- When using num_workers > 1 and the ThreadRunner, the trainings get stuck in the sanity check of the Lightning model module.
- When hard-coding a dummy_ds which is passed between each modular pipeline, running with num_workers > 1 works (at least sometimes).
Does anyone have experience with the PyTorch Lightning module and could please help me get the most out of the modular pipeline framework?
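The num_workers in question is set where the DataLoaders are built, typically inside the LightningDataModule; a minimal sketch (class and dataset names are hypothetical):

import pytorch_lightning as pl
from torch.utils.data import DataLoader

class MyDataModule(pl.LightningDataModule):
    def __init__(self, train_ds, val_ds, num_workers: int = 0):
        super().__init__()
        self.train_ds = train_ds
        self.val_ds = val_ds
        # 0 = load batches in the main process, >0 = fork/spawn worker processes
        self.num_workers = num_workers

    def train_dataloader(self):
        return DataLoader(self.train_ds, batch_size=32, shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        return DataLoader(self.val_ds, batch_size=32, num_workers=self.num_workers)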
Juan Luis (11/02/2023, 8:34 AM):
num_workers > 1? Wondering if there's something clashing between Kedro threads and PyTorch ones.
Gregor Höhne (11/02/2023, 8:39 AM):
num_workers > 0 results in being stuck in the sanity check, and therefore not being able to load the batches at all.

"{dataset_name}":
  type: MemoryDataSet
  copy_mode: assign
Juan Luis (11/02/2023, 8:44 AM):

Gregor Höhne (11/02/2023, 9:12 AM):
marrrcin (11/02/2023, 10:07 AM):
num_workers>1 will basically do a fork of a process, so it might be problematic.
Try the following first (see the sketch after this list):
1. Run the nodes up to the training node, so that all datasets required by the training node are materialized (saved via the Data Catalog)
2. Run only the training node
3. Run the nodes after the training node
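A minimal sketch of such a staged run using the Python API (the node names are hypothetical; the corresponding kedro run flags --to-nodes, --node/--nodes and --from-nodes can be used instead, depending on the Kedro version):

from pathlib import Path
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())  # assumes this is executed from the project root

# 1. materialize everything the training node needs
with KedroSession.create(project_path=Path.cwd()) as session:
    session.run(to_nodes=["create_datamodule_node"])  # hypothetical last node before training

# 2. run only the training node, now reading its inputs from the catalog
with KedroSession.create(project_path=Path.cwd()) as session:
    session.run(node_names=["train_model_node"])  # hypothetical training node name

# 3. run the remaining nodes
with KedroSession.create(project_path=Path.cwd()) as session:
    session.run(from_nodes=["evaluate_model_node"])  # hypothetical first node after training

For steps 2 and 3 to work across separate runs, the datasets passed between these nodes need to be persisted in the Data Catalog rather than kept as plain MemoryDataSets.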
Gregor Höhne (11/02/2023, 2:44 PM):
marrrcin (11/02/2023, 2:45 PM):
kedro run (meaning: the whole pipeline) or single nodes?
Gregor Höhne (11/02/2023, 2:47 PM):
marrrcin (11/02/2023, 2:52 PM):

from torch import multiprocessing

def set_multiprocessing_method():
    if multiprocessing.get_start_method() != "spawn":
        multiprocessing.set_start_method(
            "spawn", force=True
        )  # Only SPAWN works with CUDA

Invoke the above as soon as possible in the node, maybe it will help.
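In practice that could look like the following sketch (the node name is hypothetical, and set_multiprocessing_method is assumed to be importable where the node is defined):

def train_model(train_ds, val_ds, params: dict):
    # switch the start method to "spawn" before any DataLoader workers are created
    set_multiprocessing_method()
    ...  # then build the data module, model and trainer as before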
Gregor Höhne (11/02/2023, 3:03 PM):
TypeError: h5py objects cannot be pickled
The h5py objects are used in the data module. To overcome this I used copy_mode: assign in the MemoryDataSet definition in the catalog file. The h5py objects often seem to be a problem. Do you have any idea how to overcome this?
marrrcin (11/02/2023, 3:06 PM):
__getitem__ or in the __init__?
Gregor Höhne (11/02/2023, 3:08 PM):
__getitem__
marrrcin (11/02/2023, 3:09 PM):

Gregor Höhne (11/02/2023, 3:10 PM):
marrrcin (11/02/2023, 3:12 PM):
TypeError: h5py objects cannot be pickled indicates that the h5py objects are being passed somewhere in your code (explicitly or implicitly) between multiple Python processes (num_workers>1).
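A common way around this (a sketch, not the dataset from this thread): keep only picklable attributes such as the file path on the Dataset, open the h5py.File lazily inside each worker, and drop any open handle when the object is pickled:

import h5py
import torch
from torch.utils.data import Dataset

class LazyH5Dataset(Dataset):
    """Opens the HDF5 file on first access, separately in each worker process."""

    def __init__(self, path: str, data_key: str = "data", target_key: str = "target"):
        # only picklable attributes are stored here, no open h5py handles
        self.path = path
        self.data_key = data_key
        self.target_key = target_key
        self._file = None
        with h5py.File(self.path, "r") as f:
            self._length = len(f[self.data_key])

    def __getstate__(self):
        state = self.__dict__.copy()
        state["_file"] = None  # never ship an open handle to another process
        return state

    def __len__(self):
        return self._length

    def __getitem__(self, idx):
        if self._file is None:  # opened lazily, once per worker
            self._file = h5py.File(self.path, "r")
        x = torch.as_tensor(self._file[self.data_key][idx])
        y = torch.as_tensor(self._file[self.target_key][idx])
        return x, y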
Gregor Höhne (11/02/2023, 3:27 PM):

def _load(self) -> tuple[DataSetTorch, Dict]:
    tvcnt = TemporaryVirtualContainer.from_container_paths(
        container_paths=[Path(self._container_paths)],
        dataset_paths=dict(
            data=self._load_args["data_path"],
            target=self._load_args["target_path"],
            mask=self._load_args["mask_path"],
        ),
    )
    dataset = CellPatchDataSetTorch(
        path=tvcnt.path,
        data_path="data",
        target_path="target",
        mask_path="mask",
        data_transform=transforms.Compose([
            cft.DaskToTensor(),
            cft.FillNans(),
            cft.MaskImages(),
            cft.EnsureChannels(num_channels=3),
        ]),
        precision=torch.float32,
    )
marrrcin (11/02/2023, 3:33 PM):
Hugo Evers (11/02/2023, 3:48 PM):
vLLM and ParallelFormers.
For now, I made a custom pipeline splicer that abuses a PartitionedDataset to split a dataset into chunks, run the processing of those chunks in separate nodes, and then save it again. These all execute in parallel on individual AWS Batch instances, so it accomplishes basically the same thing. It's probably cheaper that way too (also because there is very high demand for the big-GPU instances on AWS).
Unfortunately, the issues I'm facing are quite difficult to replicate in a minimal setting, since you would need to run the pipeline on a machine with multiple GPUs, preferably on something like AWS Batch. Maybe you could provision an instance, SSH into it, and run the workload inside a devcontainer to replicate? Still, it would be quite a bit of setup for an issue I'm not sure is solvable in Kedro. Maybe some custom dataset that would enforce more control over orchestrated subprocesses? I don't know, I'm no expert on controlling processes.
marrrcin (11/02/2023, 3:51 PM):
"we might need to find a way to pass the initialisation of the parent process for the model training parallelisation to the actual kedro node execution's main thread."
Can you expand on that?
Hugo Evers (11/02/2023, 3:55 PM):

Gregor Höhne (11/02/2023, 3:56 PM):

Hugo Evers (11/02/2023, 3:59 PM):

Gregor Höhne (11/02/2023, 4:01 PM):
Hugo Evers (11/02/2023, 4:03 PM):

import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline

n = 3

def split_df(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Splits a dataframe into n equal parts."""
    return {f'df_{i}': df.iloc[i::n] for i in range(n)}

def create_splitted_pipeline(n: int) -> Pipeline:
    return pipeline([
        node(
            func=split_df,
            inputs={'df': 'df'},
            outputs={f'df_{i}': f'df_{i}' for i in range(n)},
            name='split_df',
        )
    ]) + pipeline([
        node(
            func=summary,
            inputs={'df': f'df_{i}'},
            outputs={'summary': f'summary_{i}'},
            name=f'summary_{i}',
        )
        for i in range(n)
    ])
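A possible way to wire this up (module path and pipeline name are hypothetical): register the spliced pipeline as usual; since each summary_{i} node only depends on its own df_{i}, a distributed deployment such as the AWS Batch setup described above can execute the chunks in parallel.

from kedro.pipeline import Pipeline
# hypothetical module path for the code above
from my_project.pipelines.splitting import create_splitted_pipeline

def register_pipelines() -> dict[str, Pipeline]:
    return {"split_and_summarize": create_splitted_pipeline(n=3)}  # hypothetical pipeline name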
Gregor Höhne (11/02/2023, 4:07 PM):

Hugo Evers (11/02/2023, 4:07 PM):

Gregor Höhne (11/02/2023, 4:28 PM):

marrrcin (11/03/2023, 7:57 AM):

Gregor Höhne (11/05/2023, 12:45 PM):

marrrcin (11/06/2023, 11:15 AM):