From 32a1fdd2e1b2ff9c11c7466a856d013e4cd7c988 Mon Sep 17 00:00:00 2001 From: tta20 Date: Wed, 8 May 2024 17:35:38 +0200 Subject: [PATCH 1/6] changes to process-manager-confs --- data/process-manager-k8s-pocket.json | 15 +++++++++++++++ data/process-manager-k8s.json | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 data/process-manager-k8s-pocket.json diff --git a/data/process-manager-k8s-pocket.json b/data/process-manager-k8s-pocket.json new file mode 100644 index 00000000..2518e682 --- /dev/null +++ b/data/process-manager-k8s-pocket.json @@ -0,0 +1,15 @@ +{ + "type": "k8s", + "name": "K8sProcessManager", + "command_address": "0.0.0.0:10054", + + "authoriser": { + "type": "dummy" + }, + + "broadcaster": { + "type": "kafka", + "kafka_address": "localhost:30092", + "publish_timeout": 2 + } +} diff --git a/data/process-manager-k8s.json b/data/process-manager-k8s.json index 2518e682..2ea09af4 100644 --- a/data/process-manager-k8s.json +++ b/data/process-manager-k8s.json @@ -9,7 +9,7 @@ "broadcaster": { "type": "kafka", - "kafka_address": "localhost:30092", + "kafka_address": "monkafka.cern.ch:30092", "publish_timeout": 2 } } From 22e62d3b47a29a6ead022e1932936af67b0edb51 Mon Sep 17 00:00:00 2001 From: tta20 Date: Wed, 8 May 2024 17:36:26 +0200 Subject: [PATCH 2/6] initial volume mounting --- .../process_manager/k8s_process_manager.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index ab2bd8d2..2b9fe3e9 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -141,6 +141,23 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest import socket hostname = socket.gethostname() #/ HACK + current_directory = os.getcwd() + volume_pwd = self._k8s_client.V1Volume( + name="pwd", + host_path=self._k8s_client.V1HostPathVolumeSource(path=current_directory) + ) + volume_mount_pwd = self._k8s_client.V1VolumeMount( + name="pwd", + mount_path="/nfs/home/titavare/dunedaq_work_area/drunc-v5.0.0" + ) + volume_cvmfs = self._k8s_client.V1Volume( + name="cvmfs", + host_path=self._k8s_client.V1HostPathVolumeSource(path="/cvmfs/") + ) + volume_mount_cvmfs = self._k8s_client.V1VolumeMount( + name="cvmfs", + mount_path="/cvmfs/" + ) pod = self._pod_v1_api( api_version="v1", @@ -161,6 +178,7 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest args = ["-c", commands], #args = ["-c", "sleep 3600"], env=env_vars, + volume_mounts=[volume_mount_pwd, volume_mount_cvmfs], restart_policy="Never", security_context = self._security_context_v1_api( run_as_user = os.getuid(), @@ -176,6 +194,7 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest ), ) ], + volumes=[volume_pwd, volume_cvmfs], # HACK affinity = self._k8s_client.V1Affinity( self._k8s_client.V1NodeAffinity( @@ -311,12 +330,12 @@ def _return_code(self, podname, session): pods = self._core_v1_api.list_namespaced_pod(session) pod_names = [pod.metadata.name for pod in pods.items] if not podname in pod_names: - return_code = None + return_code = 0 else: if not self.is_alive(podname, session): return_code = self._core_v1_api.read_namespaced_pod_status(podname, session).status.container_statuses[0].state.terminated.exit_code else: - return_code = None + return_code = 0 return return_code @@ -352,7 +371,6 @@ def __boot(self, boot_request:BootRequest, uuid:str) -> ProcessInstance: session = boot_request.process_description.metadata.session podnames = boot_request.process_description.metadata.name - if uuid in self.boot_request: raise DruncCommandException(f'\"{session}.{podnames}\":{uuid} already exists!') self.boot_request[uuid] = BootRequest() From e4dde03d6019e844fe248380c340a5100afd6789 Mon Sep 17 00:00:00 2001 From: tta20 Date: Thu, 9 May 2024 16:41:21 +0200 Subject: [PATCH 3/6] added volume mounting in pods and cleaned up env_vars and exec_and_args --- .../process_manager/k8s_process_manager.py | 69 ++++++++++--------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 2b9fe3e9..a0e4540f 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -135,29 +135,40 @@ def _create_namespace(self, session): return DruncK8sNamespaceAlreadyExists (f'\"{session}\" session already exists') - def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest",env_vars=None, commands=""): + def _volume(self, name, host_path): + return self._k8s_client.V1Volume( + name=name, + host_path=self._k8s_client.V1HostPathVolumeSource(path=host_path) + ) + + def _volume_mount(self, name, mount_path): + return self._k8s_client.V1VolumeMount( + name=name, + mount_path=mount_path + ) + + def _env_vars(self, env_var): + env_vars_list = [self._k8s_client.models.V1EnvVar(name=k, value=v) for k, v in env_var.items()] + return env_vars_list + + def _execs_and_args(self, executable_and_arguments=[]): + exec_and_args = [] + for e_and_a in executable_and_arguments: + args = [a for a in e_and_a.args] + exec_and_args += [" ".join([e_and_a.exec] + args)] + # exec_and_args = ["source rte", "daq_application --something argument"] + exec_and_args = '; '.join(exec_and_args) + return exec_and_args + + def _create_pod(self, podname, session, boot_request:BootRequest): import os # HACK import socket hostname = socket.gethostname() #/ HACK - current_directory = os.getcwd() - volume_pwd = self._k8s_client.V1Volume( - name="pwd", - host_path=self._k8s_client.V1HostPathVolumeSource(path=current_directory) - ) - volume_mount_pwd = self._k8s_client.V1VolumeMount( - name="pwd", - mount_path="/nfs/home/titavare/dunedaq_work_area/drunc-v5.0.0" - ) - volume_cvmfs = self._k8s_client.V1Volume( - name="cvmfs", - host_path=self._k8s_client.V1HostPathVolumeSource(path="/cvmfs/") - ) - volume_mount_cvmfs = self._k8s_client.V1VolumeMount( - name="cvmfs", - mount_path="/cvmfs/" - ) + + # pod_image = boot_request.{where_pod_image_is} + pod_image="ghcr.io/dune-daq/alma9:latest" pod = self._pod_v1_api( api_version="v1", @@ -166,7 +177,6 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest name=podname, namespace=session ), - spec=self._pod_spec_v1_api( restart_policy="Never", containers=[ @@ -175,10 +185,11 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest image=pod_image, command = ["sh"], #args = ["-c", "exit 3"], - args = ["-c", commands], + args = ["-c", self._execs_and_args(boot_request.process_description.executable_and_arguments)], #args = ["-c", "sleep 3600"], - env=env_vars, - volume_mounts=[volume_mount_pwd, volume_mount_cvmfs], + env= self._env_vars(boot_request.process_description.env), + volume_mounts=[self._volume_mount("pwd",os.getcwd()), self._volume_mount("cvmfs","/cvmfs/")], + working_dir = boot_request.process_description.process_execution_directory, restart_policy="Never", security_context = self._security_context_v1_api( run_as_user = os.getuid(), @@ -194,7 +205,7 @@ def _create_pod(self, podname, session, pod_image="ghcr.io/dune-daq/alma9:latest ), ) ], - volumes=[volume_pwd, volume_cvmfs], + volumes=[self._volume("pwd",os.getcwd()), self._volume("cvmfs","/cvmfs/")], # HACK affinity = self._k8s_client.V1Affinity( self._k8s_client.V1NodeAffinity( @@ -376,18 +387,8 @@ def __boot(self, boot_request:BootRequest, uuid:str) -> ProcessInstance: self.boot_request[uuid] = BootRequest() self.boot_request[uuid].CopyFrom(boot_request) - env_var = boot_request.process_description.env - env_vars_list = [self._k8s_client.models.V1EnvVar(name=k, value=v) for k, v in env_var.items()] - self._create_namespace(session) - exec_and_args = [] - for e_and_a in boot_request.process_description.executable_and_arguments: - args = [a for a in e_and_a.args] - exec_and_args += [" ".join([e_and_a.exec] + args)] - # exec_and_args = ["source rte", "daq_application --something argument"] - exec_and_args = '; '.join(exec_and_args) - print(exec_and_args) - self._create_pod(podnames, session, env_vars=env_vars_list, commands=exec_and_args) + self._create_pod(podnames, session, boot_request) self._add_label(podnames, 'pod', 'uuid', uuid) self._log.info(f'\"{session}.{podnames}\":{uuid} booted') From 22d607f444598d7448f8fb49ef52ee7f38c74c79 Mon Sep 17 00:00:00 2001 From: tta20 Date: Thu, 9 May 2024 16:47:06 +0200 Subject: [PATCH 4/6] changed return_code back to None --- src/drunc/process_manager/k8s_process_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index a0e4540f..2b2bfd8b 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -341,12 +341,12 @@ def _return_code(self, podname, session): pods = self._core_v1_api.list_namespaced_pod(session) pod_names = [pod.metadata.name for pod in pods.items] if not podname in pod_names: - return_code = 0 + return_code = None else: if not self.is_alive(podname, session): return_code = self._core_v1_api.read_namespaced_pod_status(podname, session).status.container_statuses[0].state.terminated.exit_code else: - return_code = 0 + return_code = None return return_code From 72eb02ede94b0a161ec06e76128a60d2d97a7000 Mon Sep 17 00:00:00 2001 From: tta20 Date: Thu, 9 May 2024 17:30:39 +0200 Subject: [PATCH 5/6] added image to the k8s-pm-configuration --- data/process-manager-k8s-pocket.json | 1 + data/process-manager-k8s.json | 1 + src/drunc/process_manager/configuration.py | 1 + src/drunc/process_manager/k8s_process_manager.py | 4 ++-- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/data/process-manager-k8s-pocket.json b/data/process-manager-k8s-pocket.json index 2518e682..72d1eeae 100644 --- a/data/process-manager-k8s-pocket.json +++ b/data/process-manager-k8s-pocket.json @@ -2,6 +2,7 @@ "type": "k8s", "name": "K8sProcessManager", "command_address": "0.0.0.0:10054", + "image": "ghcr.io/dune-daq/alma9:latest", "authoriser": { "type": "dummy" diff --git a/data/process-manager-k8s.json b/data/process-manager-k8s.json index 2ea09af4..b68e47c7 100644 --- a/data/process-manager-k8s.json +++ b/data/process-manager-k8s.json @@ -2,6 +2,7 @@ "type": "k8s", "name": "K8sProcessManager", "command_address": "0.0.0.0:10054", + "image": "ghcr.io/dune-daq/alma9:latest", "authoriser": { "type": "dummy" diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index addc4053..8d419767 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -28,6 +28,7 @@ def _parse_dict(self, data): new_data.type = ProcessManagerTypes.SSH case 'k8s': new_data.type = ProcessManagerTypes.K8s + new_data.image = data.get("image", 2) case _: from drunc.process_manager.exceptions import UnknownProcessManagerType raise UnknownProcessManagerType(data['type']) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 2b2bfd8b..0a64de9e 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -167,8 +167,8 @@ def _create_pod(self, podname, session, boot_request:BootRequest): hostname = socket.gethostname() #/ HACK - # pod_image = boot_request.{where_pod_image_is} - pod_image="ghcr.io/dune-daq/alma9:latest" + pod_image = self.configuration.data.image + # pod_image="ghcr.io/dune-daq/alma9:latest" pod = self._pod_v1_api( api_version="v1", From f633cf4434f7409bbc2b491eabe7b24faefd8421 Mon Sep 17 00:00:00 2001 From: tta20 Date: Thu, 9 May 2024 17:44:44 +0200 Subject: [PATCH 6/6] changed default to ghcr.io/dune-daq/alma9:latest --- src/drunc/process_manager/configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index 8d419767..55a13448 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -28,7 +28,7 @@ def _parse_dict(self, data): new_data.type = ProcessManagerTypes.SSH case 'k8s': new_data.type = ProcessManagerTypes.K8s - new_data.image = data.get("image", 2) + new_data.image = data.get("image", "ghcr.io/dune-daq/alma9:latest") case _: from drunc.process_manager.exceptions import UnknownProcessManagerType raise UnknownProcessManagerType(data['type'])