# questions
b
Hey everyone, I'm building tests for a Kedro PySpark pipeline and I would like to pass a specific Spark configuration that's needed for the tests to pass. I have tried various things but nothing works. What is the best way to pass the Spark configuration typically found in spark.yml into tests? Thank you in advance!
d
I don't think there's anything built-in Kedro-wise, but you can use a (session-scoped) pytest fixture and basically do the configuration there.
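For instance, a minimal session-scoped fixture could look like the sketch below (the app name and driver-memory value are only illustrative; mirror whatever your spark.yml sets):
Copy code
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # One SparkSession shared by the whole test run; configure it up front,
    # since driver settings can't be changed once the JVM has started.
    spark = (
        SparkSession.builder.master("local[*]")
        .appName("pipeline-tests")  # illustrative name
        .config("spark.driver.memory", "4g")  # illustrative value
        .getOrCreate()
    )
    yield spark
    spark.stop()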
b
This is what I have included in my conftest.py, however I'm still getting a Java Heap Space error (which is because it is still using the default driver memory instead of the one I defined in the spark.yml file). What am I missing?
Copy code
from pathlib import Path

import pytest
from kedro.config import ConfigLoader
from kedro.framework.context import KedroContext
from kedro.framework.hooks import _create_hook_manager, hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Assumes the standard Kedro layout, with the conf folder at the project root
conf_path = str(Path.cwd() / "conf")


@pytest.fixture
def config_loader():
    return ConfigLoader(conf_source=conf_path)


@pytest.fixture
def project_context(config_loader):
    return KedroContext(
        package_name="cnx",
        project_path=Path.cwd(),
        config_loader=config_loader,
        hook_manager=_create_hook_manager(),
    )


@pytest.fixture
@hook_impl
def after_context_created(project_context) -> None:
    """Initialises a SparkSession using the config
    defined in the project's conf folder.
    """
    # Load the spark configuration in spark.yml using the config loader
    parameters = project_context.config_loader.get("spark*", "spark*/**")
    spark_conf = SparkConf().setAll(parameters.items())

    # Initialise the spark session
    spark_session_conf = (
        SparkSession.builder.appName(project_context.project_path.name)
        .enableHiveSupport()
        .config(conf=spark_conf)
    )
    _spark_session = spark_session_conf.getOrCreate()
    _spark_session.sparkContext.setLogLevel("WARN")
It seems like it's simply ignoring the hook I created.
d
I'm a bit confused, are you trying to register a Kedro hook for auto-use by pytest? @hook_impl isn't going to do anything here (unless, I suppose, you're doing something more advanced that I'm not familiar with using pytest's plugin framework, since that is technically what Kedro uses for hooks). You don't have your fixtures set to auto-use here either, so are you passing the fixture to each test function? In short, your fixture should probably work something like https://www.mikulskibartosz.name/use-one-spark-session-to-run-all-pytest-tests/, but you can load the Kedro config there instead if you'd like when constructing the session.
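Concretely, that could be a session-scoped fixture that reads spark.yml through Kedro's config loader, something like this sketch (the conf folder location and the "spark*" patterns are assumptions carried over from the snippet above):
Copy code
import pytest
from kedro.config import ConfigLoader
from pyspark import SparkConf
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # Load spark.yml the same way the conftest snippet above does
    config_loader = ConfigLoader(conf_source="conf")  # assumed conf location
    parameters = config_loader.get("spark*", "spark*/**")
    spark_conf = SparkConf().setAll(parameters.items())

    spark = (
        SparkSession.builder.appName("pipeline-tests")
        .config(conf=spark_conf)
        .getOrCreate()
    )
    yield spark
    spark.stop()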
b
I am passing my fixtures to each function. Let's say I create a spark_session as done in the link you shared. How would I then pass the spark_session created in the fixture to my test, which runs my pipeline start-to-finish?
n
Are you testing the pipeline, or testing a function that requires a Spark session?
b
Both, but for this particular case I want to test the entire pipeline.
n
In that case you want to run the pipeline with the Python API. Are you using KedroSession to run it? It should load the configuration already.
b
Not sure I'm using a KedroSession. Would you be able to connect and help me out @Nok Lam Chan?
Never mind, I finally was able to solve the issue!!
Thank you!
r
Hey, @Bernardo Branco, it might be helpful to give a quick snippet of how you solved the problem for posterity
n
There isn't anything specific about tests; it should be identical to how you run a Kedro pipeline with the Python API. When you are using Kedro IPython/Jupyter, a session is pre-loaded for you. If you need to create one manually, the docstring provides an example. The additional bit you need is the bootstrap_project call.
Copy code
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# Register the project so KedroSession can find its configuration
bootstrap_project(Path("<project_root>"))
with KedroSession.create() as session:
    session.run(pipeline_name="<some_pipeline>")
https://docs.kedro.org/en/stable/kedro.framework.session.session.KedroSession.html#kedro-framework-session-session-kedrosession
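Since the solved snippet never made it into the thread, here is a sketch of what the end-to-end test likely looks like, based on the answer above (the project root and the "__default__" pipeline name are assumptions; "__default__" is simply the name Kedro gives the default registered pipeline):
Copy code
from pathlib import Path

import pytest
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project


@pytest.fixture
def kedro_session():
    # bootstrap_project reads the project metadata (pyproject.toml) so that
    # KedroSession.create() can locate the conf folder, including spark.yml.
    bootstrap_project(Path.cwd())  # assumes pytest runs from the project root
    with KedroSession.create() as session:
        yield session


def test_pipeline_end_to_end(kedro_session):
    # Runs the default pipeline; if the project registers the usual SparkHooks
    # in settings.py, the SparkSession is built from spark.yml when the
    # context is created, so the tests pick up the configured driver memory.
    kedro_session.run(pipeline_name="__default__")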