Hi all, I'm currently struggling with training mul...
# questions
g
Hi all, I'm currently struggling with training multiple modular pipelines simultaneously using a Pytorch lightning trainer. My modular pipeline looks like this: 1. creates a
trainer
, 2. creates the lightning
data module
with the data loaders, 3. creates the
model
lightning module, 4.
train
the model using the trainer, the data module and the model module Following num_workers is the variable used in the Pytorch DataLoader for parallelization of data loading: When using the Sequential Runner and
num_workers=0
, every pipeline is executed as expected. When using the Thread Runner and
num_workers=0
, also everything works as expected. When using
num_workers > 1
and the Thread Runner the trainings get stuck in the sanity check of the lightning model module. When hard coding a dummy_ds which is passed between each modular pipeline, running with
num_workers > 1
works (at least sometimes). Has anyone experience with the Pytorch lightning module and could help me please to get the most out of the modular pipeline framework?
j
hi @Gregor Höhne, if you're parallelising at the DataLoader level already, did you try using the sequential runner with
num_workers > 1
? wondering if there's something clashing between Kedro threads and PyTorch ones
g
Hi @Juan Luis, thanks for the fast reply. Using the SequentialRunner with
num_worker > 0
results in being stuck in the sanity check - therefore not being able to load the batches at all.
That's why I'm using the ThreadRunner. The Parallel Runner does not work for me as I define one MemoryDataset myself with:
Copy code
"{dataset_name}":
    type: MemoryDataSet
    copy_mode: assign
What might be also interesting to know, when running only one pipeline with ThreadRunner only sometimes the lighting trainer starts training, sometimes it also gets stuck
j
this is somewhat similar, but not quite, to a problem @Hugo Evers was experiencing https://kedro-org.slack.com/archives/C03RKP2LW64/p1698049453415239 we'd like to have a closer look @Gregor Höhne, would you be able to provide a minimal reproducer and open an issue on GitHub about it? at least these two cases: • sequential runner, num_worker > 0 consistently gets stuck • thread runner, num_workers > 1, hardcoded ds, sometimes gets stuck
g
@marrrcin I think this might be exactly why you guys developed kedro-azureml. Could you give some insights how to proceed/ make it work if someone does not work in an Azure environment?
m
As far as I remember it has sth to do with the forking - since
num_workers>1
will basically do a fork of a process, it might be problematic. Try the following first: 1. Run nodes up to the training node, so that all datasets required by training node are materialized (saved via Data Catalog) 2. Run only training node 3. Run nodes after training node
And let’s see what happens after that.
g
Similar to this tutorial, I put all data and model creation into the training node but that did not resolve the issue. The trainer is still stuck in the sanity check. Still using the ThreadRunner and num_workers > 1 (num_workers=0 still works)
m
But how do you run it?
Do you use
kedro run
(meaning - whole pipeline) or single nodes?
g
Kedro run --pipeline as it only consists out of the training node
image.png
m
Copy code
from torch import multiprocessing
def set_multiprocessing_method():
    if multiprocessing.get_start_method() != "spawn":
        multiprocessing.set_start_method(
            "spawn", force=True
        )  # Only SPAWN works with Cuda
Invoke the above as soon as possible in the node, maybe it will help
g
This error results when using that function:
Copy code
TypeError: h5py objects cannot be pickled
The h5py objects are used in the data module. To overcome this I used copy_mode: assign in the MemoryDataset definition in the catalog file. The h5py object seem to be often a problem. Do you have any idea how to overcome this problem?
The input of the pipeline is a custom AbstractDataset which contains these objects
m
I guess that previously it was hanging silently on the h5py then. How do you load those hdf5 files? In the
__getitem__
or in the
__init__
?
g
I load these files with
__getitem__
m
And what do you return from getitem? Don’t return hd5py objects, return torch tensors and it should work then
g
I'm returning a tuple of torch.Tensors:
So input and labels in torch.Tensor format
m
So you dont load the hdf5 files in the getitem then 🤔
This error:
Copy code
TypeError: h5py objects cannot be pickled
indicates that the h5py object are trying to be passed somewhere in your code (explicitly or implicitly) between multiple Python processes
Make sure that your custom dataset does not have reference to the hd5py objects before you start using
num_workers>1
g
Thanks for those thoughts (that might be the actual problem). Unfortunately, I'm not quite sure how to resolve that issue. I defined a custom AbstractDataset which does following:
Copy code
def _load(self) -> tuple[DataSetTorch, Dict]:
   tvcnt = TemporaryVirtualContainer.from_container_paths(
      container_paths=[Path(self._container_paths)],
      dataset_paths=dict(
          data = self._load_args["data_path"],
          target = self._load_args["target_path"],
          mask = self._load_args["mask_path"]
      ),
   )
   dataset = CellPatchDataSetTorch(
       path=tvcnt.path,
       data_path="data",
       target_path="target",
       mask_path="mask",
       data_transform=transforms.Compose([
            cft.DaskToTensor(), 
            cft.FillNans(), 
            cft.MaskImages(), 
            cft.EnsureChannels(num_channels=3)]),
       precision=torch.float32,
   )
So the CellPatchDataSetTorch has a reference to the TemporaryVirtualContainer which is the h5 object.
When using this notation in a .py file it works. But as it is a kedro input and defined in the catalog this might be the actual problem as it gets passed into the pipeline, right?
m
I’m not sure I get what you mean
h
Hi guys, wrt the similarity to the issue im facing: idk whether we can solve this in kedro actually. we might need to find a way to pass the initialisation of the parent process for the model training parallelisation to the actual kedro node execution’s main thread. For some reason there is no issue when using ray to do hyperopt, but there are issues when running
vLLM
and
ParralelFormers
. for now, i made a custom pipeline splicer that abuses a
PartitionedDataset
to split a dataset into chunks and then run the processing of those chunks in separate nodes and then save it again. these all execute in parallel on individual AWS batch instances. so it accomplishes basically the same thing. Its probably cheaper that way too. (also because there is very high demand for the big-gpu instances on AWS). Unfortunately, the issues im facing are quite difficult to replicate in a minimal setting, since you would need to run the pipeline on a machine with multiple GPU’s, preferably on something like AWS batch. Maybe you could provision an instance and SSH into it and run the workload inside of a devcontainer to replicate? still, it would be quite a bit of setup for an issue im not sure is solvable in kedro. Maybe some custom dataset that would enforce more control on orchestrated subprocesses? idk, im no expert on controlling processes.
m
we might need to find a way to pass the initialisation of the parent process for the model training parallelisation to the actual kedro node execution’s main thread.
Can you expand on that?
h
the issue im getting (according to the parralelformers repo) has to do with the fact that the parralelisation is not happening in the main-thead. so they recommend you to run it using if _ _name _ ==“ _ _main _ __”
but this could be a completely separate issue from gregors
if so, dont let me distract you 😛
g
Hi Hugo, could you please share your custom pipeline splicer, so I could have a more detailed look and see if we have a similar issue
Or some pseudo code would also be fine 🙂
h
so i use a runner for AWS batch, so these are all separate nodes and run in parralel because of AWS batch, every node run as a separate docker command, so for the kedro command running inside of the container its not parralel
what im saying is, this will only help you if you’re using aws batch
g
I see. I don't use AWS batch.
@marrrcin I will have another detailed look into my problem and will come back to you, if that's okay 🙂
👍 1
h
Copy code
n=3

def split_df(df:pd.DataFrame)->dict[str,pd.DataFrame]:
    """
    Splits a dataframe into n equal parts
    """
    return {f'df_{i}':df.iloc[i::n] for i in range(n)}

def create_splitted_pipeline(n:int)->Pipeline:
    return pipeline([node(
    func=split_df,
    inputs={'df':'df'},
    outputs={f'df_{i}':f'df_{i}' for i in range(n)},
    name='split_df',
    )])+pipeline([node(
    func=summary,
    inputs={'df':f'df_{i}'},
    outputs={'summary':f'summary_{i}'},
    name=f'summary_{i}') for i in range(n)
    ])
something like that
really ugly i know..
g
Uff, that is indeed an ugly work around 😉 but I think we have different problems - mine is probably strongly related on my data which is stored in an h5 container and when trying to paralyze it, problems occur.
h
could have to do with the process controller
you start that process in a dataset, and then execute in a node, which is a subprocess of the kedro main thread right?
but im saying that Unhindered by any knowledge of how these processes work
g
Yes, exactly.
@marrrcin When loading the data not through my custom dataset (which is used in the catalog), but loading it directly in the training node everything works fine. So the problem I'm encountering is that the loaded h5 container is passed between getting loaded as input and then passed into the pipeline. Any idea what might cause this problem and how to bypass it?
m
That’s interesting 🤔 In the setup where you load dataset in node, do you run with SequentialRunner or ThreadRunner?
g
Hi @marrrcin, after many hours of trying out stuff and debugging I found the bottleneck (had nothing to do with Kedro itself 😉). The problem was the hdf5 file used in the Pytorch dataset and how it is opened in a parallel manner. This issue explains really good what the problem is: https://github.com/pytorch/pytorch/issues/11929. Your hint about where and how the hdf5 file is opened was therefore pretty good 🎉. Kedro seems to run the pipelines differently then python runs some_file.py, so the issue only appeared when running the kedro pipelines. Thank you so much for your help!
🎉 1
m
Glad I helped to guide the debugging 🙂
137 Views