# questions
s
Hello team. Does anyone have experience deploying Kedro pipelines to AWS Step Functions? Please let me know; I need some urgent help with that.
d
I don't have experience with it, but just checking: are you following https://kedro.readthedocs.io/en/stable/deployment/aws_step_functions.html and running into specific issues with it? Providing some more context around your issues will probably help people understand who can help best.
s
@Deepyaman Datta I have deployed my Kedro pipeline on AWS Step Functions, but when I test it I get an OSError.
{
"error": "OSError",
"cause": {
"errorMessage": "[Errno 38] Function not implemented",
"errorType": "OSError",
"requestId": "47cd9191-2a44-4811-908b-3c5f3874e396",
"stackTrace": [
"  File \"/home/app/lambda_handler.py\", line 6, in handler\n    configure_project(\"radar_ml\")\n",
"  File \"/home/app/kedro/framework/project/__init__.py\", line 243, in configure_project\n    settings.configure(settings_module)\n",
"  File \"/home/app/dynaconf/base.py\", line 182, in configure\n    self._wrapped = Settings(settings_module=settings_module, **kwargs)\n",
"  File \"/home/app/dynaconf/base.py\", line 238, in __init__\n    self.validators.validate(\n",
"  File \"/home/app/dynaconf/validator.py\", line 441, in validate\n    validator.validate(\n",
"  File \"/home/app/kedro/framework/project/__init__.py\", line 46, in validate\n    super().validate(settings, *args, **kwargs)\n",
"  File \"/home/app/dynaconf/validator.py\", line 207, in validate\n    self._validate_items(\n",
"  File \"/home/app/dynaconf/validator.py\", line 236, in _validate_items\n    self.default(settings, self)\n",
"  File \"/home/app/kedro/framework/project/__init__.py\", line 37, in validator_func\n    return getattr(importlib.import_module(module), class_name)\n",
"  File \"/usr/local/lib/python3.8/importlib/__init__.py\", line 127, in import_module\n    return _bootstrap._gcd_import(name[level:], package, level)\n",
"  File \"<frozen importlib._bootstrap>\", line 1014, in _gcd_import\n",
"  File \"<frozen importlib._bootstrap>\", line 991, in _find_and_load\n",
"  File \"<frozen importlib._bootstrap>\", line 961, in _find_and_load_unlocked\n",
"  File \"<frozen importlib._bootstrap>\", line 219, in _call_with_frames_removed\n",
"  File \"<frozen importlib._bootstrap>\", line 1014, in _gcd_import\n",
"  File \"<frozen importlib._bootstrap>\", line 991, in _find_and_load\n",
"  File \"<frozen importlib._bootstrap>\", line 975, in _find_and_load_unlocked\n",
"  File \"<frozen importlib._bootstrap>\", line 671, in _load_unlocked\n",
"  File \"<frozen importlib._bootstrap_external>\", line 843, in exec_module\n",
"  File \"<frozen importlib._bootstrap>\", line 219, in _call_with_frames_removed\n",
"  File \"/home/app/kedro/framework/session/__init__.py\", line 4, in <module>\n    from .session import KedroSession\n",
"  File \"/home/app/kedro/framework/session/session.py\", line 26, in <module>\n    from kedro.framework.session.store import BaseSessionStore\n",
"  File \"/home/app/kedro/framework/session/store.py\", line 46, in <module>\n    class ShelveStore(BaseSessionStore):\n",
"  File \"/home/app/kedro/framework/session/store.py\", line 49, in ShelveStore\n    _lock = Lock()\n",
"  File \"/usr/local/lib/python3.8/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
"  File \"/usr/local/lib/python3.8/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
"  File \"/usr/local/lib/python3.8/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
]
}
}
n
I think it's probably that these environments don't support, or disable, the multiprocessing context.
At the moment `ShelveStore` creates a `Lock` when `store.py` gets imported.
We will probably move `ShelveStore` to a separate module to prevent this from happening, but it will take some more time until the next release. @Suryansh Soni would you be able to test it out if I push the changes to a temporary branch?
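A minimal sketch of the difference between creating a lock at import time and creating it on first use. `LazyStore` is a hypothetical stand-in, not Kedro's `ShelveStore`, and the eventual fix in Kedro may look different. A class-level `_lock = Lock()` runs as soon as the module is imported; the version below only calls the lock factory when something actually asks for the lock.

```python
from multiprocessing import Lock


class LazyStore:
    """Hypothetical store that needs a lock (not Kedro's class)."""

    _lock = None  # nothing is created when the module is imported

    @classmethod
    def get_lock(cls, factory=Lock):
        # Create the lock the first time it is requested, then reuse it.
        if cls._lock is None:
            cls._lock = factory()
        return cls._lock
```

With this shape, importing the module never touches the semaphore implementation, so code paths that never need the lock would not hit the unimplemented call.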
s
Yes, I should be, @Nok Lam Chan.
Are you available for a quick chat?
l
Sorry for butting in. Step 2.2 in the guide shows that you need to patch the Lock. You might want to patch the SemLock as well.
When I wrote this guide I found patching Lock was all you needed and I got the pipeline to run, but looking at your exception, maybe something has changed and patching SemLock is also necessary.
s
@Lim H. then how can I add that change?
with patch("Sem.Lock")
l
with patch("multiprocessing.Lock"), patch("multiprocessing.SemLock"):
    from kedro.framework.session import KedroSession

    with KedroSession.create(env="aws") as session:
        session.run(node_names=[node_to_run])
I haven't tested this, but something like this, based on this line in the stack trace:
self._semlock = _multiprocessing.SemLock(\n"
But generally I regret writing this guide. I no longer think running Kedro on Step Functions is a good idea. If you can, try AWS Batch.
s
@Lim H. my team wants to deploy Kedro on a serverless framework, which is why we are approaching the problem this way.
l
👍 On Lambda you need to keep patching unimplemented Python runtime features, so the general approach is to follow the stack trace and patch away.
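Following the stack trace can be partly mechanised. A minimal sketch, assuming the error payload has the `stackTrace` list-of-strings shape shown in the error above; the helper name `frames` and the regular expression are made up for illustration.

```python
import re

# Matches the 'File "<path>", line <n>, in <name>' prefix of each frame string.
FRAME = re.compile(r'File "(?P<path>[^"]+)", line (?P<line>\d+), in (?P<func>\S+)')


def frames(stack_trace):
    """Return (path, line, func) tuples for every frame string that parses."""
    parsed = []
    for entry in stack_trace:
        match = FRAME.search(entry)
        if match:
            parsed.append((match["path"], int(match["line"]), match["func"]))
    return parsed


# Two frames copied from the error above: the entry point and the failing call.
trace = [
    '  File "/home/app/lambda_handler.py", line 6, in handler\n'
    '    configure_project("radar_ml")\n',
    '  File "/usr/local/lib/python3.8/multiprocessing/synchronize.py", line 57, in __init__\n'
    "    sl = self._semlock = _multiprocessing.SemLock(\n",
]

first, last = frames(trace)[0], frames(trace)[-1]
print("entered via:", first)
print("failed in:  ", last)
```

The first frame deserves as much attention as the last one: it shows which line of the handler started the import chain, and that line has to run while the patch is active.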
s
Alright, thank you!
I will implement this and let you know what happens.
n
This is amazing @Lim H., thanks a lot
s
Still getting this issue after implementing the fix.
{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "requestId": "9adc74c9-bf3d-4bcb-a551-97fd172a809b",
  "stackTrace": [
    "  File \"/home/app/lambda_handler.py\", line 6, in handler\n    configure_project(\"radar_ml\")\n",
    "  File \"/home/app/kedro/framework/project/__init__.py\", line 243, in configure_project\n    settings.configure(settings_module)\n",
    "  File \"/home/app/dynaconf/base.py\", line 182, in configure\n    self._wrapped = Settings(settings_module=settings_module, **kwargs)\n",
    "  File \"/home/app/dynaconf/base.py\", line 238, in __init__\n    self.validators.validate(\n",
    "  File \"/home/app/dynaconf/validator.py\", line 441, in validate\n    validator.validate(\n",
    "  File \"/home/app/kedro/framework/project/__init__.py\", line 46, in validate\n    super().validate(settings, *args, **kwargs)\n",
    "  File \"/home/app/dynaconf/validator.py\", line 207, in validate\n    self._validate_items(\n",
    "  File \"/home/app/dynaconf/validator.py\", line 236, in _validate_items\n    self.default(settings, self)\n",
    "  File \"/home/app/kedro/framework/project/__init__.py\", line 37, in validator_func\n    return getattr(importlib.import_module(module), class_name)\n",
    "  File \"/usr/local/lib/python3.9/importlib/__init__.py\", line 127, in import_module\n    return _bootstrap._gcd_import(name[level:], package, level)\n",
    "  File \"<frozen importlib._bootstrap>\", line 1030, in _gcd_import\n",
    "  File \"<frozen importlib._bootstrap>\", line 1007, in _find_and_load\n",
    "  File \"<frozen importlib._bootstrap>\", line 972, in _find_and_load_unlocked\n",
    "  File \"<frozen importlib._bootstrap>\", line 228, in _call_with_frames_removed\n",
    "  File \"<frozen importlib._bootstrap>\", line 1030, in _gcd_import\n",
    "  File \"<frozen importlib._bootstrap>\", line 1007, in _find_and_load\n",
    "  File \"<frozen importlib._bootstrap>\", line 986, in _find_and_load_unlocked\n",
    "  File \"<frozen importlib._bootstrap>\", line 680, in _load_unlocked\n",
    "  File \"<frozen importlib._bootstrap_external>\", line 850, in exec_module\n",
    "  File \"<frozen importlib._bootstrap>\", line 228, in _call_with_frames_removed\n",
    "  File \"/home/app/kedro/framework/session/__init__.py\", line 4, in <module>\n    from .session import KedroSession\n",
    "  File \"/home/app/kedro/framework/session/session.py\", line 26, in <module>\n    from kedro.framework.session.store import BaseSessionStore\n",
    "  File \"/home/app/kedro/framework/session/store.py\", line 46, in <module>\n    class ShelveStore(BaseSessionStore):\n",
    "  File \"/home/app/kedro/framework/session/store.py\", line 49, in ShelveStore\n    _lock = Lock()\n",
    "  File \"/usr/local/lib/python3.9/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/usr/local/lib/python3.9/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/usr/local/lib/python3.9/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}
d
I'd guess the patch isn't getting applied. Maybe you need to patch `_multiprocessing.SemLock`?
s
Can you show me a code snippet of how to do that?
d
`patch("_multiprocessing.SemLock")` instead of `patch("multiprocessing.SemLock")`? I'm still not 100% sure this will work, since patching isn't that straightforward and you need to patch it the way it gets imported, but it looks right to me based on how it's being called in `multiprocessing/synchronize.py`. I also didn't know you can do multiple patches in a `with` statement, TIL.
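A minimal sketch of what "patch it the way it gets imported" means in practice. It uses a throwaway module (`fake_store`, a made-up name) that copies the pattern from the trace, `from multiprocessing import Lock` followed by a class-level `Lock()`, rather than Kedro itself. The point it illustrates is that the patch has to be active during the module's first import, because that is when the name is looked up and called.

```python
import importlib
import sys
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

# Throwaway module mimicking the pattern in the trace (hypothetical, not Kedro).
SOURCE = (
    "from multiprocessing import Lock\n"
    "\n"
    "class Store:\n"
    "    _lock = Lock()  # runs at import time\n"
)

tmp_dir = tempfile.mkdtemp()
Path(tmp_dir, "fake_store.py").write_text(SOURCE)
sys.path.insert(0, tmp_dir)

# While the patch is active, `from multiprocessing import Lock` binds the mock,
# so the class body calls the mock instead of the real lock factory.
with patch("multiprocessing.Lock") as fake_lock:
    fake_store = importlib.import_module("fake_store")

# The module keeps the mocked lock even after the patch has been undone.
print(isinstance(fake_store.Store._lock, MagicMock))
```

If the module had already been imported before the `with` block was entered, the patch would have no effect on it, which is one way a patch can appear not to be applied.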
s
@Deepyaman Datta still causing the same issue
does work.
there is some major disconnect
n
Did the patching solution work in the end? If so, would you mind sharing what worked? @Suryansh Soni
s
Hello @Nok Lam Chan, that patch solution didn't work; I still got the same issue. It looks like `with patch("_multiprocessing.SemLock")` is trying to override a method that doesn't exist.
There should be a different way to handle this instead of `with patch("multiprocessing")`.
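One thing both traces above have in common: they start at `configure_project("radar_ml")` on line 6 of `lambda_handler.py`, and the import of `kedro.framework.session` is triggered from inside that call. If `configure_project` runs before the `with patch(...)` block, the patch is never active when `store.py` is first imported. A minimal sketch of a handler that moves it inside the patch, untested on Lambda. It assumes that patching `multiprocessing.Lock` is enough once the patch is in effect during the first import, and that the event carries the node name under a `"node_name"` key.

```python
from unittest.mock import patch


def handler(event, context):
    # Assumption: the Step Functions state passes the node to run as "node_name".
    node_to_run = event["node_name"]

    # Everything that can import kedro.framework.session goes inside the patch,
    # including configure_project, which is where both traces start.
    with patch("multiprocessing.Lock"):
        from kedro.framework.project import configure_project

        configure_project("radar_ml")

        from kedro.framework.session import KedroSession

        with KedroSession.create(env="aws") as session:
            session.run(node_names=[node_to_run])
```

If a later frame still raises `[Errno 38]`, the same approach applies: find the handler line that leads to it and make sure it runs inside the patched block.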