Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a k8s image to the pm configuration #54

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions data/process-manager-k8s-pocket.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"type": "k8s",
"name": "K8sProcessManager",
"command_address": "0.0.0.0:10054",
"image": "ghcr.io/dune-daq/alma9:latest",

"authoriser": {
"type": "dummy"
},

"broadcaster": {
"type": "kafka",
"kafka_address": "localhost:30092",
"publish_timeout": 2
}
}
3 changes: 2 additions & 1 deletion data/process-manager-k8s.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
"type": "k8s",
"name": "K8sProcessManager",
"command_address": "0.0.0.0:10054",
"image": "ghcr.io/dune-daq/alma9:latest",

"authoriser": {
"type": "dummy"
},

"broadcaster": {
"type": "kafka",
"kafka_address": "localhost:30092",
"kafka_address": "monkafka.cern.ch:30092",
"publish_timeout": 2
}
}
1 change: 1 addition & 0 deletions src/drunc/process_manager/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def _parse_dict(self, data):
new_data.type = ProcessManagerTypes.SSH
case 'k8s':
new_data.type = ProcessManagerTypes.K8s
new_data.image = data.get("image", "ghcr.io/dune-daq/alma9:latest")
case _:
from drunc.process_manager.exceptions import UnknownProcessManagerType
raise UnknownProcessManagerType(data['type'])
Expand Down
51 changes: 35 additions & 16 deletions src/drunc/process_manager/k8s_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,48 @@ def _create_namespace(self, session):
return DruncK8sNamespaceAlreadyExists (f'\"{session}\" session already exists')


def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest",env_vars=None, commands=""):
def _volume(self, name, host_path):
return self._k8s_client.V1Volume(
name=name,
host_path=self._k8s_client.V1HostPathVolumeSource(path=host_path)
)

def _volume_mount(self, name, mount_path):
return self._k8s_client.V1VolumeMount(
name=name,
mount_path=mount_path
)

def _env_vars(self, env_var):
env_vars_list = [self._k8s_client.models.V1EnvVar(name=k, value=v) for k, v in env_var.items()]
return env_vars_list

def _execs_and_args(self, executable_and_arguments=[]):
exec_and_args = []
for e_and_a in executable_and_arguments:
args = [a for a in e_and_a.args]
exec_and_args += [" ".join([e_and_a.exec] + args)]
# exec_and_args = ["source rte", "daq_application --something argument"]
exec_and_args = '; '.join(exec_and_args)
return exec_and_args

def _create_pod(self, podname, session, boot_request:BootRequest):
import os
# HACK
import socket
hostname = socket.gethostname()
#/ HACK

pod_image = self.configuration.data.image
# pod_image="ghcr.io/dune-daq/alma9:latest"

pod = self._pod_v1_api(
api_version="v1",
kind="Pod",
metadata=self._meta_v1_api(
name=podname,
namespace=session
),

spec=self._pod_spec_v1_api(
restart_policy="Never",
containers=[
Expand All @@ -158,9 +185,11 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest
image=pod_image,
command = ["sh"],
#args = ["-c", "exit 3"],
args = ["-c", commands],
args = ["-c", self._execs_and_args(boot_request.process_description.executable_and_arguments)],
#args = ["-c", "sleep 3600"],
env=env_vars,
env= self._env_vars(boot_request.process_description.env),
volume_mounts=[self._volume_mount("pwd",os.getcwd()), self._volume_mount("cvmfs","/cvmfs/")],
working_dir = boot_request.process_description.process_execution_directory,
restart_policy="Never",
security_context = self._security_context_v1_api(
run_as_user = os.getuid(),
Expand All @@ -176,6 +205,7 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest
),
)
],
volumes=[self._volume("pwd",os.getcwd()), self._volume("cvmfs","/cvmfs/")],
# HACK
affinity = self._k8s_client.V1Affinity(
self._k8s_client.V1NodeAffinity(
Expand Down Expand Up @@ -352,24 +382,13 @@ def __boot(self, boot_request:BootRequest, uuid:str) -> ProcessInstance:

session = boot_request.process_description.metadata.session
podnames = boot_request.process_description.metadata.name

if uuid in self.boot_request:
raise DruncCommandException(f'\"{session}.{podnames}\":{uuid} already exists!')
self.boot_request[uuid] = BootRequest()
self.boot_request[uuid].CopyFrom(boot_request)

env_var = boot_request.process_description.env
env_vars_list = [self._k8s_client.models.V1EnvVar(name=k, value=v) for k, v in env_var.items()]

self._create_namespace(session)
exec_and_args = []
for e_and_a in boot_request.process_description.executable_and_arguments:
args = [a for a in e_and_a.args]
exec_and_args += [" ".join([e_and_a.exec] + args)]
# exec_and_args = ["source rte", "daq_application --something argument"]
exec_and_args = '; '.join(exec_and_args)
print(exec_and_args)
self._create_pod(podnames, session, env_vars=env_vars_list, commands=exec_and_args)
self._create_pod(podnames, session, boot_request)
self._add_label(podnames, 'pod', 'uuid', uuid)
self._log.info(f'\"{session}.{podnames}\":{uuid} booted')

Expand Down