diff --git a/Dockerfile b/Dockerfile index 26fb6345..7ed8cb61 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM golang:1.20 as builder +FROM golang:1.20 AS builder WORKDIR /workspace diff --git a/api/v1alpha2/minicluster_types.go b/api/v1alpha2/minicluster_types.go index 7afafcab..93752002 100644 --- a/api/v1alpha2/minicluster_types.go +++ b/api/v1alpha2/minicluster_types.go @@ -50,6 +50,11 @@ type MiniClusterSpec struct { // +optional Interactive bool `json:"interactive"` + // Allow >1 Flux running (oversubscribing resources) + // +kubebuilder:default=false + // +optional + Oversubscribe bool `json:"oversubscribe"` + // Flux options for the broker, shared across cluster // +optional Flux FluxSpec `json:"flux"` @@ -825,13 +830,6 @@ func (f *MiniCluster) Validate() bool { // Count the FluxRunners if container.RunFlux { fluxRunners += 1 - - // Non flux-runners are required to have a name - } else { - if container.Name == "" { - fmt.Printf("๐Ÿ˜ฅ๏ธ %s is missing a name\n", name) - return false - } } // If a custom script is provided AND a command, no go @@ -840,7 +838,16 @@ func (f *MiniCluster) Validate() bool { return false } } - if fluxRunners != 1 { + + // If we have more than one flux runner, must explicitly oversubscribe + if fluxRunners > 1 && !f.Spec.Oversubscribe { + fmt.Printf("๐Ÿ˜ฅ๏ธ More than one flux runner requires oversubscribe: true\n") + valid = false + } + + // More than one container can run Flux (and the brokers see the same resources) + // But we need at least one! + if fluxRunners < 1 { valid = false } diff --git a/api/v1alpha2/swagger.json b/api/v1alpha2/swagger.json index 424ab982..9d14db62 100644 --- a/api/v1alpha2/swagger.json +++ b/api/v1alpha2/swagger.json @@ -626,6 +626,11 @@ "default": {}, "$ref": "#/definitions/Network" }, + "oversubscribe": { + "description": "Allow \u003e1 Flux running (oversubscribing resources)", + "type": "boolean", + "default": false + }, "pod": { "description": "Pod spec details", "default": {}, diff --git a/api/v1alpha2/zz_generated.openapi.go b/api/v1alpha2/zz_generated.openapi.go index 0b1792a1..a7069395 100644 --- a/api/v1alpha2/zz_generated.openapi.go +++ b/api/v1alpha2/zz_generated.openapi.go @@ -1084,6 +1084,14 @@ func schema_flux_framework_flux_operator_api_v1alpha2_MiniClusterSpec(ref common Format: "", }, }, + "oversubscribe": { + SchemaProps: spec.SchemaProps{ + Description: "Allow >1 Flux running (oversubscribing resources)", + Default: false, + Type: []string{"boolean"}, + Format: "", + }, + }, "flux": { SchemaProps: spec.SchemaProps{ Description: "Flux options for the broker, shared across cluster", diff --git a/chart/templates/minicluster-crd.yaml b/chart/templates/minicluster-crd.yaml index f8f22982..683677be 100644 --- a/chart/templates/minicluster-crd.yaml +++ b/chart/templates/minicluster-crd.yaml @@ -523,6 +523,10 @@ spec: description: Name for cluster headless service type: string type: object + oversubscribe: + default: false + description: Allow >1 Flux running (oversubscribing resources) + type: boolean pod: description: Pod spec details properties: diff --git a/config/crd/bases/flux-framework.org_miniclusters.yaml b/config/crd/bases/flux-framework.org_miniclusters.yaml index d6190f39..3ee9fa96 100644 --- a/config/crd/bases/flux-framework.org_miniclusters.yaml +++ b/config/crd/bases/flux-framework.org_miniclusters.yaml @@ -526,6 +526,10 @@ spec: description: Name for cluster headless service type: string type: object + oversubscribe: + default: false + description: Allow >1 
Flux running (oversubscribing resources) + type: boolean pod: description: Pod spec details properties: diff --git a/controllers/flux/containers.go b/controllers/flux/containers.go index 7d8051d3..21420781 100644 --- a/controllers/flux/containers.go +++ b/controllers/flux/containers.go @@ -53,7 +53,7 @@ func getFluxContainer( func getContainers( specs []api.MiniClusterContainer, - defaultName string, + customName string, mounts []corev1.VolumeMount, serviceContainer bool, ) ([]corev1.Container, error) { @@ -70,8 +70,15 @@ func getContainers( pullPolicy = corev1.PullAlways } - // Fluxrunner will use the namespace name - containerName := container.Name + // Give all flux containers a name, if not provided + if container.Name == "" { + // Maintain previous behavior to have name == main flux runner + if i == 0 { + container.Name = customName + } else { + container.Name = fmt.Sprintf("%s-%d", customName, i) + } + } command := []string{} // A Flux runner will have a wait.sh script that waits for the flux view @@ -82,7 +89,6 @@ func getContainers( // wait.sh path corresponds to container identifier waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i) command = []string{"/bin/bash", waitScript} - containerName = defaultName } // A container not running flux can only have pre/post sections @@ -140,7 +146,7 @@ func getContainers( newContainer := corev1.Container{ // Call this the driver container, number 0 - Name: containerName, + Name: container.Name, Image: container.Image, ImagePullPolicy: pullPolicy, WorkingDir: container.WorkingDir, diff --git a/controllers/flux/job.go b/controllers/flux/job.go index ee79eeb0..65d60bdc 100644 --- a/controllers/flux/job.go +++ b/controllers/flux/job.go @@ -98,6 +98,8 @@ func NewMiniClusterJob(cluster *api.MiniCluster) (*batchv1.Job, error) { } // Prepare listing of containers for the MiniCluster + // We don't provide a default name because defaults are provided in Validate() + // Only service containers have a custom name here containers, err := getContainers( cluster.Spec.Containers, cluster.Name, diff --git a/docs/getting_started/custom-resource-definition.md b/docs/getting_started/custom-resource-definition.md index 6466625f..117137ce 100644 --- a/docs/getting_started/custom-resource-definition.md +++ b/docs/getting_started/custom-resource-definition.md @@ -92,6 +92,16 @@ This would be equivalent to giving a start command of `sleep infinity` however o (e.g., if there is a flux shutdown from within the Flux instance) the sleep command would not exit with a failed code. +### oversubscribe + +By default, we treat your single application container _or_ the single container in a MiniCluster pod designated to "runFlux" as the only Flux broker. When oversubscribe is set to true, you are allowed to define more than one "runFlux" container, meaning that multiple brokers will be sharing the same resources. + +```yaml + oversubscribe: true +``` + +We created this use case with the intention of having a service container running fluxion alongside the MiniCluster to orchestrate the N containers. This is considered an advanced use case and you should use it with caution! + +### launcher + If you are using an executor that launches Flux Jobs (e.g., workflow managers such as Snakemake and Nextflow do!)
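To make the oversubscribe section above concrete, here is a minimal sketch of a MiniCluster spec that defines more than one "runFlux" container. The images, names, and commands are illustrative and borrowed from the experimental example added later in this diff; the full working spec lives in examples/experimental/multiple-applications-per-pod/minicluster.yaml.

```yaml
apiVersion: flux-framework.org/v1alpha2
kind: MiniCluster
metadata:
  name: flux-sample
spec:
  size: 4
  # Required marker that more than one runFlux container is intentional
  oversubscribe: true
  containers:
    # Each of these starts its own Flux broker over the same node resources
    - image: ghcr.io/converged-computing/metric-ior:latest
      name: ior
      command: run_interactive_cluster
      runFlux: true
    - image: ghcr.io/rse-ops/lammps-matrix:mpich-ubuntu-20.04-amd64
      name: lammps
      command: run_interactive_cluster
      runFlux: true
```

Without `oversubscribe: true`, the updated Validate() rejects a spec with more than one flux runner.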
diff --git a/docs/tutorials/index.md b/docs/tutorials/index.md index 7210b2fa..1605dad2 100644 --- a/docs/tutorials/index.md +++ b/docs/tutorials/index.md @@ -56,6 +56,7 @@ The following tutorials are provided from their respective directories (and are These examples show how to interact with your flux queue from a sidecar container (that has access to the flux broker of the pod): + - [multiple-applications-per-pod](https://github.com/flux-framework/flux-operator/tree/main/examples/experimental/multiple-applications-per-pod): Allow multiple applications to run in a single pod aided by a Fluxion service orchestrator. - [flux-sidecar](https://github.com/flux-framework/flux-operator/blob/main/examples/tests/flux-sidecar) to see a sleep job in the main application queue ### Services diff --git a/examples/dist/flux-operator-arm.yaml b/examples/dist/flux-operator-arm.yaml index 055bdced..85fd7d4d 100644 --- a/examples/dist/flux-operator-arm.yaml +++ b/examples/dist/flux-operator-arm.yaml @@ -532,6 +532,10 @@ spec: description: Name for cluster headless service type: string type: object + oversubscribe: + default: false + description: Allow >1 Flux running (oversubscribing resources) + type: boolean pod: description: Pod spec details properties: diff --git a/examples/dist/flux-operator.yaml b/examples/dist/flux-operator.yaml index 8a982953..9c58c360 100644 --- a/examples/dist/flux-operator.yaml +++ b/examples/dist/flux-operator.yaml @@ -532,6 +532,10 @@ spec: description: Name for cluster headless service type: string type: object + oversubscribe: + default: false + description: Allow >1 Flux running (oversubscribing resources) + type: boolean pod: description: Pod spec details properties: diff --git a/examples/experimental/multiple-applications-per-pod/README.md b/examples/experimental/multiple-applications-per-pod/README.md new file mode 100644 index 00000000..e5ef9074 --- /dev/null +++ b/examples/experimental/multiple-applications-per-pod/README.md @@ -0,0 +1,251 @@ +# Multiple Applications in a Pod + +This example is related to [multiple-pods-per-node](../multiple-pods-per-node) except we are testing a flipped variant - having several application containers that submit to the same +flux queue. For this example, we will use a few simple applications from our examples - LAMMPS, IOR, and chatterbug. We will try: + +1. Creating an interactive MiniCluster that has a shared Flux install +2. Running several application containers that each run Flux. + +The key for the above is that every container has flux available (meaning the view is mounted and shared), and each one starts its own broker that sees the entire resources of the node - the Fluxion service keeps them from oversubscribing. + +## Experiment + +### Create the Cluster + +We should be able to use kind for this. + +```bash +kind create cluster --config ../../kind-config.yaml +``` + +### Install the Flux Operator + +As follows: + +```bash +kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml +``` + +Note that I ran into issues with kind and pulling the containers. + +### Fluxion Application Scheduler + +#### 1. Create the MiniCluster + +Then create the flux operator pods!
+ +```bash +kubectl apply -f minicluster.yaml +``` + +Wait for everything to be running: + +```console +NAME READY STATUS RESTARTS AGE +flux-sample-0-zgsgp 4/4 Running 0 15m +flux-sample-1-gqdf9 4/4 Running 0 15m +flux-sample-2-l7774 4/4 Running 0 15m +flux-sample-3-8whls 4/4 Running 0 15m +flux-sample-services 1/1 Running 0 15m +``` + +Here is the neat thing - each container running inside each pod is an independent broker that sees all resources! The lead broker (for each) is at index 0. You can confirm this by selecting to see logs for any specific container: + +```bash +# This is running the queue orchestrator +kubectl logs flux-sample-0-zgsgp -c queue + +# These are application containers +kubectl logs flux-sample-0-zgsgp -c lammps +kubectl logs flux-sample-0-zgsgp -c chatterbug +kubectl logs flux-sample-0-zgsgp -c ior +``` + +And this is the fluxion graph server, which is running as the scheduler for the entire cluster! + +```bash +$ kubectl logs flux-sample-services +๐Ÿฆฉ๏ธ This is the fluxion graph server +[GRPCServer] gRPC Listening on [::]:4242 +``` + +#### 2. Load the bypass plugin + +When the "queue" broker comes up, it loads a plugin on each of the application brokers that +ensures we can give scheduling decisions directly to those brokers from the fluxion service: + +```bash +for socket in $(ls /mnt/flux/view/run/flux/) + do + flux proxy local:///mnt/flux/view/run/flux/$socket flux jobtap load alloc-bypass.so +done +``` + +This will allow us to bypass the scheduler, and pass forward exactly the decision from fluxion. We do this so that +we can schedule down to the CPU and not have resource oversubscription. When all the containers are running and the queue starts, you should see: + +```bash +job-manager.err[0]: jobtap: job.new: callback returned error +โญ๏ธ Found application queue: index 0 +โญ๏ธ Found application chatterbug: index 3 +โญ๏ธ Found application ior: index 2 +โญ๏ธ Found application lammps: index 1 +โœ…๏ธ Init of Fluxion resource graph success! + * Serving Flask app 'fluxion_controller' + * Debug mode: off +WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. + * Running on all addresses (0.0.0.0) + * Running on http://127.0.0.1:5000 + * Running on http://10.244.0.50:5000 +Press CTRL+C to quit +``` + +We are ready to submit jobs! + +#### 3. Connect to the Queue + +The "queue" container of the set is special because it doesn't have a specific application - it's mostly a thin layer provided to interact with other containers (and we will run our application to handle orchestration from there). +So let's shell into this controller pod container - which is the one that doesn't have an application, but has access to all the resources available! Let's shell in: + +```bash +kubectl exec -it flux-sample-0-xxx bash +``` +```console +kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead. +Defaulted container "queue" out of: queue, lammps, ior, chatterbug, flux-view (init) +``` + +Notice in the message above we see all the containers running - and we are shelling into the first (queue). Also note that since we are installing some Python stuff, +you need to wait for that to finish before you see the flux socket for the queue show up. When it's done, it will be the index 0 here: + +```bash +ls /mnt/flux/view/run/flux/ +local-0 local-1 local-2 local-3 +``` +The indices correspond with the other containers. 
You can see the mapping here in the "meta" directory: + +```bash +ls /mnt/flux/view/etc/flux/meta/ +0-queue 1-lammps 2-ior 3-chatterbug +``` + +That's a pretty simple (maybe dumb) approach, but it will be how we get names for the containers when we run the Fluxion controller. Let's do that next! + +#### 4. Connect to Fluxion + +Since we can see all the instances running, this allows us to easily (meaning programmatically) write a script that orchestrates interactions between the different brokers, where there is one broker per container set, and each container set is +running across all pods (physical nodes). Since we are doing this interactively, let's connect to the queue broker. The choice doesn't actually matter - theoretically this could run from any container that is running Flux. +If this script is run on startup (still TBA), we won't need to do this. + +```bash +source /mnt/flux/flux-view.sh +flux proxy $fluxsocket bash +flux resource list +``` +```console +[root@flux-sample-0 /]# flux resource list + STATE NNODES NCORES NGPUS NODELIST + free 4 40 0 flux-sample-[0-3] + allocated 0 0 0 + down 0 0 0 +``` + +What we are seeing in the above is the set of resources that need to be shared across the containers (brokers). We don't want to oversubscribe - for example, telling any specific broker that it can use all the resources while telling the others the same. We have to be careful to use the Python install that is alongside the Flux install. Note that *you should not run this* but I want to show you how the queue was started. You can issue `--help` to see all the options to customize: + +```bash +# You can look at help... +/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py start --help + +# But DO NOT RUN THIS! It's already running. +# This is how the fluxion controller was started using the defaults (do not run this again) +/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py start +``` + +You might want to watch the main fluxion controller in a different terminal before submitting the job: + +```bash +kubectl logs flux-sample-0-wxxkp -f +``` + +Then from your interactive terminal, let's submit a job! You can do this from any of the flux container brokers - it will hit the web service that the Python script is exposing from the queue!
+ +```bash +/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py submit --help + +# The command is the last bit here (ior) # command +/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py submit --cpu 4 --container ior ior +``` + +And then we see from where we submit: + +```console +โญ๏ธ Found application queue: index 0 +โญ๏ธ Found application chatterbug: index 3 +โญ๏ธ Found application ior: index 2 +โญ๏ธ Found application lammps: index 1 +{'annotations': {}, 'bank': '', 'container': 'ior', 'cwd': '', 'dependencies': [], 'duration': 3600.0, 'exception': {'note': '', 'occurred': False, 'severity': '', 'type': ''}, 'expiration': 0.0, 'fluxion': 1, 'id': 19001371525120, 'name': 'ior', 'ncores': 4, 'nnodes': 1, 'nodelist': 'flux-sample-3', 'ntasks': 1, 'priority': 16, 'project': '', 'queue': '', 'ranks': '3', 'result': 'COMPLETED', 'returncode': 0, 'runtime': 0.5983412265777588, 'state': 'INACTIVE', 'status': 'COMPLETED', 'success': True, 't_cleanup': 1719904964.2517486, 't_depend': 1719904963.6396549, 't_inactive': 1719904964.254762, 't_remaining': 0.0, 't_run': 1719904963.6534073, 't_submit': 1719904963.6277533, 'urgency': 16, 'userid': 0, 'username': 'root', 'waitstatus': 0} +``` + +And from the Fluxion service script: + +```console +INFO:werkzeug:Press CTRL+C to quit +INFO:fluxion_controller:{'command': ['ior'], 'cpu': '4', 'container': 'ior', 'duration': None, 'workdir': None} +INFO:fluxion_controller:{'t_depend': 1720132156.9946244, 't_run': 1720132157.0078795, 't_cleanup': 1720132157.5837069, 't_inactive': 1720132157.5849578, 'duration': 3600.0, 'expiration': 0.0, 'name': 'ior', 'cwd': '', 'queue': '', 'project': '', 'bank': '', 'ntasks': 1, 'ncores': 4, 'nnodes': 1, 'priority': 16, 'ranks': '3', 'nodelist': 'flux-sample-3', 'success': True, 'result': 'COMPLETED', 'waitstatus': 0, 'id': JobID(13180046671872), 't_submit': 1720132156.9826128, 't_remaining': 0.0, 'state': 'INACTIVE', 'username': 'root', 'userid': 0, 'urgency': 16, 'runtime': 0.5758273601531982, 'status': 'COMPLETED', 'returncode': 0, 'dependencies': [], 'annotations': {}, 'exception': {'occurred': False, 'severity': '', 'type': '', 'note': ''}, 'container': 'ior', 'fluxion': 1} +INFO:werkzeug:10.244.0.27 - - [04/Jul/2024 22:29:18] "POST /submit HTTP/1.1" 200 - +INFO:fluxion_controller:๐Ÿ‘‰๏ธ Job on ior 1 is complete. +INFO:fluxion_controller:โœ…๏ธ Cancel of jobid 1 success! +``` + +Let's try submitting a job to the lammps application broker (container) now (note the container working directory has the input files) + +```bash +/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py submit --cpu 4 --container lammps lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite +``` + +This one actually takes more than a second to run, so wait for that, and you'll eventually see the fluxion detect it is finished and clean up: + +```bash +INFO:werkzeug:10.244.0.27 - - [04/Jul/2024 22:30:41] "POST /submit HTTP/1.1" 200 - +INFO:fluxion_controller:๐Ÿ‘‰๏ธ Job on lammps 2 is complete. +INFO:fluxion_controller:โœ…๏ธ Cancel of jobid 2 success! +``` + +Let's now list jobs for one container... 
+ +```bash +/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py jobs --container lammps +``` +```console + Jobs for Lammps +โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”“ +โ”ƒ Container โ”ƒ Id โ”ƒ Name โ”ƒ Status โ”ƒ Nodes โ”ƒ Cores โ”ƒ Runtime โ”ƒ Returncode โ”ƒ +โ”กโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ฉ +โ”‚ lammps โ”‚ ฦ’XBNj4c7 โ”‚ lmp โ”‚ INACTIVE (COMPLETED) โ”‚ 1 โ”‚ 4 โ”‚ 27 โ”‚ 0 โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +Or all containers! + + +```bash +/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py jobs +``` +```console +โญ๏ธ Found application queue: index 0 +โญ๏ธ Found application chatterbug: index 3 +โญ๏ธ Found application ior: index 2 +โญ๏ธ Found application lammps: index 1 + Jobs for Queue, Chatterbug, Ior, Lammps +โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ณโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”“ +โ”ƒ Container โ”ƒ Id โ”ƒ Name โ”ƒ Status โ”ƒ Nodes โ”ƒ Cores โ”ƒ Runtime โ”ƒ Returncode โ”ƒ +โ”กโ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ•‡โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”ฉ +โ”‚ queue โ”‚ ฦ’4WEiYej โ”‚ python3.11 โ”‚ RUN โ”‚ 1 โ”‚ 1 โ”‚ 45 โ”‚ โ”‚ +โ”‚ ior โ”‚ ฦ’U1BWHuH โ”‚ ior โ”‚ INACTIVE (COMPLETED) โ”‚ 1 โ”‚ 4 โ”‚ 0 โ”‚ 0 โ”‚ +โ”‚ lammps โ”‚ ฦ’XBNj4c7 โ”‚ lmp โ”‚ INACTIVE (COMPLETED) โ”‚ 1 โ”‚ 4 โ”‚ 27 โ”‚ 0 โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +Lammps will show up like that when it is finished. I am calling this "pancake elasticity" since we can theoretically deploy many application containers and then use them when needed, essentially expanding the one running out (resource wise) while the others remain flat (not using resources). This isn't entirely ready yet (still testing) but a lot of the automation is in place. + +It's so super cool!! :D This is going to likely inspire the next round of work for thinking about scheduling and fluxion. 
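As a side note, the submit and jobs subcommands shown above are thin wrappers: submit simply posts JSON to the Flask endpoint that the controller exposes on port 5000. Here is a minimal sketch of calling that endpoint directly with Python, assuming the same in-cluster hostname that fluxion_controller.py uses as its default and that the controller is already running under the queue broker:

```python
import requests

# Endpoint served by "fluxion_controller.py start" (Flask, port 5000)
url = "http://flux-sample-0.flux-service.default.svc.cluster.local:5000/submit"

# Mirrors what the submit subcommand sends: a command list, a total cpu
# count, the target application container, and optional workdir/duration
# (the endpoint reads both keys, so we include them even when unset).
payload = {
    "command": ["ior"],
    "cpu": 4,
    "container": "ior",
    "workdir": None,
    "duration": None,
}

response = requests.post(url, json=payload)
print(response.json())
```

The response is the same job info dictionary shown in the console output above, with the fluxion job id attached.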
diff --git a/examples/experimental/multiple-applications-per-pod/fluxion_controller.py b/examples/experimental/multiple-applications-per-pod/fluxion_controller.py new file mode 100644 index 00000000..aae3061b --- /dev/null +++ b/examples/experimental/multiple-applications-per-pod/fluxion_controller.py @@ -0,0 +1,618 @@ +import json +import sys +import os +import time +import argparse +import requests +import threading +import logging +from flask import Flask, jsonify, request + +from rich.console import Console +from rich.table import Table + + +try: + import flux + import flux.job +except: + sys.exit("Cannot import flux - are the Flux Python bindings on the path?") + +try: + # This is flux-sched fluxion + from fluxion.resourcegraph.V1 import FluxionResourceGraphV1 + + # This is from the fluxion service grpc + from fluxion.protos import fluxion_pb2 + from fluxion.client import FluxionClient + from flux.hostlist import Hostlist +except: + sys.exit("Cannot import fluxion - is the fluxion module enabled?") + +# Global fluxion client to receive server submit +ctrl = None +app = Flask(__name__) + +logging.basicConfig(level=logging.DEBUG) + + +@app.route("/submit", methods=["POST"]) +def submit_job(): + """ + Submit a job to the running Fluxion server. + """ + global ctrl + + data = request.get_json() + app.logger.info(data) + for required in ["command", "cpu", "container"]: + if required not in data or not data[required]: + return jsonify({"error": f"{required} is required."}) + + response = ctrl.submit_job( + data["container"], + data["cpu"], + data["command"], + workdir=data["workdir"], + duration=data["duration"], + ) + app.logger.info(response) + return jsonify(response) + + +# We assume a jobspec on a flux operator node is only customizable via nodes and cores +# We will hard code things for now (e.g., duration) and these could be customized +jobspec_template = """ +{ + "version": 1, + "resources": [ + { + "type": "slot", + "count": 1, + "label": "default", + "with": [ + { + "type": "core", + "count": %s + } + ] + } + ], + "attributes": { + "system": { + "duration": 3600 + } + }, + "tasks": [ + { + "command": [], + "slot": "default", + "count": { + "per_slot": 1 + } + } + ] +} +""" + + +class FluxionController: + """ + This is a controller that can: + + 1. Detect the individual application brokers running in the MiniCluster + 2. Create a single graph to schedule to in the higher level fluxion service + 3. Discover the broker sockets and create handles for each. + 4. Receive a request for work and schedule properly! + + The high level idea is that we are sharing resources between the different + application containers (each with a different broker that isn't aware of the + others) and we don't want to oversubscribe. The Fluxion controller will handle + this orchestration, scheduling work with the fluxion service and receiving + callbacks from the individual clusters to determine when work is done. 
+ """ + + def __init__( + self, + resource_dir=None, + socket_dir=None, + fluxion_host=None, + meta_dir=None, + heartbeat_seconds=1, + ): + # These paths assume a flux operator install + self.socket_dir = socket_dir or "/mnt/flux/view/run/flux" + self.resource_dir = resource_dir or "/mnt/flux/view/etc/flux/system" + self.meta_dir = meta_dir or "/mnt/flux/view/etc/flux/meta" + + # This is the default for the headless service + self.fluxion_host = ( + fluxion_host + or "flux-sample-services.flux-service.default.svc.cluster.local:4242" + ) + + # This is imperfect - but we will keep a set of job ids for each container + self.jobs = {} + + # How often to check for completed jobs + self.heartbeat_seconds = heartbeat_seconds + self.handles = {} + self.resources = {} + self.containers = {} + + self.discover_containers() + self.discover_sockets() + self.discover_resources() + + def populate_jobs(self): + """ + Given running queues, populate with current jobs + + TBA: this will handle restoring from shutdown state. + Not supported yet. + """ + pass + + def discover_containers(self): + """ + Determine the container names and indices + """ + for container in os.listdir(self.meta_dir): + idx, container = container.split("-", 1) + print(f"โญ๏ธ Found application {container.rjust(10)}: index {idx}") + self.containers[int(idx)] = container + + def discover_resources(self): + """ + Discover physical node resources. + + Each container application is going to provide a unique R file, and this + is done in the case that we can/want to vary this in the future. However, + for the time being these are essentially the same so we can just read in + the first. + """ + for resource_file in os.listdir(self.resource_dir): + if not resource_file.startswith("R-"): + continue + _, idx = resource_file.split("-", 1) + + # Index based on the container + container = self.containers[int(idx)] + self.resources[container] = read_json( + os.path.join(self.resource_dir, resource_file) + ) + + # If we don't have any resources, bail out early - something is wrong + if not self.resources: + sys.exit( + f"No resource files found in {self.resource_dir} - this should not happen." + ) + + def discover_sockets(self): + """ + Discover sockets to create a flux handle to each + + We read in the associated container name via the meta directory in the + flux install, which is created by the flux operator. + """ + for socket_path in os.listdir(self.socket_dir): + # In practice there should not be anything else in here + if "local" not in socket_path: + continue + + # The socket has the index for the container in it + _, idx = socket_path.split("-", 1) + + # Use it to identify the container... + container = self.containers[int(idx)] + socket_fullpath = os.path.join(self.socket_dir, socket_path) + + # And generate the handle! + uri = f"local://{socket_fullpath}" + handle = flux.Flux(uri) + self.handles[container] = handle + + def init_fluxion(self): + """ + Connect to the fluxion service and create the graph.
+ """ + # Grab the first R to generate the resource graph from + # They are all the same + key = list(self.resources.keys())[0] + rv1 = self.resources[key] + graph = FluxionResourceGraphV1(rv1) + + # Dump of json graph format for fluxion + jgf = graph.to_JSON() + + # Init the fluxion graph - it only sees one of the entire cluster + self.cli = FluxionClient(host=self.fluxion_host) + + # Fluxion spits out an error that properties must be an object or null + for node in jgf["graph"]["nodes"]: + if "properties" in node["metadata"] and not node["metadata"]["properties"]: + node["metadata"]["properties"] = {} + + response = self.cli.init(json.dumps(jgf)) + if response.status == fluxion_pb2.InitResponse.ResultType.INIT_SUCCESS: + print("โœ…๏ธ Init of Fluxion resource graph success!") + else: + sys.exit(f"Issue with init, return code {response.status}") + + # Now run indefinitely, at least until we are done with the cluster + t1 = threading.Thread(target=self.run) + t1.start() + app.run(host="0.0.0.0") + + def run(self): + """ + Run fluxion, meaning we basically: + + 1. Check over known submit jobs for each handle. + 2. When they are done on a cluster, cancel in the overhead graph. + This is obviously imperfect in terms of state. What we can do to + prevent race conditions is to ensure that a job is running when + we submit it, that way we don't have two different brokers fighting + for the same resources. + """ + while True: + for container, handle in self.handles.items(): + jobs = [] + for jobset in self.jobs.get(container, []): + # Get the status of the job from the handle + info = flux.job.get_job(handle, jobset["container"]) + if info["result"] == "COMPLETED": + app.logger.info( + f"๐Ÿ‘‰๏ธ Job on {container} {jobset['fluxion']} is complete." + ) + self.cancel(jobset["fluxion"]) + continue + # Otherwise add back to jobs set + jobs.append(jobset) + self.jobs[container] = jobs + + # Do a sleep between the timeout + time.sleep(self.heartbeat_seconds) + + def cancel(self, jobid): + """ + Cancel a fluxion jobid + """ + # An inactive RPC cannot cancel + try: + response = self.cli.cancel(jobid=jobid) + if response.status == fluxion_pb2.CancelResponse.ResultType.CANCEL_SUCCESS: + app.logger.info(f"โœ…๏ธ Cancel of jobid {jobid} success!") + else: + app.logger.info(f"Issue with cancel, return code {response.status}") + except: + app.logger.info(f"โœ…๏ธ jobid {jobid} is already inactive.") + + def submit_error(self, message): + """ + Given a message, print (for the developer log) and return as json + """ + print(message) + return {"error": message} + + def list_jobs(self, containers): + """ + List jobs for one or more containers + """ + if not containers: + containers = list(self.handles.keys()) + if not containers: + sys.exit( + "One or more application target containers are required (--container)" + ) + + # Create a pretty table! 
+ names = ", ".join(x.capitalize() for x in containers) + table = Table(title=f"Jobs for {names}") + + # These are the header columns + table.add_column("Container", justify="right", style="cyan", no_wrap=True) + table.add_column("Id", style="magenta") + table.add_column("Name", style="magenta") + table.add_column("Status", style="magenta") + table.add_column("Nodes", style="magenta") + table.add_column("Cores", style="magenta") + table.add_column("Runtime", style="magenta") + table.add_column("Returncode", justify="right", style="green") + + # They are asking for a broker container handle that doesn't exist + for container in containers: + table = self.list_container_jobs(container, table) + + console = Console() + console.print(table) + + def list_container_jobs(self, container, table): + """ + List jobs for a single container, adding to a single table + """ + # Allow failure and continue + if container not in self.handles: + print(f"Application container handle for {container} does not exist.") + return + + # Our broker hook to the container + handle = self.handles[container] + jobs = flux.job.job_list(handle).get()["jobs"] + + for info in jobs: + job = flux.job.get_job(handle, info["id"]) + status = f"{job['state']} ({job['status']})" + if job["status"] == job["state"]: + status = job["state"] + runtime = str(int(job["runtime"])) + jobid = str(job["id"]) + table.add_row( + container, + jobid, + job["name"], + status, + str(job["nnodes"]), + str(job["ncores"]), + runtime, + str(job["returncode"]), + ) + return table + + def submit_job( + self, + container, + cpu_count, + command, + workdir=None, + duration=None, + environment=None, + ): + """ + Demo of submitting a job. We will want a more robust way to do this. + + This currently just asks for the command and total cores across nodes. + We let fluxion decide how to distribute that across physical nodes. + """ + if not cpu_count: + sys.exit("A cpu count is required.") + if not container: + sys.exit("An application target container is required (--container)") + if not command: + sys.exit("Please provide a command to submit") + + # They are asking for a broker container handle that doesn't exist + if container not in self.handles: + choices = ",".join(list(self.handles.keys())) + return self.submit_error( + f"Application container handle for {container} does not exist - choices are {choices}." 
+ ) + + # Our broker hook to the container + handle = self.handles[container] + + # Generate the jobspec, and see if we can match + jobspec = json.loads(jobspec_template % str(cpu_count)) + print(f"๐Ÿ™๏ธ Requesting to submit: {' '.join(command)}") + jobspec["tasks"][0]["command"] = command + + # Add additional system parameters + if duration is not None: + jobspec["attributes"]["system"]["duration"] = duration + if workdir is not None: + jobspec["attributes"]["system"]["cwd"] = workdir + if environment is not None and isinstance(environment, dict): + jobspec["attributes"]["system"]["environment"] = environment + + # This asks fluxion if we can schedule it + self.cli = FluxionClient(host=self.fluxion_host) + response = self.cli.match(json.dumps(jobspec)) + if response.status == fluxion_pb2.MatchResponse.ResultType.MATCH_SUCCESS: + print("โœ…๏ธ Match of jobspec to Fluxion graph success!") + else: + return self.submit_error( + f"Issue with match, return code {response.status}, cannot schedule now" + ) + + # We need the exact allocation to pass forward to the container broker + alloc = json.loads(response.allocation) + + # https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_31.html + # We are going to use ranks instead of hosts, since that is matched here + nodes = [ + x["metadata"]["name"] + for x in alloc["graph"]["nodes"] + if x["metadata"]["type"] == "node" + ] + ranks = [x.rsplit("-", 1)[-1] for x in nodes] + + # With the bypass plugin we can give a resource specification exactly to run + # https://flux-framework.readthedocs.io/en/latest/faqs.html#how-can-i-oversubscribe-tasks-to-resources-in-flux + # https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_20.html + # We cannot use constraint because we cannot limit cores + + # Create a constraint with AND for each host and the exact ranks assigned + # Note that this currently isn't supported so we just give the hostlist + # We need to be able to provide the exact hosts and cores on them. + resource_spec = { + "version": 1, + "execution": { + "R_lite": [], + "starttime": 0.0, + "expiration": 0.0, + "nodelist": ["flux-sample-[0-1]"], + }, + } + + # flux jobtap load system.alloc-bypass.R + # Example R_lite list: {'rank': '0', 'children': {'core': '0-4'}}, {'rank': '1', 'children': {'core': '6-8'}} + # nodelist: ['flux-sample-[0-1]'] + + r_lite = [] + for node in nodes: + ranks = [ + str(x["metadata"]["id"]) + for x in alloc["graph"]["nodes"] + if x["metadata"]["type"] == "node" + ] + cores = [ + str(x["metadata"]["id"]) + for x in alloc["graph"]["nodes"] + if x["metadata"]["type"] == "core" + and node in x["metadata"]["paths"]["containment"] + ] + r_lite.append( + {"rank": ",".join(ranks), "children": {"core": ",".join(cores)}} + ) + + hl = Hostlist(handle.attr_get("hostlist")) + hostlist = [hl[int(x)] for x in ranks] + resource_spec["execution"]["nodelist"] = hostlist + resource_spec["execution"]["R_lite"] = r_lite + + # Set the resource_spec on the plugin + jobspec["attributes"]["system"]["alloc-bypass"] = {"R": resource_spec} + + # Now we need to submit to the actual cluster, and store the mapping of our + # fluxion jobid to the cluster jobid. 
+ fluxjob = flux.job.submit_async(handle, json.dumps(jobspec)) + + # Wait until it's running (and thus don't submit other jobs) + # This assumes running one client to submit, and prevents race + jobid = fluxjob.get_id() + print(f"โญ๏ธ Submit job {jobid} to container {container}") + + while True: + info = flux.job.get_job(handle, jobid) + print(f"Job is in state {info['state']}") + + # These should be all states that come before running or finished + if info["state"] in ["DEPEND", "PRIORITY", "SCHED"]: + time.sleep(self.heartbeat_seconds) + continue + break + + # Keep a record of the fluxion job id + if container not in self.jobs: + self.jobs[container] = [] + self.jobs[container].append({"fluxion": response.jobid, "container": jobid}) + + # Update the info and return back + info["container"] = container + info["fluxion"] = response.jobid + return info + + +def read_json(filename): + """ + Read content from a json file + """ + with open(filename, "r") as fd: + content = json.loads(fd.read()) + return content + + +def write_json(obj, filename): + """ + Write content to a json file + """ + with open(filename, "w") as fd: + fd.write(json.dumps(obj, indent=4)) + + +def get_parser(): + parser = argparse.ArgumentParser( + description="Fluxion Application Scheduler Controller", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + subparsers = parser.add_subparsers( + help="actions", + title="actions", + description="Fluxion application scheduler controller subparsers", + dest="command", + ) + start = subparsers.add_parser( + "start", description="initialize and start fluxion (only do this once)!" + ) + jobs = subparsers.add_parser( + "jobs", + description="list jobs for a specific application broker", + formatter_class=argparse.RawTextHelpFormatter, + ) + submit = subparsers.add_parser( + "submit", + description="submit a JobSpec for a specific application broker", + formatter_class=argparse.RawTextHelpFormatter, + ) + + # Submit enforces just one container + for command in [submit, jobs]: + command.add_argument( + "-c", "--container", help="Application container to target", action="append" + ) + submit.add_argument("--cpu", help="Total CPU across N nodes to request under slot") + submit.add_argument("--workdir", help="Working directory for application") + submit.add_argument( + "--timeout", + help="Total runtime seconds (timeout) for application, defaults to 3600", + type=int, + ) + submit.add_argument( + "--host", + help="MiniCluster hostname running the service", + default="flux-sample-0.flux-service.default.svc.cluster.local:5000", + ) + + for command in [start, submit, jobs]: + command.add_argument("--fluxion-host", help="Fluxion service host") + command.add_argument( + "--resource-dir", help="MiniCluster resource (R) directory" + ) + command.add_argument("--socket-dir", help="MiniCluster socket directory") + command.add_argument("--meta-dir", help="MiniCluster Flux meta directory") + return parser + + +def main(): + """ + Create a fluxion graph handler for the application broker cluster. 
+ """ + global ctrl + + parser = get_parser() + args, command = parser.parse_known_args() + ctrl = FluxionController( + socket_dir=args.socket_dir, + resource_dir=args.resource_dir, + meta_dir=args.meta_dir, + ) + + if not args.command: + parser.print_help() + + # Init creates the resource graph and must be called once first + if args.command == "start": + ctrl.init_fluxion() + + elif args.command == "jobs": + ctrl.list_jobs(args.container) + + # The submit issues a post to the running server + elif args.command == "submit": + if not args.container or len(args.container) > 1: + sys.exit("Submit requires exactly one container.") + response = requests.post( + f"http://{args.host}/submit", + json={ + "command": command, + "cpu": args.cpu, + "container": args.container[0], + "duration": args.timeout, + "workdir": args.workdir, + }, + ) + print(response.json()) + + +if __name__ == "__main__": + main() diff --git a/examples/experimental/multiple-applications-per-pod/minicluster.yaml b/examples/experimental/multiple-applications-per-pod/minicluster.yaml new file mode 100644 index 00000000..0d12eecc --- /dev/null +++ b/examples/experimental/multiple-applications-per-pod/minicluster.yaml @@ -0,0 +1,60 @@ +apiVersion: flux-framework.org/v1alpha2 +kind: MiniCluster +metadata: + name: flux-sample +spec: + size: 4 + flux: + container: + image: ghcr.io/converged-computing/flux-view-ubuntu:tag-focal + + # This ensures that fluxion is running as a service to the MiniCluster + services: + - image: ghcr.io/converged-computing/fluxion:latest + command: /code/bin/server --host 0.0.0.0 + name: fluxion + + # A required marker from the user that they want multiple runFlux + # to work. This is considered an advanced use case. + oversubscribe: true + + containers: + + # This is a faux "queue only" broker container. It will be + # the interface to which we submit jobs. We install the + # fluxion library to interact with the corresponding service. + - image: rockylinux:9 + name: queue + commands: + pre: | + yum install -y git wget + /mnt/flux/view/bin/python3.11 -m ensurepip + /mnt/flux/view/bin/python3.11 -m pip install Flask requests rich + /mnt/flux/view/bin/python3.11 -m pip install -e "git+https://github.com/converged-computing/fluxion.git#egg=fluxion&subdirectory=python/v1" + wget -O /mnt/flux/view/fluxion_controller.py https://raw.githubusercontent.com/flux-framework/flux-operator/multiple-applications-per-pod/examples/experimental/multiple-applications-per-pod/fluxion_controller.py + # By the time we get here, the other brokers have started. 
+ for socket in $(ls /mnt/flux/view/run/flux/) + do + echo "Enabling alloc bypass for $socket" + flux proxy local:///mnt/flux/view/run/flux/$socket flux jobtap load /mnt/flux/view/lib/flux/job-manager/plugins/alloc-bypass.so + done + + runFlux: true + command: /mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py start + + # command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite + - image: ghcr.io/rse-ops/lammps-matrix:mpich-ubuntu-20.04-amd64 + name: lammps + workingDir: /opt/lammps/examples/reaxff/HNS + command: run_interactive_cluster + runFlux: true + + - image: ghcr.io/converged-computing/metric-ior:latest + name: ior + command: run_interactive_cluster + runFlux: true + + - image: ghcr.io/converged-computing/metric-chatterbug:latest + name: chatterbug + command: run_interactive_cluster + runFlux: true diff --git a/examples/experimental/multiple-pods-per-node/README.md b/examples/experimental/multiple-pods-per-node/README.md index 51f85939..63c8a8fd 100644 --- a/examples/experimental/multiple-pods-per-node/README.md +++ b/examples/experimental/multiple-pods-per-node/README.md @@ -27,18 +27,12 @@ gcloud container clusters create test-cluster \ ### Install the Flux Operator -We are going to install the Flux operator from the refactor branch (with the feature added to disable affinity). +As follows: ```bash -git clone -b test-refactor-modular -cd test-refactor-modular - -# You might need other dependencies, etc. here or to specify your own registry you can push to. -make test-deploy-recreate +kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml ``` -If/when this is released, you can install from a release. - ### Experiments Then create the flux operator pods. diff --git a/examples/tests/osu-benchmarks/minicluster.yaml b/examples/tests/osu-benchmarks/minicluster.yaml index bf6a3d9f..f03a6dff 100644 --- a/examples/tests/osu-benchmarks/minicluster.yaml +++ b/examples/tests/osu-benchmarks/minicluster.yaml @@ -21,4 +21,4 @@ spec: containers: - image: ghcr.io/converged-computing/metric-osu-benchmark:latest workingDir: /opt/osu-benchmark/build.openmpi/libexec/osu-micro-benchmarks/mpi/one-sided - command: ./osu_get_latency \ No newline at end of file + command: ./osu_get_latency diff --git a/pkg/flux/config.go b/pkg/flux/config.go index 10f607a0..df2918d5 100644 --- a/pkg/flux/config.go +++ b/pkg/flux/config.go @@ -34,7 +34,13 @@ func getRandomToken(requested string) string { } // generateHostlist for a specific size given the cluster namespace and a size -func generateHostlist(cluster *api.MiniCluster, size int32) string { +// Note that we don't customize on the level of the container, but I'm +// generating them separately anticipating wanting slightly different setups. 
+func generateHostlist( + cluster *api.MiniCluster, + container api.MiniClusterContainer, + size int32, +) string { var hosts string if cluster.Spec.Flux.Bursting.Hostlist != "" { diff --git a/pkg/flux/entrypoint.go b/pkg/flux/entrypoint.go index c9b50a90..d30e9e9c 100644 --- a/pkg/flux/entrypoint.go +++ b/pkg/flux/entrypoint.go @@ -37,7 +37,7 @@ func GenerateEntrypoints(cluster *api.MiniCluster) (map[string]string, error) { // Custom logic for a sidecar container alongside flux if container.GenerateEntrypoint() { startScriptID := fmt.Sprintf("start-%d", i) - startScript, err := generateServiceEntrypoint(cluster, container) + startScript, err := generateServiceEntrypoint(cluster, container, i) if err != nil { return data, err } @@ -58,11 +58,16 @@ func GenerateEntrypoints(cluster *api.MiniCluster) (map[string]string, error) { } // generateServiceEntrypoint generates an entrypoint for a service container -func generateServiceEntrypoint(cluster *api.MiniCluster, container api.MiniClusterContainer) (string, error) { +func generateServiceEntrypoint( + cluster *api.MiniCluster, + container api.MiniClusterContainer, + containerIndex int) (string, error) { + st := ServiceTemplate{ - ViewBase: cluster.Spec.Flux.Container.MountPath, - Container: container, - Spec: cluster.Spec, + ViewBase: cluster.Spec.Flux.Container.MountPath, + Container: container, + ContainerIndex: containerIndex, + Spec: cluster.Spec, } // Wrap the named template to identify it later @@ -88,7 +93,7 @@ func generateEntrypointScript( ) (string, error) { container := cluster.Spec.Containers[containerIndex] - mainHost := fmt.Sprintf("%s-0", cluster.Name) + mainHost := fmt.Sprintf("%s-0", container.Name) // Ensure if we have a batch command, it gets split up batchCommand := strings.Split(container.Command, "\n") @@ -99,12 +104,13 @@ func generateEntrypointScript( // The token uuid is the same across images wt := WaitTemplate{ - RequiredRanks: requiredRanks, - ViewBase: cluster.Spec.Flux.Container.MountPath, - Container: container, - MainHost: mainHost, - Spec: cluster.Spec, - Batch: batchCommand, + RequiredRanks: requiredRanks, + ViewBase: cluster.Spec.Flux.Container.MountPath, + ContainerIndex: containerIndex, + Container: container, + MainHost: mainHost, + Spec: cluster.Spec, + Batch: batchCommand, } // Wrap the named template to identify it later diff --git a/pkg/flux/scripts.go b/pkg/flux/scripts.go index 45b798f1..81f30b23 100644 --- a/pkg/flux/scripts.go +++ b/pkg/flux/scripts.go @@ -29,9 +29,10 @@ var startComponents string // ServiceTemplate is for a separate service container type ServiceTemplate struct { - ViewBase string // Where the mounted view with flux is expected to be - Container api.MiniClusterContainer - Spec api.MiniClusterSpec + ViewBase string // Where the mounted view with flux is expected to be + Container api.MiniClusterContainer + ContainerIndex int + Spec api.MiniClusterSpec } // WaitTemplate populates wait.sh for an application container entrypoint @@ -40,7 +41,10 @@ type WaitTemplate struct { MainHost string // Main host identifier FluxToken string // Token to log into the UI, should be consistent across containers Container api.MiniClusterContainer - Spec api.MiniClusterSpec + + // Index for container, for generation of unique socket path + ContainerIndex int + Spec api.MiniClusterSpec // Broker initial quorum that must be online to start // This is used if the cluster MaxSize > Size diff --git a/pkg/flux/templates/components.sh b/pkg/flux/templates/components.sh index c246d285..984e4b6d 100644 --- 
a/pkg/flux/templates/components.sh +++ b/pkg/flux/templates/components.sh @@ -63,6 +63,43 @@ command="/bin/bash ./custom-entrypoint.sh" {{end}} {{end}} +{{define "broker"}} +brokerOptions="-Scron.directory=/etc/flux/system/cron-{{ .ContainerIndex }}.d \ + -Stbon.fanout=256 \ + -Srundir=${viewroot}/run/flux {{ if .Spec.Interactive }}-Sbroker.rc2_none {{ end }} \ + -Sstatedir=${STATE_DIR} \ + -Slocal-uri=local://$viewroot/run/flux/local-{{ .ContainerIndex }} \ +{{ if .Spec.Flux.ConnectTimeout }}-Stbon.connect_timeout={{ .Spec.Flux.ConnectTimeout }}{{ end }} \ +{{ if .RequiredRanks }}-Sbroker.quorum={{ .RequiredRanks }}{{ end }} \ +{{ if .Spec.Logging.Zeromq }}-Stbon.zmqdebug=1{{ end }} \ +{{ if not .Spec.Logging.Quiet }} -Slog-stderr-level={{or .Spec.Flux.LogLevel 6}} {{ else }} -Slog-stderr-level=0 {{ end }} \ + -Slog-stderr-mode=local" + + +# Run an interactive cluster, giving no command to flux start +function run_interactive_cluster() { + echo "๐ŸŒ€ flux broker --config-path ${cfg} ${brokerOptions}" + flux broker --config-path ${cfg} ${brokerOptions} +} +{{end}} + +{{define "worker-broker"}} +cfg="${viewroot}/etc/flux/config-{{ .ContainerIndex }}" +brokerOptions="-Stbon.fanout=256 \ + -Srundir=${viewroot}/run/flux {{ if .Spec.Interactive }}-Sbroker.rc2_none {{ end }} \ + -Slocal-uri=local://$viewroot/run/flux/local-{{ .ContainerIndex }} \ +{{ if .Spec.Flux.ConnectTimeout }}-Stbon.connect_timeout={{ .Spec.Flux.ConnectTimeout }}{{ end }} \ +{{ if .Spec.Logging.Zeromq }}-Stbon.zmqdebug=1{{ end }} \ +{{ if not .Spec.Logging.Quiet }} -Slog-stderr-level={{or .Spec.Flux.LogLevel 6}} {{ else }} -Slog-stderr-level=0 {{ end }} \ + -Slog-stderr-mode=local" + +# This is provided as an optional function for a worker +function run_interactive_cluster() { + echo "๐ŸŒ€ flux broker --config-path ${cfg} ${brokerOptions}" + flux broker --config-path ${cfg} ${brokerOptions} +} +{{end}} + {{define "paths"}} foundroot=$(find $viewroot -maxdepth 2 -type d -path $viewroot/lib/python3\*) {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }} pythonversion=$(basename ${foundroot}) @@ -92,8 +129,12 @@ cat <> ${viewbase}/flux-view.sh export PATH=$PATH export PYTHONPATH=$PYTHONPATH export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$viewroot/lib -export fluxsocket=local://${viewroot}/run/flux/local +export fluxsocket=local://${viewroot}/run/flux/local-{{ .ContainerIndex }} EOT + +# Also write a file that indicates the container name at the index +mkdir -p $viewroot/etc/flux/meta +touch "${viewroot}/etc/flux/meta/{{ .ContainerIndex }}-{{ .Container.Name }}" {{end}} {{define "ensure-pip"}} ${pythonversion} -m pip --version || ${pythonversion} -m ensurepip || (wget https://bootstrap.pypa.io/get-pip.py && ${pythonversion} ./get-pip.py) {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }} diff --git a/pkg/flux/templates/start.sh b/pkg/flux/templates/start.sh index 3c984d4e..755dafce 100644 --- a/pkg/flux/templates/start.sh +++ b/pkg/flux/templates/start.sh @@ -18,7 +18,7 @@ {{ .Container.Commands.ServicePre}} {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }} # Ensure socket path is envar for user -fluxsocket=${viewroot}/run/flux/local +fluxsocket=${viewroot}/run/flux/local-{{ .ContainerIndex }} # Wait for it to exist (application is running) {{ if .Spec.Flux.NoWaitSocket }}{{ else }}goshare-wait-fs -p ${fluxsocket} {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }}{{ end }} @@ -29,6 +29,8 @@ fluxsocket="local://$fluxsocket" # Is a custom script provided? 
{{template "custom-script" .}} +{{template "worker-broker" .}} + {{ .Container.Command }} {{ .Container.Commands.Post}} diff --git a/pkg/flux/templates/wait.sh b/pkg/flux/templates/wait.sh index dc489d45..b2edf4b0 100644 --- a/pkg/flux/templates/wait.sh +++ b/pkg/flux/templates/wait.sh @@ -19,7 +19,7 @@ fluxuser=$(whoami) fluxuid=$(id -u $fluxuser) # Variables we can use again -cfg="${viewroot}/etc/flux/config" +cfg="${viewroot}/etc/flux/config-{{ .ContainerIndex }}" command="{{ .Container.Command }}" # Is a custom script provided? This will override command @@ -55,23 +55,23 @@ chown -R ${fluxuid} ${curvepath} # If we have disabled the view, we need to use the flux here to generate resources {{ if .Spec.Flux.Container.Disable }} -hosts=$(cat ${viewroot}/etc/flux/system/hostlist) +hosts=$(cat ${viewroot}/etc/flux/system/hostlist-{{ .ContainerIndex }}) {{ if not .Spec.Logging.Quiet }} echo echo "๐Ÿ“ฆ Resources" echo "flux R encode --hosts=${hosts} --local" {{ end }} -flux R encode --hosts=${hosts} --local > ${viewroot}/etc/flux/system/R -{{ if not .Spec.Logging.Quiet }}cat ${viewroot}/etc/flux/system/R{{ end }} +flux R encode --hosts=${hosts} --local > ${viewroot}/etc/flux/system/R-{{ .ContainerIndex }} +{{ if not .Spec.Logging.Quiet }}cat ${viewroot}/etc/flux/system/R-{{ .ContainerIndex }}{{ end }} {{ end }} # Put the state directory in /var/lib on shared view -export STATE_DIR=${viewroot}/var/lib/flux +export STATE_DIR=${viewroot}/var/lib/flux-{{ .ContainerIndex }} export FLUX_OUTPUT_DIR={{ if .Container.Logs }}{{.Container.Logs}}{{ else }}/tmp/fluxout{{ end }} mkdir -p ${STATE_DIR} ${FLUX_OUTPUT_DIR} # Main host -0 and the fully qualified domain name -mainHost="{{ .MainHost }}" +mainHost="{{ .Container.Name }}-0" workdir=$(pwd) {{ if .Spec.Logging.Quiet }}{{ else }} @@ -82,23 +82,7 @@ echo "The working directory is ${workdir}, contents include:" ls . 
{{ end }} -brokerOptions="-Scron.directory=/etc/flux/system/cron.d \ - -Stbon.fanout=256 \ - -Srundir=${viewroot}/run/flux {{ if .Spec.Interactive }}-Sbroker.rc2_none {{ end }} \ - -Sstatedir=${STATE_DIR} \ - -Slocal-uri=local://$viewroot/run/flux/local \ -{{ if .Spec.Flux.ConnectTimeout }}-Stbon.connect_timeout={{ .Spec.Flux.ConnectTimeout }}{{ end }} \ -{{ if .RequiredRanks }}-Sbroker.quorum={{ .RequiredRanks }}{{ end }} \ -{{ if .Spec.Logging.Zeromq }}-Stbon.zmqdebug=1{{ end }} \ -{{ if not .Spec.Logging.Quiet }} -Slog-stderr-level={{or .Spec.Flux.LogLevel 6}} {{ else }} -Slog-stderr-level=0 {{ end }} \ - -Slog-stderr-mode=local" - - -# Run an interactive cluster, giving no command to flux start -function run_interactive_cluster() { - echo "๐ŸŒ€ flux broker --config-path ${cfg} ${brokerOptions}" - flux broker --config-path ${cfg} ${brokerOptions} -} +{{template "broker" .}} # if we are given an archive to use, load first, not required to exist # Note that we ask the user to dump in interactive mode - I am not @@ -116,7 +100,7 @@ fi{{ end }} {{ if not .Spec.Logging.Quiet }}echo "๐Ÿšฉ๏ธ Flux Option Flags defined"{{ end }} # Start flux with the original entrypoint -if [ $(hostname) == "${mainHost}" ]; then +if [ "{{ .Container.Name }}-{{ .ContainerIndex }}" == "${mainHost}" ]; then # If it's a batch job, we write the script for the broker to run {{ if .Container.Batch }}rm -rf flux-job.batch @@ -180,13 +164,13 @@ else {{ .Container.Commands.WorkerPre}} {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }} # We basically sleep/wait until the lead broker is ready - echo "๐ŸŒ€ flux start {{ if .Spec.Flux.Wrap }}--wrap={{ .Spec.Flux.Wrap }} {{ end }} -o --config ${viewroot}/etc/flux/config ${brokerOptions}" + echo "๐ŸŒ€ flux start {{ if .Spec.Flux.Wrap }}--wrap={{ .Spec.Flux.Wrap }} {{ end }} -o --config ${cfg} ${brokerOptions}" # We can keep trying forever, don't care if worker is successful or not # Unless retry count is set, in which case we stop after retries while true do - flux start -o --config ${viewroot}/etc/flux/config ${brokerOptions} + flux start -o --config ${cfg} ${brokerOptions} retval=$? if [[ "${retval}" -eq 0 ]] || [[ "{{ .Spec.Flux.CompleteWorkers }}" == "true" ]]; then echo "The follower worker exited cleanly. Goodbye!" diff --git a/pkg/flux/view.go b/pkg/flux/view.go index 6061facc..9c9db030 100644 --- a/pkg/flux/view.go +++ b/pkg/flux/view.go @@ -29,8 +29,8 @@ func generateHostBlock(hosts string, cluster *api.MiniCluster) string { // Unless we have a bursting broker address if cluster.Spec.Flux.Bursting.LeadBroker.Address != "" { - hostTemplate = `hosts = [{host="%s", bind="tcp://eth0:%s", connect="tcp://%s:%s"}, - {host="%s"}]` + hostTemplate = `hosts = [{host="%s", bind="tcp://eth0:%s", connect="tcp://%s:%d"}, + {host="%d"}]` hostBlock = fmt.Sprintf( hostTemplate, @@ -43,12 +43,18 @@ func generateHostBlock(hosts string, cluster *api.MiniCluster) string { return hostBlock } -func generateBrokerConfig(cluster *api.MiniCluster, hosts string) string { +func generateBrokerConfig( + cluster *api.MiniCluster, + hosts string, + containerIndex int, +) string { if cluster.Spec.Flux.BrokerConfig != "" { return cluster.Spec.Flux.BrokerConfig } + // Port assembled based on index. 
+	defaultPort := 8050 + containerIndex
 	hostBlock := generateHostBlock(hosts, cluster)
 	fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", cluster.Spec.Network.HeadlessName, cluster.Namespace)
@@ -62,17 +68,17 @@ allow-root-owner = true
 
 # Point to resource definition generated with flux-R(1).
 [resource]
-path = "%s/view/etc/flux/system/R"
+path = "%s/view/etc/flux/system/R-%d"
 
 [bootstrap]
 curve_cert = "%s/view/curve/curve.cert"
-default_port = 8050
+default_port = %d
 default_bind = "%s"
 default_connect = "%s"
 %s
 
 [archive]
-dbpath = "%s/view/var/lib/flux/job-archive.sqlite"
+dbpath = "%s/view/var/lib/flux/job-archive-%d.sqlite"
 period = "1m"
 busytimeout = "50s"
@@ -82,11 +88,14 @@ queue-policy = "%s"
 	return fmt.Sprintf(
 		template,
 		cluster.Spec.Flux.Container.MountPath,
+		containerIndex,
 		cluster.Spec.Flux.Container.MountPath,
+		defaultPort,
 		defaultBind,
 		defaultConnect,
 		hostBlock,
 		cluster.Spec.Flux.Container.MountPath,
+		containerIndex,
 		cluster.Spec.Flux.Scheduler.QueuePolicy,
 	)
@@ -96,37 +105,28 @@ queue-policy = "%s"
 // This is run inside of the flux container that will be copied to the empty volume
 // If the flux container is disabled, we still add an init container with
 // the broker config, etc., but we don't expect a flux view there.
-func GenerateFluxEntrypoint(cluster *api.MiniCluster) (string, error) {
+func GenerateFluxEntrypoint(
+	cluster *api.MiniCluster,
+) (string, error) {
 
 	// fluxRoot for the view is in /opt/view/lib
 	// This must be consistent between the flux-view containers
 	// github.com:converged-computing/flux-views.git
 	fluxRoot := "/opt/view"
 
-	mainHost := fmt.Sprintf("%s-0", cluster.Name)
-
-	// Generate hostlists, this is the lead broker
-	hosts := generateHostlist(cluster, cluster.Spec.MaxSize)
-	brokerConfig := generateBrokerConfig(cluster, hosts)
-
 	// If we are disabling the view, it won't have flux (or extra spack copies)
 	// We copy our faux flux config directory (not a symlink) to the mount path
 	spackView := fmt.Sprintf(`mkdir -p $viewroot/software
-cp -R /opt/view/* %s/view`,
+    cp -R /opt/view/* %s/view`,
 		cluster.Spec.Flux.Container.MountPath,
 	)
 
 	generateHosts := `echo '📦 Flux view disabled, not generating resources here.'
-mkdir -p ${fluxroot}/etc/flux/system
-`
-	if !cluster.Spec.Flux.Container.Disable {
-		generateHosts = `
-echo "flux R encode --hosts=${hosts} --local"
-flux R encode --hosts=${hosts} --local > ${fluxroot}/etc/flux/system/R
+    mkdir -p ${fluxroot}/etc/flux/system
+    `
 
-echo
-echo "📦 Resources"
-cat ${fluxroot}/etc/flux/system/R`
+	// Create a different broker.toml for each runFlux container
+	if !cluster.Spec.Flux.Container.Disable {
 
 		spackView = `# Now prepare to copy finished spack view over
 echo "Moving content from /opt/view to be in shared volume at %s"
@@ -143,9 +143,57 @@ cp -R /opt/software $viewroot/
 `
 	}
 
+	// Generate a broker config for each potential running flux container
+	brokerConfigs := ""
+	for i, container := range cluster.Spec.Containers {
+		if !container.RunFlux {
+			continue
+		}
+
+		// Generate hostlists, this is the lead broker
+		hosts := generateHostlist(cluster, container, cluster.Spec.MaxSize)
+
+		// Create a different broker.toml for each runFlux container
+		if !cluster.Spec.Flux.Container.Disable {
+			generateHosts = fmt.Sprintf(`
+echo "flux R encode --hosts=${hosts} --local"
+flux R encode --hosts=${hosts} --local > ${fluxroot}/etc/flux/system/R-%d
+
+echo
+echo "📦 Resources"
+cat ${fluxroot}/etc/flux/system/R-%d`, i, i)
+		}
+
+		brokerConfig := generateBrokerConfig(cluster, hosts, i)
+		brokerConfigs += fmt.Sprintf(`
+# Write the broker configuration
+mkdir -p ${fluxroot}/etc/flux/config-%d
+
+cat <<EOT >> ${fluxroot}/etc/flux/config-%d/broker.toml
+%s
+EOT
+
+# These actions need to happen on all hosts
+mkdir -p $fluxroot/etc/flux/system
+hosts="%s"
+
+# Echo hosts here in case the main container needs to generate
+echo "${hosts}" > ${fluxroot}/etc/flux/system/hostlist-%d
+%s
+
+# Cron directory
+mkdir -p $fluxroot/etc/flux/system/cron-%d.d
+mkdir -p $fluxroot/var/lib/flux
+
+# The rundir needs to be created first, and owned by user flux
+# Along with the state directory and curve certificate
+mkdir -p ${fluxroot}/run/flux ${fluxroot}/etc/curve
+
+`, i, i, brokerConfig, hosts, i, generateHosts, i)
+	}
+
 	setup := `#!/bin/sh
 fluxroot=%s
-mainHost=%s
 
 echo "Hello I am hostname $(hostname) running setup."
 
 # Always use verbose, no reason to not here
@@ -158,31 +206,14 @@ export PATH=/opt/view/bin:$PATH
 
 # If the view doesn't exist, ensure basic paths do
 mkdir -p $fluxroot/bin
 
-# Cron directory
-mkdir -p $fluxroot/etc/flux/system/cron.d
-mkdir -p $fluxroot/var/lib/flux
-
-# These actions need to happen on all hosts
-mkdir -p $fluxroot/etc/flux/system
-hosts="%s"
-
-# Echo hosts here in case the main container needs to generate
-echo "${hosts}" > ${fluxroot}/etc/flux/system/hostlist
-%s
-
-# Write the broker configuration
-mkdir -p ${fluxroot}/etc/flux/config
-cat <<EOT >> ${fluxroot}/etc/flux/config/broker.toml
 %s
-EOT
 
 echo
 echo "🐸 Broker Configuration"
-cat ${fluxroot}/etc/flux/config/broker.toml
-
-# The rundir needs to be created first, and owned by user flux
-# Along with the state directory and curve certificate
-mkdir -p ${fluxroot}/run/flux ${fluxroot}/etc/curve
+for filename in $(find ${fluxroot}/etc/flux -name broker.toml)
+  do
+   cat $filename
+done
 
 # View the curve certificate
 echo "🌟️ Curve Certificate"
@@ -201,10 +232,7 @@ echo "Application is done."
return fmt.Sprintf( setup, fluxRoot, - mainHost, - hosts, - generateHosts, - brokerConfig, + brokerConfigs, cluster.Spec.Flux.Container.MountPath, spackView, ), nil diff --git a/sdk/python/v1alpha2/docs/MiniClusterSpec.md b/sdk/python/v1alpha2/docs/MiniClusterSpec.md index 74b4c855..c8693e38 100644 --- a/sdk/python/v1alpha2/docs/MiniClusterSpec.md +++ b/sdk/python/v1alpha2/docs/MiniClusterSpec.md @@ -16,6 +16,7 @@ Name | Type | Description | Notes **max_size** | **int** | MaxSize (maximum number of pods to allow scaling to) | [optional] **min_size** | **int** | MinSize (minimum number of pods that must be up for Flux) Note that this option does not edit the number of tasks, so a job could run with fewer (and then not start) | [optional] **network** | [**Network**](Network.md) | | [optional] +**oversubscribe** | **bool** | Allow >1 Flux running (oversubscribing resources) | [optional] [default to False] **pod** | [**PodSpec**](PodSpec.md) | | [optional] **services** | [**list[MiniClusterContainer]**](MiniClusterContainer.md) | Services are one or more service containers to bring up alongside the MiniCluster. | [optional] **share_process_namespace** | **bool** | Share process namespace? | [optional] [default to False] diff --git a/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py b/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py index ae98903e..0b1aeeba 100644 --- a/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py +++ b/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py @@ -47,6 +47,7 @@ class MiniClusterSpec(object): 'max_size': 'int', 'min_size': 'int', 'network': 'Network', + 'oversubscribe': 'bool', 'pod': 'PodSpec', 'services': 'list[MiniClusterContainer]', 'share_process_namespace': 'bool', @@ -66,6 +67,7 @@ class MiniClusterSpec(object): 'max_size': 'maxSize', 'min_size': 'minSize', 'network': 'network', + 'oversubscribe': 'oversubscribe', 'pod': 'pod', 'services': 'services', 'share_process_namespace': 'shareProcessNamespace', @@ -73,7 +75,7 @@ class MiniClusterSpec(object): 'tasks': 'tasks' } - def __init__(self, archive=None, cleanup=False, containers=None, deadline_seconds=31500000, flux=None, interactive=False, job_labels=None, logging=None, max_size=None, min_size=None, network=None, pod=None, services=None, share_process_namespace=False, size=1, tasks=1, local_vars_configuration=None): # noqa: E501 + def __init__(self, archive=None, cleanup=False, containers=None, deadline_seconds=31500000, flux=None, interactive=False, job_labels=None, logging=None, max_size=None, min_size=None, network=None, oversubscribe=False, pod=None, services=None, share_process_namespace=False, size=1, tasks=1, local_vars_configuration=None): # noqa: E501 """MiniClusterSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration.get_default_copy() @@ -90,6 +92,7 @@ def __init__(self, archive=None, cleanup=False, containers=None, deadline_second self._max_size = None self._min_size = None self._network = None + self._oversubscribe = None self._pod = None self._services = None self._share_process_namespace = None @@ -118,6 +121,8 @@ def __init__(self, archive=None, cleanup=False, containers=None, deadline_second self.min_size = min_size if network is not None: self.network = network + if oversubscribe is not None: + self.oversubscribe = oversubscribe if pod is not None: self.pod = pod if services is not None: @@ -376,6 +381,29 @@ def network(self, network): self._network = network + @property + 
def oversubscribe(self): + """Gets the oversubscribe of this MiniClusterSpec. # noqa: E501 + + Allow >1 Flux running (oversubscribing resources) # noqa: E501 + + :return: The oversubscribe of this MiniClusterSpec. # noqa: E501 + :rtype: bool + """ + return self._oversubscribe + + @oversubscribe.setter + def oversubscribe(self, oversubscribe): + """Sets the oversubscribe of this MiniClusterSpec. + + Allow >1 Flux running (oversubscribing resources) # noqa: E501 + + :param oversubscribe: The oversubscribe of this MiniClusterSpec. # noqa: E501 + :type oversubscribe: bool + """ + + self._oversubscribe = oversubscribe + @property def pod(self): """Gets the pod of this MiniClusterSpec. # noqa: E501