Flink Unique Job Names #156

Open · wants to merge 8 commits into base: main
21 changes: 21 additions & 0 deletions pangeo_forge_runner/commands/bake.py
@@ -5,6 +5,7 @@
import hashlib
import os
import re
import secrets
import string
import time
from importlib.metadata import distributions
@@ -168,6 +169,21 @@ def autogenerate_job_name(self):

        return job_name

    def add_unique_suffix_to_flink_jobs(self, per_recipe_unique_job_name):
        """FlinkOperatorBakery job names always need to be unique
        to accommodate reruns and race conditions (two users running the same recipe).
        """
        if self.bakery_class == FlinkOperatorBakery:
            unique_suffix = "".join(
                secrets.choice(string.ascii_letters + string.digits) for _ in range(5)
            )
            # character length limitations for k8s are already handled downstream
            # in FlinkOperatorBakery.get_pipeline_options
            return self.job_name + "-" + unique_suffix
        else:
            return per_recipe_unique_job_name

Review comment (Contributor), on add_unique_suffix_to_flink_jobs:
In another PR, we should probably go through and figure out which behaviors we can expect runner implementations to sort out, so that Bake can be thin and runner-independent. It would be great for it not to know anything at all about implementation details.

Review comment (@moradology, Feb 13, 2024), on the secrets.choice line:
Not sure that it matters, but I like random for this kind of thing so that a seed can be used for predictable outputs.
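A minimal sketch (not part of this PR) of what that suggestion could look like: a seedable random.Random instead of secrets lets the suffix be reproduced, e.g. in tests. The unique_suffix helper and its parameters are illustrative only, not the runner's API.

import random
import string


def unique_suffix(length=5, seed=None):
    """Short alphanumeric suffix; deterministic when a seed is given."""
    rng = random.Random(seed)  # seeds from system entropy when seed is None
    return "".join(
        rng.choice(string.ascii_letters + string.digits) for _ in range(length)
    )


# Unseeded: different on every call, like the secrets-based version above.
print(unique_suffix())
# Seeded: reproducible, which is what the review comment is after.
assert unique_suffix(seed=42) == unique_suffix(seed=42)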

    def start(self):
        """
        Start the baking process

@@ -252,6 +268,11 @@ def start(self):
        else:
            per_recipe_unique_job_name = None

        # no-op here if self.bakery_class != FlinkOperatorBakery
        per_recipe_unique_job_name = self.add_unique_suffix_to_flink_jobs(
            per_recipe_unique_job_name
        )

        requirements_path = feedstock.feedstock_dir / "requirements.txt"
        if requirements_path.exists():
            extra_options["requirements_file"] = str(requirements_path)
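For context on the review comment above about keeping Bake thin and runner-independent, here is a hypothetical sketch (not proposed in this PR) of how the uniqueness rule could live on the bakery classes instead. The finalize_job_name hook and the stand-in classes below are invented for illustration and are not the runner's real API.

import secrets
import string


class Bakery:
    # Stand-in for the runner's bakery base class (illustrative only).

    def finalize_job_name(self, job_name):
        # Default: most bakeries can use the job name as-is.
        return job_name


class FlinkLikeBakery(Bakery):
    # Stand-in for a Flink-style bakery that needs a unique name per submission.

    def finalize_job_name(self, job_name):
        suffix = "".join(
            secrets.choice(string.ascii_letters + string.digits) for _ in range(5)
        )
        return f"{job_name}-{suffix}"


# Bake would then call the hook without knowing any Flink details:
# job_name = bakery.finalize_job_name(job_name)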
28 changes: 28 additions & 0 deletions tests/unit/test_bake.py
@@ -11,6 +11,8 @@
import xarray as xr
from packaging.version import parse as parse_version

from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery
from pangeo_forge_runner.bakery.local import LocalDirectBakery
from pangeo_forge_runner.commands.bake import Bake

TEST_DATA_DIR = Path(__file__).parent.parent / "test-data"
@@ -121,6 +123,32 @@ def recipes_version_ref(request):
)


@pytest.mark.parametrize(
    ("job_name", "bakery_class", "expected_job_startswith"),
    (
        ["recipe", FlinkOperatorBakery, "recipe-"],
        ["recipe", LocalDirectBakery, "recipe"],
        [None, LocalDirectBakery, None],
    ),
)
def test_add_unique_suffix_to_flink_jobs(
    job_name, bakery_class, expected_job_startswith
):
    bake = Bake()
    bake.job_name = job_name
    bake.bakery_class = bakery_class

    if bakery_class == FlinkOperatorBakery:
        actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name)
        assert actual_job_name.startswith(expected_job_startswith)
        pattern = r"^[a-zA-Z]+-[0-9a-zA-Z]{5}$"
        assert bool(re.search(pattern, actual_job_name))
    else:
        actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name)
        assert job_name == actual_job_name
        assert actual_job_name == expected_job_startswith


@pytest.mark.parametrize(
("recipe_id", "expected_error", "custom_job_name", "no_input_cache"),
(