Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions task_processing/plugins/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,16 @@ def get_capabilities_for_capability_changes(
caps = {
capability_type: capabilities
for (capability_type, capabilities) in [
("add", list(cap_add)),
("drop", list(cap_drop)),
# NOTE: these don't actually need to be sorted since the order of caps here won't
# cause bounces or anything - but in case someone is inspired by this, it'll be
# good to do the paranoid thing and save them the trouble of debugging what we ran
# into in Yelp/paasta#3973
("add", sorted(list(cap_add))),
# NOTE: this is necessary as containerd differs in behavior from dockershim: with
# dockershim dropped capabilities were overriden if the same capability was added - but
# in containerd the dropped capabilities appear to have higher priority.
# Related: Yelp/paasta#3972 and Yelp/paasta#3973
("drop", sorted(list(set(cap_drop) - set(cap_add)))),
]
if capabilities
}
Expand Down
133 changes: 127 additions & 6 deletions tests/unit/plugins/kubernetes/kubernetes_pod_executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from task_processing.plugins.kubernetes.kubernetes_pod_executor import (
KubernetesTaskState,
)
from task_processing.plugins.kubernetes.task_config import DEFAULT_CAPS_DROP
from task_processing.plugins.kubernetes.task_config import KubernetesTaskConfig
from task_processing.plugins.kubernetes.types import PodEvent

Expand Down Expand Up @@ -171,7 +172,7 @@ def test_run_single_request_memory(mock_get_node_affinity, k8s_executor):
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(drop=list(task_config.cap_drop)),
capabilities=V1Capabilities(drop=sorted(list(task_config.cap_drop))),
),
resources=V1ResourceRequirements(
limits={
Expand Down Expand Up @@ -275,7 +276,7 @@ def test_run_single_request_cpu(mock_get_node_affinity, k8s_executor):
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(drop=list(task_config.cap_drop)),
capabilities=V1Capabilities(drop=sorted(list(task_config.cap_drop))),
),
resources=V1ResourceRequirements(
limits={
Expand Down Expand Up @@ -378,7 +379,7 @@ def test_run_both_requests(mock_get_node_affinity, k8s_executor):
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(drop=list(task_config.cap_drop)),
capabilities=V1Capabilities(drop=sorted(list(task_config.cap_drop))),
),
resources=V1ResourceRequirements(
limits={
Expand Down Expand Up @@ -482,7 +483,7 @@ def test_run_no_requests(mock_get_node_affinity, k8s_executor):
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(drop=list(task_config.cap_drop)),
capabilities=V1Capabilities(drop=sorted(list(task_config.cap_drop))),
),
resources=V1ResourceRequirements(
limits={
Expand Down Expand Up @@ -624,7 +625,7 @@ def test_run_authentication_token(mock_get_node_affinity, k8s_executor):
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(drop=list(task_config.cap_drop)),
capabilities=V1Capabilities(drop=sorted(list(task_config.cap_drop))),
),
resources=V1ResourceRequirements(
limits={
Expand Down Expand Up @@ -745,7 +746,7 @@ def test_run_topology_spread_constraint(mock_get_node_affinity, k8s_executor):
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(drop=list(task_config.cap_drop)),
capabilities=V1Capabilities(drop=sorted(list(task_config.cap_drop))),
),
resources=V1ResourceRequirements(
limits={
Expand Down Expand Up @@ -804,6 +805,126 @@ def test_run_topology_spread_constraint(mock_get_node_affinity, k8s_executor):
]


@mock.patch(
"task_processing.plugins.kubernetes.kubernetes_pod_executor.get_node_affinity",
autospec=True,
)
@pytest.mark.parametrize(
"caps_add, caps_drop, expected_add, expected_drop",
(
([], DEFAULT_CAPS_DROP, None, sorted(list(DEFAULT_CAPS_DROP))),
(DEFAULT_CAPS_DROP, DEFAULT_CAPS_DROP, sorted(list(DEFAULT_CAPS_DROP)), None),
(
["NET_RAW"],
DEFAULT_CAPS_DROP - {"NET_RAW"},
["NET_RAW"],
sorted(list(DEFAULT_CAPS_DROP - {"NET_RAW"})),
),
),
)
def test_run_duplicate_caps(
mock_get_node_affinity,
k8s_executor,
caps_add,
caps_drop,
expected_add,
expected_drop,
):
task_config = KubernetesTaskConfig(
name="fake_task_name",
uuid="fake_id",
image="fake_docker_image",
command="fake_command",
cpus=1,
memory=1024,
disk=1024,
volumes=[{"host_path": "/a", "container_path": "/b", "mode": "RO"}],
node_selectors={"hello": "world"},
node_affinities=[dict(key="a_label", operator="In", value=[])],
labels={
"some_label": "some_label_value",
},
annotations={
"paasta.yelp.com/some_annotation": "some_value",
},
service_account_name="testsa",
ports=[8888],
stdin=True,
stdin_once=True,
tty=True,
cap_add=caps_add,
cap_drop=caps_drop,
)
expected_container = V1Container(
image=task_config.image,
name="main",
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(
add=expected_add,
drop=expected_drop,
),
),
resources=V1ResourceRequirements(
limits={
"cpu": 1.0,
"memory": "1024.0Mi",
"ephemeral-storage": "1024.0Mi",
},
requests=None,
),
env=[],
volume_mounts=[
V1VolumeMount(
mount_path="/b",
name="host--slash-a",
read_only=True,
)
],
ports=[V1ContainerPort(container_port=8888)],
stdin=True,
stdin_once=True,
tty=True,
)
expected_pod = V1Pod(
metadata=V1ObjectMeta(
name=task_config.pod_name,
namespace="task_processing_tests",
labels={
"some_label": "some_label_value",
},
annotations={
"paasta.yelp.com/some_annotation": "some_value",
},
),
spec=V1PodSpec(
restart_policy=task_config.restart_policy,
containers=[expected_container],
volumes=[
V1Volume(
host_path=V1HostPathVolumeSource(path="/a"),
name="host--slash-a",
)
],
share_process_namespace=True,
security_context=V1PodSecurityContext(
fs_group=task_config.fs_group,
),
node_selector={"hello": "world"},
affinity=V1Affinity(node_affinity=mock_get_node_affinity.return_value),
topology_spread_constraints=[],
dns_policy="Default",
service_account_name=task_config.service_account_name,
),
)

assert k8s_executor.run(task_config) == task_config.pod_name
assert k8s_executor.kube_client.core.create_namespaced_pod.call_args_list == [
mock.call(body=expected_pod, namespace="task_processing_tests")
]


def test_process_event_enqueues_task_processing_events_pending_to_running(k8s_executor):
mock_pod = mock.Mock(spec=V1Pod)
mock_pod.metadata.name = "test.1234"
Expand Down
1 change: 1 addition & 0 deletions tests/unit/plugins/kubernetes/kubernetes_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
(
(v(), v(), None),
(v("AUDIT_READ"), v(), V1Capabilities(add=["AUDIT_READ"])),
(v("AUDIT_READ"), v("AUDIT_READ"), V1Capabilities(add=["AUDIT_READ"])),
(v(), v("AUDIT_READ"), V1Capabilities(drop=["AUDIT_READ"])),
(
v("AUDIT_WRITE"),
Expand Down