diff --git a/microcloud/api/services.go b/microcloud/api/services.go index 67b7d957d..4a1aaa5d4 100644 --- a/microcloud/api/services.go +++ b/microcloud/api/services.go @@ -56,7 +56,7 @@ var ServicesCmd = func(sh *service.Handler) rest.Endpoint { } // servicesPut updates the cluster status of the MicroCloud peer. -func servicesPut(s *state.State, r *http.Request) response.Response { +func servicesPut(state *state.State, r *http.Request) response.Response { // Parse the request. req := types.ServicesPut{} @@ -78,13 +78,13 @@ func servicesPut(s *state.State, r *http.Request) response.Response { addr = req.Address } - sh, err := service.NewHandler(s.Name(), addr, s.OS.StateDir, false, false, services...) + sh, err := service.NewHandler(state.Name(), addr, state.OS.StateDir, false, false, services...) if err != nil { return response.SmartError(err) } err = sh.RunConcurrent(true, true, func(s service.Service) error { - err = s.Join(joinConfigs[s.Type()]) + err = s.Join(state.Context, joinConfigs[s.Type()]) if err != nil { return fmt.Errorf("Failed to join %q cluster: %w", s.Type(), err) } diff --git a/microcloud/cmd/microcloud/add.go b/microcloud/cmd/microcloud/add.go index 17ef96e9e..deb8da008 100644 --- a/microcloud/cmd/microcloud/add.go +++ b/microcloud/cmd/microcloud/add.go @@ -51,7 +51,7 @@ func (c *cmdAdd) Run(cmd *cobra.Command, args []string) error { return fmt.Errorf("MicroCloud is uninitialized, run 'microcloud init' first") } - addr, subnet, err := askAddress(c.flagAutoSetup, status.Address.Addr().String()) + addr, subnet, err := c.common.askAddress(c.flagAutoSetup, status.Address.Addr().String()) if err != nil { return err } @@ -62,7 +62,7 @@ func (c *cmdAdd) Run(cmd *cobra.Command, args []string) error { types.MicroOVN: api.MicroOVNDir, } - services, err = askMissingServices(services, optionalServices, c.flagAutoSetup) + services, err = c.common.askMissingServices(services, optionalServices, c.flagAutoSetup) if err != nil { return err } @@ -72,25 +72,21 @@ func (c *cmdAdd) Run(cmd *cobra.Command, args []string) error { return err } - peers, err := lookupPeers(s, c.flagAutoSetup, subnet) + systems := map[string]InitSystem{} + err = lookupPeers(s, c.flagAutoSetup, subnet, systems) if err != nil { return err } - lxdConfig, cephDisks, err := askDisks(s, peers, false, c.flagAutoSetup, c.flagWipe) + err = c.common.askDisks(s, systems, c.flagAutoSetup, c.flagWipe) if err != nil { return err } - uplinkNetworks, networkConfig, err := askNetwork(s, peers, lxdConfig, false, c.flagAutoSetup) + err = c.common.askNetwork(s, systems, c.flagAutoSetup) if err != nil { return err } - err = AddPeers(s, peers, lxdConfig, cephDisks) - if err != nil { - return err - } - - return postClusterSetup(false, s, peers, lxdConfig, cephDisks, uplinkNetworks, networkConfig) + return setupCluster(s, systems) } diff --git a/microcloud/cmd/microcloud/ask.go b/microcloud/cmd/microcloud/ask.go index e54e74fc9..78ddfc5a5 100644 --- a/microcloud/cmd/microcloud/ask.go +++ b/microcloud/cmd/microcloud/ask.go @@ -1,15 +1,12 @@ package main import ( - "bufio" "fmt" "net" - "os" "sort" "strings" "github.com/canonical/lxd/shared/api" - lxdAPI "github.com/canonical/lxd/shared/api" cli "github.com/canonical/lxd/shared/cmd" "github.com/canonical/lxd/shared/logger" "github.com/canonical/lxd/shared/units" @@ -21,8 +18,7 @@ import ( ) // askRetry will print all errors and re-attempt the given function on user input. 
-func askRetry(question string, autoSetup bool, f func() error) { - asker := cli.NewAsker(bufio.NewReader(os.Stdin)) +func (c *CmdControl) askRetry(question string, autoSetup bool, f func() error) { for { retry := false err := f() @@ -30,7 +26,7 @@ func askRetry(question string, autoSetup bool, f func() error) { fmt.Println(err) if !autoSetup { - retry, err = asker.AskBool(fmt.Sprintf("%s (yes/no) [default=yes]: ", question), "yes") + retry, err = c.asker.AskBool(fmt.Sprintf("%s (yes/no) [default=yes]: ", question), "yes") if err != nil { fmt.Println(err) retry = false @@ -44,7 +40,7 @@ func askRetry(question string, autoSetup bool, f func() error) { } } -func askMissingServices(services []types.ServiceType, stateDirs map[types.ServiceType]string, autoSetup bool) ([]types.ServiceType, error) { +func (c *CmdControl) askMissingServices(services []types.ServiceType, stateDirs map[types.ServiceType]string, autoSetup bool) ([]types.ServiceType, error) { missingServices := []string{} for serviceType, stateDir := range stateDirs { if service.Exists(serviceType, stateDir) { @@ -57,8 +53,7 @@ func askMissingServices(services []types.ServiceType, stateDirs map[types.Servic if len(missingServices) > 0 { serviceStr := strings.Join(missingServices, ", ") if !autoSetup { - asker := cli.NewAsker(bufio.NewReader(os.Stdin)) - confirm, err := asker.AskBool(fmt.Sprintf("%s not found. Continue anyway? (yes/no) [default=yes]: ", serviceStr), "yes") + confirm, err := c.asker.AskBool(fmt.Sprintf("%s not found. Continue anyway? (yes/no) [default=yes]: ", serviceStr), "yes") if err != nil { return nil, err } @@ -76,7 +71,7 @@ func askMissingServices(services []types.ServiceType, stateDirs map[types.Servic return services, nil } -func askAddress(autoSetup bool, listenAddr string) (string, *net.IPNet, error) { +func (c *CmdControl) askAddress(autoSetup bool, listenAddr string) (string, *net.IPNet, error) { info, err := mdns.GetNetworkInfo() if err != nil { return "", nil, fmt.Errorf("Failed to find network interfaces: %w", err) @@ -95,7 +90,7 @@ func askAddress(autoSetup bool, listenAddr string) (string, *net.IPNet, error) { } table := NewSelectableTable([]string{"ADDRESS", "IFACE"}, data) - askRetry("Retry selecting an address?", autoSetup, func() error { + c.askRetry("Retry selecting an address?", autoSetup, func() error { fmt.Println("Select an address for MicroCloud's internal traffic:") table.Render(table.rows) answers, err := table.GetSelections() @@ -133,8 +128,7 @@ func askAddress(autoSetup bool, listenAddr string) (string, *net.IPNet, error) { } if !autoSetup { - asker := cli.NewAsker(bufio.NewReader(os.Stdin)) - filter, err := asker.AskBool(fmt.Sprintf("Limit search for other MicroCloud servers to %s? (yes/no) [default=yes]: ", subnet.String()), "yes") + filter, err := c.asker.AskBool(fmt.Sprintf("Limit search for other MicroCloud servers to %s? (yes/no) [default=yes]: ", subnet.String()), "yes") if err != nil { return "", nil, err } @@ -147,123 +141,97 @@ func askAddress(autoSetup bool, listenAddr string) (string, *net.IPNet, error) { return listenAddr, subnet, nil } -func askDisks(sh *service.Handler, peers map[string]mdns.ServerInfo, bootstrap bool, autoSetup bool, wipeAllDisks bool) (map[string][]lxdAPI.ClusterMemberConfigKey, map[string][]cephTypes.DisksPost, error) { - if bootstrap { - // Add the local system to the list of peers so we can select disks. 
- peers[sh.Name] = mdns.ServerInfo{Name: sh.Name} - defer delete(peers, sh.Name) - } - - allResources := make(map[string]*lxdAPI.Resources, len(peers)) +func (c *CmdControl) askDisks(sh *service.Handler, systems map[string]InitSystem, autoSetup bool, wipeAllDisks bool) error { + _, bootstrap := systems[sh.Name] + allResources := make(map[string]*api.Resources, len(systems)) var err error - for peer, info := range peers { - allResources[peer], err = sh.Services[types.LXD].(*service.LXDService).GetResources(peer, info.Address, info.AuthSecret) + for peer, system := range systems { + allResources[peer], err = sh.Services[types.LXD].(*service.LXDService).GetResources(peer, system.ServerInfo.Address, system.ServerInfo.AuthSecret) if err != nil { - return nil, nil, fmt.Errorf("Failed to get system resources of peer %q: %w", peer, err) + return fmt.Errorf("Failed to get system resources of peer %q: %w", peer, err) } } - validDisks := make(map[string][]lxdAPI.ResourcesStorageDisk, len(allResources)) + foundDisks := false for peer, r := range allResources { - validDisks[peer] = make([]lxdAPI.ResourcesStorageDisk, 0, len(r.Storage.Disks)) + system := systems[peer] + system.AvailableDisks = make([]api.ResourcesStorageDisk, 0, len(r.Storage.Disks)) for _, disk := range r.Storage.Disks { if len(disk.Partitions) == 0 { - validDisks[peer] = append(validDisks[peer], disk) + system.AvailableDisks = append(system.AvailableDisks, disk) } } + + if len(system.AvailableDisks) > 0 { + foundDisks = true + } + + systems[peer] = system } - var diskConfig map[string][]lxdAPI.ClusterMemberConfigKey - var reservedDisks map[string]string wantsDisks := true - - asker := cli.NewAsker(bufio.NewReader(os.Stdin)) - if !autoSetup { - wantsDisks, err = asker.AskBool("Would you like to set up local storage? (yes/no) [default=yes]: ", "yes") + if !autoSetup && foundDisks { + wantsDisks, err = c.asker.AskBool("Would you like to set up local storage? (yes/no) [default=yes]: ", "yes") if err != nil { - return nil, nil, err + return err } } + if !foundDisks { + wantsDisks = false + } + lxd := sh.Services[types.LXD].(*service.LXDService) if wantsDisks { - askRetry("Retry selecting disks?", autoSetup, func() error { - diskConfig, reservedDisks, err = askLocalPool(validDisks, autoSetup, wipeAllDisks, *lxd) - - return err + c.askRetry("Retry selecting disks?", autoSetup, func() error { + return askLocalPool(systems, autoSetup, wipeAllDisks, *lxd) }) } - for peer, path := range reservedDisks { - fmt.Printf(" Using %q on %q for local storage pool\n", path, peer) - } - - if len(reservedDisks) > 0 { - // Add a space between the CLI and the response. 
- fmt.Println("") - } - - var cephDisks map[string][]cephTypes.DisksPost if sh.Services[types.MicroCeph] != nil { - availableDisks := map[string][]lxdAPI.ResourcesStorageDisk{} - for peer, disks := range validDisks { - peerDisks := []lxdAPI.ResourcesStorageDisk{} - for _, disk := range disks { - devicePath := fmt.Sprintf("/dev/%s", disk.ID) - if disk.DeviceID != "" { - devicePath = fmt.Sprintf("/dev/disk/by-id/%s", disk.DeviceID) - } else if disk.DevicePath != "" { - devicePath = fmt.Sprintf("/dev/disk/by-path/%s", disk.DevicePath) - } - - if reservedDisks[peer] == devicePath { - continue - } - - peerDisks = append(peerDisks, disk) - } - - if len(peerDisks) > 0 { - availableDisks[peer] = peerDisks + availableDisks := map[string][]api.ResourcesStorageDisk{} + for peer, system := range systems { + if len(system.AvailableDisks) > 0 { + availableDisks[peer] = system.AvailableDisks } } - if len(availableDisks) < 3 { + if bootstrap && len(availableDisks) < 3 { fmt.Println("Insufficient number of disks available to set up distributed storage, skipping at this time") } else { - ceph := sh.Services[types.MicroCeph].(*service.CephService) wantsDisks = true if !autoSetup { - asker := cli.NewAsker(bufio.NewReader(os.Stdin)) - wantsDisks, err = asker.AskBool("Would you like to set up distributed storage? (yes/no) [default=yes]: ", "yes") + wantsDisks, err = c.asker.AskBool("Would you like to set up distributed storage? (yes/no) [default=yes]: ", "yes") if err != nil { - return nil, nil, err + return err } - if len(peers) != len(availableDisks) && wantsDisks { - wantsDisks, err = asker.AskBool("Unable to find disks on some systems. Continue anyway? (yes/no) [default=yes]: ", "yes") + if len(systems) != len(availableDisks) && wantsDisks { + wantsDisks, err = c.asker.AskBool("Unable to find disks on some systems. Continue anyway? (yes/no) [default=yes]: ", "yes") if err != nil { - return nil, nil, err + return err } } } if wantsDisks { - askRetry("Retry selecting disks?", autoSetup, func() error { - cephDisks, err = askRemotePool(availableDisks, autoSetup, wipeAllDisks, *ceph) - - return err + c.askRetry("Retry selecting disks?", autoSetup, func() error { + return askRemotePool(systems, autoSetup, wipeAllDisks, sh) }) } else { // Add a space between the CLI and the response. fmt.Println("") } - for peer, disks := range cephDisks { - fmt.Printf(" Using %d disk(s) on %q for remote storage pool\n", len(disks), peer) + usingCeph := false + for peer, system := range systems { + if len(system.MicroCephDisks) > 0 { + usingCeph = true + fmt.Printf(" Using %d disk(s) on %q for remote storage pool\n", len(system.MicroCephDisks), peer) + } } - if len(cephDisks) > 0 { + if usingCeph { // Add a space between the CLI and the response. 
fmt.Println("") } @@ -271,40 +239,46 @@ func askDisks(sh *service.Handler, peers map[string]mdns.ServerInfo, bootstrap b } if !bootstrap { - sourceTemplate := lxdAPI.ClusterMemberConfigKey{ - Entity: "storage-pool", - Name: "remote", - Key: "source", - Value: "lxd_remote", - } + for peer, system := range systems { + if len(system.MicroCephDisks) > 0 { + if system.JoinConfig == nil { + system.JoinConfig = []api.ClusterMemberConfigKey{} + } + + system.JoinConfig = append(system.JoinConfig, lxd.DefaultCephStoragePoolJoinConfig()) - for peer := range cephDisks { - diskConfig[peer] = append(diskConfig[peer], sourceTemplate) + systems[peer] = system + } } } - return diskConfig, cephDisks, nil + return nil +} + +func parseDiskPath(disk api.ResourcesStorageDisk) string { + devicePath := fmt.Sprintf("/dev/%s", disk.ID) + if disk.DeviceID != "" { + devicePath = fmt.Sprintf("/dev/disk/by-id/%s", disk.DeviceID) + } else if disk.DevicePath != "" { + devicePath = fmt.Sprintf("/dev/disk/by-path/%s", disk.DevicePath) + } + + return devicePath } -func askLocalPool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup bool, wipeAllDisks bool, lxd service.LXDService) (map[string][]lxdAPI.ClusterMemberConfigKey, map[string]string, error) { +func askLocalPool(systems map[string]InitSystem, autoSetup bool, wipeAllDisks bool, lxd service.LXDService) error { data := [][]string{} selected := map[string]string{} - for peer, disks := range peerDisks { + for peer, system := range systems { // If there's no spare disk, then we can't add a remote storage pool, so skip local pool creation. - if autoSetup && len(disks) < 2 { + if autoSetup && len(system.AvailableDisks) < 2 { logger.Infof("Skipping local storage pool creation, peer %q has too few disks", peer) - return nil, nil, nil + return nil } - for _, disk := range disks { - devicePath := fmt.Sprintf("/dev/%s", disk.ID) - if disk.DeviceID != "" { - devicePath = fmt.Sprintf("/dev/disk/by-id/%s", disk.DeviceID) - } else if disk.DevicePath != "" { - devicePath = fmt.Sprintf("/dev/disk/by-path/%s", disk.DevicePath) - } - + for _, disk := range system.AvailableDisks { + devicePath := parseDiskPath(disk) data = append(data, []string{peer, disk.Model, units.GetByteSizeStringIEC(int64(disk.Size), 2), disk.Type, devicePath}) // Add the first disk for each peer. 
@@ -320,7 +294,7 @@ func askLocalPool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup toWipe := map[string]string{} wipeable, err := lxd.HasExtension(lxd.Name(), lxd.Address(), "", "storage_pool_source_wipe") if err != nil { - return nil, nil, fmt.Errorf("Failed to check for source.wipe extension: %w", err) + return fmt.Errorf("Failed to check for source.wipe extension: %w", err) } if !autoSetup { @@ -331,11 +305,11 @@ func askLocalPool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup table.Render(table.rows) selectedRows, err := table.GetSelections() if err != nil { - return nil, nil, fmt.Errorf("Failed to confirm local LXD disk selection: %w", err) + return fmt.Errorf("Failed to confirm local LXD disk selection: %w", err) } if len(selectedRows) == 0 { - return nil, nil, fmt.Errorf("No disks selected") + return fmt.Errorf("No disks selected") } for _, entry := range selectedRows { @@ -344,7 +318,7 @@ func askLocalPool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup _, ok := selected[target] if ok { - return nil, nil, fmt.Errorf("Failed to add local storage pool: Selected more than one disk for target peer %q", target) + return fmt.Errorf("Failed to add local storage pool: Selected more than one disk for target peer %q", target) } selected[target] = path @@ -355,7 +329,7 @@ func askLocalPool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup table.Render(selectedRows) wipeRows, err := table.GetSelections() if err != nil { - return nil, nil, fmt.Errorf("Failed to confirm which disks to wipe: %w", err) + return fmt.Errorf("Failed to confirm which disks to wipe: %w", err) } for _, entry := range wipeRows { @@ -367,68 +341,65 @@ func askLocalPool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup } if len(selected) == 0 { - return nil, nil, nil + return nil } - if len(selected) != len(peerDisks) { - return nil, nil, fmt.Errorf("Failed to add local storage pool: Some peers don't have an available disk") + if len(selected) != len(systems) { + return fmt.Errorf("Failed to add local storage pool: Some peers don't have an available disk") } if wipeAllDisks && wipeable { toWipe = selected } - wipeDisk := lxdAPI.ClusterMemberConfigKey{ - Entity: "storage-pool", - Name: "local", - Key: "source.wipe", - Value: "true", - } - - sourceTemplate := lxdAPI.ClusterMemberConfigKey{ - Entity: "storage-pool", - Name: "local", - Key: "source", - } - - memberConfig := make(map[string][]lxdAPI.ClusterMemberConfigKey, len(selected)) + _, bootstrap := systems[lxd.Name()] for target, path := range selected { - if target == lxd.Name() { - err := lxd.AddLocalPool(path, wipeable && toWipe[target] != "") - if err != nil { - return nil, nil, fmt.Errorf("Failed to add pending local storage pool on peer %q: %w", target, err) + system := systems[target] + if bootstrap { + system.TargetStoragePools = []api.StoragePoolsPost{lxd.DefaultPendingZFSStoragePool(wipeable && toWipe[target] != "", path)} + if target == lxd.Name() { + system.StoragePools = []api.StoragePoolsPost{lxd.DefaultZFSStoragePool()} } } else { - sourceTemplate.Value = path - memberConfig[target] = []lxdAPI.ClusterMemberConfigKey{sourceTemplate} - if toWipe[target] != "" { - memberConfig[target] = append(memberConfig[target], wipeDisk) + system.JoinConfig = lxd.DefaultZFSStoragePoolJoinConfig(wipeable && toWipe[target] != "", path) + } + + // Remove the disks that we selected. 
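+		// Dropping the reserved disk from AvailableDisks keeps askRemotePool from offering it to MicroCeph as well.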
+ remainingDisks := make([]api.ResourcesStorageDisk, 0, len(system.AvailableDisks)-1) + for _, disk := range system.AvailableDisks { + if parseDiskPath(disk) != path { + remainingDisks = append(remainingDisks, disk) } } + + system.AvailableDisks = remainingDisks + + systems[target] = system + + fmt.Printf(" Using %q on %q for local storage pool\n", path, target) + } + + if len(selected) > 0 { + // Add a space between the CLI and the response. + fmt.Println("") } - return memberConfig, selected, nil + return nil } -func askRemotePool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup bool, wipeAllDisks bool, ceph service.CephService) (map[string][]cephTypes.DisksPost, error) { +func askRemotePool(systems map[string]InitSystem, autoSetup bool, wipeAllDisks bool, sh *service.Handler) error { header := []string{"LOCATION", "MODEL", "CAPACITY", "TYPE", "PATH"} data := [][]string{} - for peer, disks := range peerDisks { - for _, disk := range disks { + for peer, system := range systems { + for _, disk := range system.AvailableDisks { // Skip any disks that have been reserved for the local storage pool. - devicePath := fmt.Sprintf("/dev/%s", disk.ID) - if disk.DeviceID != "" { - devicePath = fmt.Sprintf("/dev/disk/by-id/%s", disk.DeviceID) - } else if disk.DevicePath != "" { - devicePath = fmt.Sprintf("/dev/disk/by-path/%s", disk.DevicePath) - } - + devicePath := parseDiskPath(disk) data = append(data, []string{peer, disk.Model, units.GetByteSizeStringIEC(int64(disk.Size), 2), disk.Type, devicePath}) } } if len(data) == 0 { - return nil, fmt.Errorf("Found no available disks") + return fmt.Errorf("Found no available disks") } sort.Sort(cli.SortColumnsNaturally(data)) @@ -440,7 +411,7 @@ func askRemotePool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup } if len(table.rows) == 0 { - return nil, nil + return nil } if !autoSetup { @@ -449,7 +420,7 @@ func askRemotePool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup table.Render(table.rows) selected, err = table.GetSelections() if err != nil { - return nil, fmt.Errorf("Failed to confirm disk selection: %w", err) + return fmt.Errorf("Failed to confirm disk selection: %w", err) } if len(selected) > 0 && !wipeAllDisks { @@ -457,7 +428,7 @@ func askRemotePool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup table.Render(selected) toWipe, err = table.GetSelections() if err != nil { - return nil, fmt.Errorf("Failed to confirm disk wipe selection: %w", err) + return fmt.Errorf("Failed to confirm disk wipe selection: %w", err) } } } @@ -470,42 +441,91 @@ func askRemotePool(peerDisks map[string][]lxdAPI.ResourcesStorageDisk, autoSetup } } - diskMap := map[string][]cephTypes.DisksPost{} + diskCount := 0 + lxd := sh.Services[types.LXD].(*service.LXDService) for _, entry := range selected { target := table.SelectionValue(entry, "LOCATION") path := table.SelectionValue(entry, "PATH") + system := systems[target] - _, ok := diskMap[target] - if !ok { - diskMap[target] = []cephTypes.DisksPost{} + if system.MicroCephDisks == nil { + diskCount++ + system.MicroCephDisks = []cephTypes.DisksPost{} } - diskMap[target] = append(diskMap[target], cephTypes.DisksPost{Path: path, Wipe: wipeMap[entry]}) + system.MicroCephDisks = append(system.MicroCephDisks, cephTypes.DisksPost{Path: path, Wipe: wipeMap[entry]}) + + systems[target] = system + } + + if diskCount > 0 { + for target, system := range systems { + if system.TargetStoragePools == nil { + system.TargetStoragePools = []api.StoragePoolsPost{} + } + + _, bootstrap := 
systems[sh.Name] + if bootstrap { + system.TargetStoragePools = append(system.TargetStoragePools, lxd.DefaultPendingCephStoragePool()) + if target == sh.Name { + if system.StoragePools == nil { + system.StoragePools = []api.StoragePoolsPost{} + } + + system.StoragePools = append(system.StoragePools, lxd.DefaultCephStoragePool()) + } + } + + systems[target] = system + } } - _, checkMinSize := peerDisks[ceph.Name()] - if !checkMinSize || len(diskMap) >= 3 { - return diskMap, nil + _, checkMinSize := systems[sh.Name] + if !checkMinSize || diskCount >= 3 { + return nil } - return nil, fmt.Errorf("Unable to add remote storage pool: Each peer (minimum 3) must have allocated disks") + return fmt.Errorf("Unable to add remote storage pool: At least 3 peers must have allocated disks") } -func askNetwork(sh *service.Handler, peers map[string]mdns.ServerInfo, lxdConfig map[string][]api.ClusterMemberConfigKey, bootstrap bool, autoSetup bool) (map[string]string, map[string]string, error) { +func (c *CmdControl) askNetwork(sh *service.Handler, systems map[string]InitSystem, autoSetup bool) error { + _, bootstrap := systems[sh.Name] + lxd := sh.Services[types.LXD].(*service.LXDService) + for peer, system := range systems { + if bootstrap { + system.TargetNetworks = []api.NetworksPost{lxd.DefaultPendingFanNetwork()} + if peer == sh.Name { + network, err := lxd.DefaultFanNetwork() + if err != nil { + return err + } + + system.Networks = []api.NetworksPost{network} + } + } + + systems[peer] = system + } + // Automatic setup gets a basic fan setup. if autoSetup { - return nil, nil, nil + return nil } // Environments without OVN get a basic fan setup. if sh.Services[types.MicroOVN] == nil { - return nil, nil, nil + return nil } // Get the list of networks from all peers. - networks, err := sh.Services[types.LXD].(*service.LXDService).GetUplinkInterfaces(bootstrap, peers) + infos := []mdns.ServerInfo{} + for _, system := range systems { + infos = append(infos, system.ServerInfo) + } + + networks, err := sh.Services[types.LXD].(*service.LXDService).GetUplinkInterfaces(bootstrap, infos) if err != nil { - return nil, nil, err + return err } // Check if OVN is possible in the environment. @@ -519,25 +539,20 @@ func askNetwork(sh *service.Handler, peers map[string]mdns.ServerInfo, lxdConfig if !canOVN { fmt.Println("No dedicated uplink interfaces detected, skipping distributed networking") - return nil, nil, nil + return nil } // Ask the user if they want OVN. - asker := cli.NewAsker(bufio.NewReader(os.Stdin)) - wantsOVN, err := asker.AskBool("Configure distributed networking? (yes/no) [default=yes]: ", "yes") + wantsOVN, err := c.asker.AskBool("Configure distributed networking? (yes/no) [default=yes]: ", "yes") if err != nil { - return nil, nil, err + return err } if !wantsOVN { - return nil, nil, nil - } - - missingSystems := len(peers) != len(networks) - if bootstrap { - missingSystems = len(peers) != len(networks)-1 + return nil } + missingSystems := len(systems) != len(networks) for _, nets := range networks { if len(nets) == 0 { missingSystems = true @@ -546,13 +561,13 @@ func askNetwork(sh *service.Handler, peers map[string]mdns.ServerInfo, lxdConfig } if missingSystems { - wantsSkip, err := asker.AskBool("Some systems are ineligible for distributed networking. Continue anyway? (yes/no) [default=yes]: ", "yes") + wantsSkip, err := c.asker.AskBool("Some systems are ineligible for distributed networking. Continue anyway? 
(yes/no) [default=yes]: ", "yes") if err != nil { - return nil, nil, err + return err } if !wantsSkip { - return nil, nil, nil + return nil } } @@ -568,7 +583,7 @@ func askNetwork(sh *service.Handler, peers map[string]mdns.ServerInfo, lxdConfig table := NewSelectableTable(header, data) var selected map[string]string - askRetry("Retry selecting uplink interfaces?", autoSetup, func() error { + c.askRetry("Retry selecting uplink interfaces?", autoSetup, func() error { table.Render(table.rows) answers, err := table.GetSelections() if err != nil { @@ -628,11 +643,10 @@ func askNetwork(sh *service.Handler, peers map[string]mdns.ServerInfo, lxdConfig return nil } - asker := cli.NewAsker(bufio.NewReader(os.Stdin)) msg := fmt.Sprintf("Specify the %s gateway (CIDR) on the uplink network (empty to skip %s): ", ip, ip) - gateway, err := asker.AskString(msg, "", validator) + gateway, err := c.asker.AskString(msg, "", validator) if err != nil { - return nil, nil, err + return err } if gateway != "" { @@ -654,14 +668,14 @@ func askNetwork(sh *service.Handler, peers map[string]mdns.ServerInfo, lxdConfig } if ip == "IPv4" { - rangeStart, err := asker.AskString(fmt.Sprintf("Specify the first %s address in the range to use with LXD: ", ip), "", validator) + rangeStart, err := c.asker.AskString(fmt.Sprintf("Specify the first %s address in the range to use with LXD: ", ip), "", validator) if err != nil { - return nil, nil, err + return err } - rangeEnd, err := asker.AskString(fmt.Sprintf("Specify the last %s address in the range to use with LXD: ", ip), "", validator) + rangeEnd, err := c.asker.AskString(fmt.Sprintf("Specify the last %s address in the range to use with LXD: ", ip), "", validator) if err != nil { - return nil, nil, err + return err } config[gateway] = fmt.Sprintf("%s-%s", rangeStart, rangeEnd) @@ -672,29 +686,56 @@ func askNetwork(sh *service.Handler, peers map[string]mdns.ServerInfo, lxdConfig } } - if !bootstrap { - if lxdConfig == nil { - lxdConfig = map[string][]lxdAPI.ClusterMemberConfigKey{} + // If interfaces were selected for OVN, remove the FAN config. + if len(selected) > 0 { + for peer, system := range systems { + system.TargetNetworks = []api.NetworksPost{} + system.Networks = []api.NetworksPost{} + + systems[peer] = system } + } - if len(selected) != 0 { - for peer, parent := range selected { - config, ok := lxdConfig[peer] - if !ok { - config = []api.ClusterMemberConfigKey{} - } + // If we are adding a new member, a MemberConfig entry should suffice to create the network on the node. 
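+	// When bootstrapping, the interface instead becomes a pending per-target network, finalized below with DefaultOVNNetwork.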
+ for peer, parent := range selected { + system := systems[peer] + if !bootstrap { + if system.JoinConfig == nil { + system.JoinConfig = []api.ClusterMemberConfigKey{} + } - config = append(config, api.ClusterMemberConfigKey{ - Entity: "network", - Name: "UPLINK", - Key: "parent", - Value: parent, - }) + system.JoinConfig = append(system.JoinConfig, lxd.DefaultOVNNetworkJoinConfig(parent)) + } else { + system.TargetNetworks = append(system.TargetNetworks, lxd.DefaultPendingOVNNetwork(parent)) + } - lxdConfig[peer] = config + systems[peer] = system + } + + if bootstrap { + bootstrapSystem := systems[sh.Name] + + var ipv4Gateway string + var ipv4Ranges string + var ipv6Gateway string + for gateway, ipRange := range config { + ip, _, err := net.ParseCIDR(gateway) + if err != nil { + return err + } + + if ip.To4() != nil { + ipv4Gateway = gateway + ipv4Ranges = ipRange + } else { + ipv6Gateway = gateway } } + + uplink, ovn := lxd.DefaultOVNNetwork(ipv4Gateway, ipv4Ranges, ipv6Gateway) + bootstrapSystem.Networks = []api.NetworksPost{uplink, ovn} + systems[sh.Name] = bootstrapSystem } - return selected, config, nil + return nil } diff --git a/microcloud/cmd/microcloud/main.go b/microcloud/cmd/microcloud/main.go index 7458b28f1..d7a954152 100644 --- a/microcloud/cmd/microcloud/main.go +++ b/microcloud/cmd/microcloud/main.go @@ -2,9 +2,11 @@ package main import ( + "bufio" "fmt" "os" + cli "github.com/canonical/lxd/shared/cmd" "github.com/spf13/cobra" "github.com/canonical/microcloud/microcloud/version" @@ -20,6 +22,8 @@ type CmdControl struct { FlagLogDebug bool FlagLogVerbose bool FlagMicroCloudDir string + + asker cli.Asker } func main() { @@ -30,7 +34,7 @@ func main() { } // common flags. - commonCmd := CmdControl{} + commonCmd := CmdControl{asker: cli.NewAsker(bufio.NewReader(os.Stdin))} app := &cobra.Command{ Use: "microcloud", diff --git a/microcloud/cmd/microcloud/main_init.go b/microcloud/cmd/microcloud/main_init.go index 253f2df05..20e498fda 100644 --- a/microcloud/cmd/microcloud/main_init.go +++ b/microcloud/cmd/microcloud/main_init.go @@ -10,10 +10,12 @@ import ( "time" "github.com/canonical/lxd/lxd/util" + "github.com/canonical/lxd/shared" lxdAPI "github.com/canonical/lxd/shared/api" "github.com/canonical/lxd/shared/logger" cephTypes "github.com/canonical/microceph/microceph/api/types" - "github.com/canonical/microceph/microceph/client" + cephClient "github.com/canonical/microceph/microceph/client" + "github.com/canonical/microcluster/client" ovnClient "github.com/canonical/microovn/microovn/client" "github.com/spf13/cobra" @@ -23,6 +25,22 @@ import ( "github.com/canonical/microcloud/microcloud/service" ) +// InitSystem represents the configuration passed to individual systems that join via the Handler. +type InitSystem struct { + ServerInfo mdns.ServerInfo // Data reported by mDNS about this system. + + AvailableDisks []lxdAPI.ResourcesStorageDisk // Disks as reported by LXD. + + MicroCephDisks []cephTypes.DisksPost // Disks intended to be passed to MicroCeph. + TargetNetworks []lxdAPI.NetworksPost // Target specific network configuration. + TargetStoragePools []lxdAPI.StoragePoolsPost // Target specific storage pool configuration. + Networks []lxdAPI.NetworksPost // Cluster-wide network configuration. + StoragePools []lxdAPI.StoragePoolsPost // Cluster-wide storage pool configuration. + StorageVolumes map[string][]lxdAPI.StorageVolumesPost // Cluster wide storage volume configuration. + + JoinConfig []lxdAPI.ClusterMemberConfigKey // LXD Config for joining members. 
+} + type cmdInit struct { common *CmdControl @@ -58,12 +76,14 @@ func (c *cmdInit) Run(cmd *cobra.Command, args []string) error { return err } - err = lxdService.Restart(30) + err = lxdService.Restart(context.Background(), 30) if err != nil { return err } - addr, subnet, err := askAddress(c.flagAutoSetup, c.flagAddress) + systems := map[string]InitSystem{} + + addr, subnet, err := c.common.askAddress(c.flagAutoSetup, c.flagAddress) if err != nil { return err } @@ -73,12 +93,11 @@ func (c *cmdInit) Run(cmd *cobra.Command, args []string) error { return fmt.Errorf("Failed to retrieve system hostname: %w", err) } - if !c.flagAutoSetup { //nolint:staticcheck - // FIXME: MicroCeph does not currently support non-hostname cluster names. - // name, err = cli.AskString(fmt.Sprintf("Specify a name for this system [default=%s]: ", name), name, nil) - // if err != nil { - // return err - // } + systems[name] = InitSystem{ + ServerInfo: mdns.ServerInfo{ + Name: name, + Address: addr, + }, } services := []types.ServiceType{types.MicroCloud, types.LXD} @@ -87,7 +106,7 @@ func (c *cmdInit) Run(cmd *cobra.Command, args []string) error { types.MicroOVN: api.MicroOVNDir, } - services, err = askMissingServices(services, optionalServices, c.flagAutoSetup) + services, err = c.common.askMissingServices(services, optionalServices, c.flagAutoSetup) if err != nil { return err } @@ -97,66 +116,30 @@ func (c *cmdInit) Run(cmd *cobra.Command, args []string) error { return err } - peers, err := lookupPeers(s, c.flagAutoSetup, subnet) - if err != nil { - return err - } - - lxdConfig, cephDisks, err := askDisks(s, peers, true, c.flagAutoSetup, c.flagWipeAllDisks) + err = lookupPeers(s, c.flagAutoSetup, subnet, systems) if err != nil { return err } - uplinkNetworks, networkConfig, err := askNetwork(s, peers, lxdConfig, true, c.flagAutoSetup) - if err != nil { - return err - } - - fmt.Println("Initializing a new cluster") - err = s.RunConcurrent(true, false, func(s service.Service) error { - err := s.Bootstrap() - if err != nil { - return fmt.Errorf("Failed to bootstrap local %s: %w", s.Type(), err) - } - - fmt.Printf(" Local %s is ready\n", s.Type()) - - return nil - }) + err = c.common.askDisks(s, systems, c.flagAutoSetup, c.flagWipeAllDisks) if err != nil { return err } - if len(cephDisks) > 0 { - c, err := s.Services[types.MicroCeph].(*service.CephService).Client("", "") - if err != nil { - return err - } - - for _, disk := range cephDisks[s.Name] { - err = client.AddDisk(context.Background(), c, &disk) - if err != nil { - return err - } - } - } - - err = AddPeers(s, peers, lxdConfig, cephDisks) + err = c.common.askNetwork(s, systems, c.flagAutoSetup) if err != nil { return err } - err = postClusterSetup(true, s, peers, lxdConfig, cephDisks, uplinkNetworks, networkConfig) + err = setupCluster(s, systems) if err != nil { return err } - fmt.Println("MicroCloud is ready") - return nil } -func lookupPeers(s *service.Handler, autoSetup bool, subnet *net.IPNet) (map[string]mdns.ServerInfo, error) { +func lookupPeers(s *service.Handler, autoSetup bool, subnet *net.IPNet, systems map[string]InitSystem) error { header := []string{"NAME", "IFACE", "ADDR"} var table *SelectableTable var answers []string @@ -190,14 +173,14 @@ func lookupPeers(s *service.Handler, autoSetup bool, subnet *net.IPNet) (map[str done = true case err := <-selectionCh: if err != nil { - return nil, err + return err } done = true default: peers, err := mdns.LookupPeers(context.Background(), mdns.Version, s.Name) if err != nil { - return nil, err + return 
err } skipPeers := map[string]bool{} @@ -242,56 +225,64 @@ func lookupPeers(s *service.Handler, autoSetup bool, subnet *net.IPNet) (map[str } if len(totalPeers) == 0 { - return nil, fmt.Errorf("Found no available systems") + return fmt.Errorf("Found no available systems") } - selectedPeers := map[string]mdns.ServerInfo{} for _, answer := range answers { peer := table.SelectionValue(answer, "NAME") addr := table.SelectionValue(answer, "ADDR") iface := table.SelectionValue(answer, "IFACE") for _, info := range totalPeers { if info.Name == peer && info.Address == addr && info.Interface == iface { - selectedPeers[peer] = info + systems[peer] = InitSystem{ + ServerInfo: info, + } } } } if autoSetup { for _, info := range totalPeers { - selectedPeers[info.Name] = info + systems[info.Name] = InitSystem{ + ServerInfo: info, + } } // Add a space between the CLI and the response. fmt.Println("") } - for _, info := range selectedPeers { - fmt.Printf(" Selected %q at %q\n", info.Name, info.Address) + for _, info := range systems { + fmt.Printf(" Selected %q at %q\n", info.ServerInfo.Name, info.ServerInfo.Address) } // Add a space between the CLI and the response. fmt.Println("") - return selectedPeers, nil + + return nil } -func AddPeers(sh *service.Handler, peers map[string]mdns.ServerInfo, localDisks map[string][]lxdAPI.ClusterMemberConfigKey, cephDisks map[string][]cephTypes.DisksPost) error { - joinConfig := make(map[string]types.ServicesPut, len(peers)) - secrets := make(map[string]string, len(peers)) - for peer, info := range peers { +func AddPeers(sh *service.Handler, systems map[string]InitSystem) error { + joinConfig := make(map[string]types.ServicesPut, len(systems)) + secrets := make(map[string]string, len(systems)) + for peer, info := range systems { + if peer == sh.Name { + continue + } + joinConfig[peer] = types.ServicesPut{ Tokens: []types.ServiceToken{}, - Address: info.Address, - LXDConfig: localDisks[peer], - CephConfig: cephDisks[peer], + Address: info.ServerInfo.Address, + LXDConfig: info.JoinConfig, + CephConfig: info.MicroCephDisks, } - secrets[peer] = info.AuthSecret + secrets[peer] = info.ServerInfo.AuthSecret } mut := sync.Mutex{} err := sh.RunConcurrent(false, false, func(s service.Service) error { - for peer := range peers { + for peer := range joinConfig { token, err := s.IssueToken(peer) if err != nil { return fmt.Errorf("Failed to issue %s token for peer %q: %w", s.Type(), peer, err) @@ -351,7 +342,7 @@ func AddPeers(sh *service.Handler, peers map[string]mdns.ServerInfo, localDisks return fmt.Errorf("Missing MicroCloud service") } - cloudCluster, err := cloudService.ClusterMembers() + cloudCluster, err := cloudService.ClusterMembers(context.Background()) if err != nil { return fmt.Errorf("Failed to get %s service cluster members: %w", cloudService.Type(), err) } @@ -361,7 +352,7 @@ func AddPeers(sh *service.Handler, peers map[string]mdns.ServerInfo, localDisks return nil } - cluster, err := s.ClusterMembers() + cluster, err := s.ClusterMembers(context.Background()) if err != nil { return fmt.Errorf("Failed to get %s service cluster members: %w", s.Type(), err) } @@ -412,7 +403,6 @@ func waitForCluster(sh *service.Handler, secrets map[string]string, peers map[st fmt.Printf(" Peer %q has joined the cluster\n", entry.Name) delete(peers, entry.Name) - if len(peers) == 0 { close(joinedChan) @@ -422,22 +412,52 @@ func waitForCluster(sh *service.Handler, secrets map[string]string, peers map[st } } -func postClusterSetup(bootstrap bool, sh *service.Handler, peers 
map[string]mdns.ServerInfo, lxdDisks map[string][]lxdAPI.ClusterMemberConfigKey, cephDisks map[string][]cephTypes.DisksPost, uplinkNetworks map[string]string, networkConfig map[string]string) error {
-	cephTargets := map[string]string{}
-	if len(cephDisks) > 0 {
-		for target := range peers {
-			cephTargets[target] = peers[target].AuthSecret
+// setupCluster bootstraps the cluster if necessary, adds all peers to the cluster, and completes any post-cluster
+// configuration.
+func setupCluster(s *service.Handler, systems map[string]InitSystem) error {
+	_, bootstrap := systems[s.Name]
+	if bootstrap {
+		fmt.Println("Initializing a new cluster")
+		err := s.RunConcurrent(true, false, func(s service.Service) error {
+			err := s.Bootstrap(context.Background())
+			if err != nil {
+				return fmt.Errorf("Failed to bootstrap local %s: %w", s.Type(), err)
+			}
+
+			fmt.Printf(" Local %s is ready\n", s.Type())
+
+			return nil
+		})
+		if err != nil {
+			return err
 		}
 
-		if bootstrap {
-			cephTargets[sh.Name] = ""
+		// Only add disks for the local MicroCeph as other systems will add their disks upon joining.
+		var c *client.Client
+		for _, disk := range systems[s.Name].MicroCephDisks {
+			if c == nil {
+				c, err = s.Services[types.MicroCeph].(*service.CephService).Client("", "")
+				if err != nil {
+					return err
+				}
+			}
+
+			logger.Debug("Adding disk to MicroCeph", logger.Ctx{"peer": s.Name, "disk": disk.Path})
+			err = cephClient.AddDisk(context.Background(), c, &disk)
+			if err != nil {
+				return err
+			}
 		}
 	}
 
-	networkTargets := map[string]string{}
+	err := AddPeers(s, systems)
+	if err != nil {
+		return err
+	}
+
 	var ovnConfig string
-	if sh.Services[types.MicroOVN] != nil {
-		ovn := sh.Services[types.MicroOVN].(*service.OVNService)
+	if s.Services[types.MicroOVN] != nil {
+		ovn := s.Services[types.MicroOVN].(*service.OVNService)
 		c, err := ovn.Client()
 		if err != nil {
 			return err
@@ -448,12 +468,25 @@ func postClusterSetup(bootstrap bool, sh *service.Handler, peers map[string]mdns
 			return err
 		}
 
+		clusterMap := map[string]string{}
+		if bootstrap {
+			for peer, system := range systems {
+				clusterMap[peer] = system.ServerInfo.Address
+			}
+		} else {
+			cloud := s.Services[types.MicroCloud].(*service.CloudService)
+			clusterMap, err = cloud.ClusterMembers(context.Background())
+			if err != nil {
+				return err
+			}
+		}
+
 		conns := []string{}
 		for _, service := range services {
 			if service.Service == "central" {
-				addr := sh.Address
-				if service.Location != sh.Name {
-					addr = peers[service.Location].Address
+				addr := s.Address
+				if service.Location != s.Name {
+					addr = clusterMap[service.Location]
 				}
 
 				conns = append(conns, fmt.Sprintf("ssl:%s", util.CanonicalNetworkAddress(addr, 6641)))
@@ -463,14 +496,160 @@ func postClusterSetup(bootstrap bool, sh *service.Handler, peers map[string]mdns
 		ovnConfig = strings.Join(conns, ",")
 	}
 
-	for peer, info := range peers {
-		networkTargets[peer] = info.AuthSecret
+	config := map[string]string{"network.ovn.northbound_connection": ovnConfig}
+	lxd := s.Services[types.LXD].(*service.LXDService)
+	lxdClient, err := lxd.Client("")
+	if err != nil {
+		return err
 	}
 
-	lxdTargets := map[string]string{}
-	for peer := range lxdDisks {
-		lxdTargets[peer] = peers[peer].AuthSecret
+	// Update LXD's global config.
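+	// Only send the update if one of the keys actually changed, to avoid a needless request.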
+ server, _, err := lxdClient.GetServer() + if err != nil { + return err } - return sh.Services[types.LXD].(*service.LXDService).Configure(bootstrap, lxdTargets, cephTargets, ovnConfig, networkTargets, uplinkNetworks, networkConfig) + newServer := server.Writable() + changed := false + for k, v := range config { + if newServer.Config[k] != v { + changed = true + } + + newServer.Config[k] = v + } + + if changed { + err = lxdClient.UpdateServer(newServer, "") + if err != nil { + return err + } + } + + // Create preliminary networks & storage pools on each target. + for name, system := range systems { + lxdClient, err := lxd.Client(system.ServerInfo.AuthSecret) + if err != nil { + return err + } + + targetClient := lxdClient.UseTarget(name) + for _, network := range system.TargetNetworks { + err = targetClient.CreateNetwork(network) + if err != nil { + return err + } + } + + for _, pool := range system.TargetStoragePools { + err = targetClient.CreateStoragePool(pool) + if err != nil { + return err + } + } + } + + // If bootstrapping, finalize setup of storage pools & networks, and update the default profile accordingly. + system, bootstrap := systems[s.Name] + if bootstrap { + lxd := s.Services[types.LXD].(*service.LXDService) + lxdClient, err := lxd.Client(system.ServerInfo.AuthSecret) + if err != nil { + return err + } + + profile := lxdAPI.ProfilesPost{ProfilePut: lxdAPI.ProfilePut{Devices: map[string]map[string]string{}}, Name: "default"} + + for _, network := range system.Networks { + if network.Name == "default" || profile.Devices["eth0"] == nil { + profile.Devices["eth0"] = map[string]string{"name": "eth0", "network": network.Name, "type": "nic"} + } + + err = lxdClient.CreateNetwork(network) + if err != nil { + return err + } + } + + for _, pool := range system.StoragePools { + if pool.Driver == "ceph" || profile.Devices["root"] == nil { + profile.Devices["root"] = map[string]string{"path": "/", "pool": pool.Name, "type": "disk"} + } + + err = lxdClient.CreateStoragePool(pool) + if err != nil { + return err + } + } + + profiles, err := lxdClient.GetProfileNames() + if err != nil { + return err + } + + if !shared.StringInSlice(profile.Name, profiles) { + err = lxdClient.CreateProfile(profile) + } else { + err = lxdClient.UpdateProfile(profile.Name, profile.ProfilePut, "") + } + + if err != nil { + return err + } + } + + // With storage pools set up, add some volumes for images & backups. 
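+	// These custom volumes back the storage.backups_volume and storage.images_volume server keys set below.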
+ for name, system := range systems { + lxdClient, err := lxd.Client(system.ServerInfo.AuthSecret) + if err != nil { + return err + } + + poolNames := []string{} + if bootstrap { + for _, pool := range system.TargetStoragePools { + poolNames = append(poolNames, pool.Name) + } + } else { + for _, cfg := range system.JoinConfig { + if cfg.Name == "local" || cfg.Name == "remote" { + if cfg.Entity == "storage-pool" && cfg.Key == "source" { + poolNames = append(poolNames, cfg.Name) + } + } + } + } + + targetClient := lxdClient.UseTarget(name) + for _, pool := range poolNames { + if pool == "local" { + err = targetClient.CreateStoragePoolVolume("local", lxdAPI.StorageVolumesPost{Name: "images", Type: "custom"}) + if err != nil { + return err + } + + err = targetClient.CreateStoragePoolVolume("local", lxdAPI.StorageVolumesPost{Name: "backups", Type: "custom"}) + if err != nil { + return err + } + + server, _, err := targetClient.GetServer() + if err != nil { + return err + } + + newServer := server.Writable() + newServer.Config["storage.backups_volume"] = "local/backups" + newServer.Config["storage.images_volume"] = "local/images" + err = targetClient.UpdateServer(newServer, "") + if err != nil { + return err + } + } + } + } + + fmt.Println("MicroCloud is ready") + + return nil } diff --git a/microcloud/cmd/microcloudd/main.go b/microcloud/cmd/microcloudd/main.go index 5657a0cce..f878f1eda 100644 --- a/microcloud/cmd/microcloudd/main.go +++ b/microcloud/cmd/microcloudd/main.go @@ -94,11 +94,11 @@ func (c *cmdDaemon) Run(cmd *cobra.Command, args []string) error { updated := false for serviceName, stateDir := range optionalServices { - if s.Services[serviceName] != nil { - continue - } - if service.Exists(serviceName, stateDir) { + if s.Services[serviceName] != nil { + continue + } + newService, err := service.NewHandler(name, addr, c.flagMicroCloudDir, false, false, serviceName) if err != nil { logger.Error("Failed to create servie handler for service", logger.Ctx{"service": serviceName, "error": err}) @@ -107,6 +107,9 @@ func (c *cmdDaemon) Run(cmd *cobra.Command, args []string) error { updated = true s.Services[serviceName] = newService.Services[serviceName] + } else if s.Services[serviceName] != nil { + delete(s.Services, serviceName) + updated = true } } diff --git a/microcloud/service/interface.go b/microcloud/service/interface.go index 3c418f744..94f441de3 100644 --- a/microcloud/service/interface.go +++ b/microcloud/service/interface.go @@ -1,15 +1,17 @@ package service import ( + "context" + "github.com/canonical/microcloud/microcloud/api/types" ) // Service represents a common interface for all MicroCloud services. type Service interface { - Bootstrap() error + Bootstrap(ctx context.Context) error IssueToken(peer string) (string, error) - Join(config JoinConfig) error - ClusterMembers() (map[string]string, error) + Join(ctx context.Context, config JoinConfig) error + ClusterMembers(ctx context.Context) (map[string]string, error) Type() types.ServiceType Name() string diff --git a/microcloud/service/lxd.go b/microcloud/service/lxd.go index 58f461412..c22fb079b 100644 --- a/microcloud/service/lxd.go +++ b/microcloud/service/lxd.go @@ -46,8 +46,9 @@ func NewLXDService(ctx context.Context, name string, addr string, cloudDir strin }, nil } -// client returns a client to the LXD unix socket. -func (s LXDService) client(secret string) (lxd.InstanceServer, error) { +// Client returns a client to the LXD unix socket. 
+// The secret should be specified when the request is going to be forwarded to a remote address, such as with UseTarget. +func (s LXDService) Client(secret string) (lxd.InstanceServer, error) { c, err := s.m.LocalClient() if err != nil { return nil, err @@ -96,8 +97,8 @@ func (s LXDService) remoteClient(secret string, address string, port int) (lxd.I } // Bootstrap bootstraps the LXD daemon on the default port. -func (s LXDService) Bootstrap() error { - client, err := s.client("") +func (s LXDService) Bootstrap(ctx context.Context) error { + client, err := s.Client("") if err != nil { return err } @@ -147,12 +148,30 @@ func (s LXDService) Bootstrap() error { return fmt.Errorf("Failed to initialize cluster: %w", err) } - return nil + ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + for { + select { + case <-ctx.Done(): + return fmt.Errorf("Timed out waiting for LXD cluster to initialize") + default: + names, err := client.GetClusterMemberNames() + if err != nil { + return err + } + + if len(names) > 0 { + return nil + } + + time.Sleep(time.Second) + } + } } // Join joins a cluster with the given token. -func (s LXDService) Join(joinConfig JoinConfig) error { - err := s.Restart(30) +func (s LXDService) Join(ctx context.Context, joinConfig JoinConfig) error { + err := s.Restart(ctx, 30) if err != nil { return err } @@ -163,7 +182,7 @@ func (s LXDService) Join(joinConfig JoinConfig) error { } config.Cluster.MemberConfig = joinConfig.LXDConfig - client, err := s.client("") + client, err := s.Client("") if err != nil { return err } @@ -183,7 +202,7 @@ func (s LXDService) Join(joinConfig JoinConfig) error { // IssueToken issues a token for the given peer. func (s LXDService) IssueToken(peer string) (string, error) { - client, err := s.client("") + client, err := s.Client("") if err != nil { return "", err } @@ -203,8 +222,8 @@ func (s LXDService) IssueToken(peer string) (string, error) { } // ClusterMembers returns a map of cluster member names and addresses. -func (s LXDService) ClusterMembers() (map[string]string, error) { - client, err := s.client("") +func (s LXDService) ClusterMembers(ctx context.Context) (map[string]string, error) { + client, err := s.Client("") if err != nil { return nil, err } @@ -242,100 +261,12 @@ func (s LXDService) Port() int { return s.port } -// AddLocalPool adds local zfs storage pool on the target peers, with the given source disks. -func (s *LXDService) AddLocalPool(source string, wipe bool) error { - c, err := s.client("") - if err != nil { - return err - } - - config := map[string]string{"source": source} - if wipe { - config["source.wipe"] = "true" - } - - return c.CreateStoragePool(api.StoragePoolsPost{ - Name: "local", - Driver: "zfs", - StoragePoolPut: api.StoragePoolPut{ - Config: config, - Description: "Local storage on ZFS", - }, - }) -} - -// AddLocalVolumes creates the default local storage volumes for a new LXD service. 
-func (s *LXDService) AddLocalVolumes(target string, secret string) error { - c, err := s.client(secret) - if err != nil { - return err - } - - if s.Name() != target { - c = c.UseTarget(target) - } - - err = c.CreateStoragePoolVolume("local", api.StorageVolumesPost{Name: "images", Type: "custom"}) - if err != nil { - return err - } - - err = c.CreateStoragePoolVolume("local", api.StorageVolumesPost{Name: "backups", Type: "custom"}) - if err != nil { - return err - } - - server, _, err := c.GetServer() - if err != nil { - return err - } - - newServer := server.Writable() - newServer.Config["storage.backups_volume"] = "local/backups" - newServer.Config["storage.images_volume"] = "local/images" - err = c.UpdateServer(newServer, "") - if err != nil { - return err - } - - return nil -} - -// AddRemotePools adds pending Ceph storage pools for each of the target peers. -func (s *LXDService) AddRemotePools(targets map[string]string) error { - if len(targets) == 0 { - return nil - } - - for target, secret := range targets { - c, err := s.client(secret) - if err != nil { - return err - } - - err = c.UseTarget(target).CreateStoragePool(api.StoragePoolsPost{ - Name: "remote", - Driver: "ceph", - StoragePoolPut: api.StoragePoolPut{ - Config: map[string]string{ - "source": "lxd_remote", - }, - }, - }) - if err != nil { - return err - } - } - - return nil -} - // HasExtension checks if the server supports the API extension. func (s *LXDService) HasExtension(target string, address string, secret string, apiExtension string) (bool, error) { var err error var client lxd.InstanceServer if s.Name() == target { - client, err = s.client(secret) + client, err = s.Client(secret) if err != nil { return false, err } @@ -356,7 +287,7 @@ func (s *LXDService) GetResources(target string, address string, secret string) var err error var client lxd.InstanceServer if s.Name() == target { - client, err = s.client(secret) + client, err = s.Client(secret) if err != nil { return nil, err } @@ -371,11 +302,11 @@ func (s *LXDService) GetResources(target string, address string, secret string) } // GetUplinkInterfaces returns a map of peer name to slice of api.Network that may be used with OVN. -func (s LXDService) GetUplinkInterfaces(bootstrap bool, peers map[string]mdns.ServerInfo) (map[string][]api.Network, error) { +func (s LXDService) GetUplinkInterfaces(bootstrap bool, peers []mdns.ServerInfo) (map[string][]api.Network, error) { clients := map[string]lxd.InstanceServer{} networks := map[string][]api.Network{} if bootstrap { - client, err := s.client("") + client, err := s.Client("") if err != nil { return nil, err } @@ -389,6 +320,11 @@ func (s LXDService) GetUplinkInterfaces(bootstrap bool, peers map[string]mdns.Se } for _, info := range peers { + // Don't include a local interface unless we are bootstrapping, in which case we shouldn't use the remote client. + if info.Name == s.Name() { + continue + } + client, err := s.remoteClient(info.AuthSecret, info.Address, CloudPort) if err != nil { return nil, err @@ -454,239 +390,6 @@ func (s LXDService) GetUplinkInterfaces(bootstrap bool, peers map[string]mdns.Se return candidates, nil } -// SetupNetwork configures LXD to use the OVN network uplink or to use a fan overlay if this is not available. 
-func (s LXDService) SetupNetwork(uplinkNetworks map[string]string, networkConfig map[string]string) error { - client, err := s.client("") - if err != nil { - return err - } - - if uplinkNetworks[s.Name()] == "" { - err = client.UseTarget(s.Name()).CreateNetwork(api.NetworksPost{Name: "lxdfan0", Type: "bridge"}) - if err != nil { - return err - } - - // Setup networking. - underlay, _, err := defaultGatewaySubnetV4() - if err != nil { - return fmt.Errorf("Couldn't determine Fan overlay subnet: %w", err) - } - - underlaySize, _ := underlay.Mask.Size() - if underlaySize != 16 && underlaySize != 24 { - // Override to /16 as that will almost always lead to working Fan network. - underlay.Mask = net.CIDRMask(16, 32) - underlay.IP = underlay.IP.Mask(underlay.Mask) - } - - network := api.NetworksPost{ - NetworkPut: api.NetworkPut{ - Config: map[string]string{ - "bridge.mode": "fan", - "fan.underlay_subnet": underlay.String(), - }, - Description: "Default Ubuntu fan powered bridge", - }, - Name: "lxdfan0", - Type: "bridge", - } - - err = client.CreateNetwork(network) - if err != nil { - return err - } - } else { - err = client.UseTarget(s.Name()).CreateNetwork(api.NetworksPost{ - NetworkPut: api.NetworkPut{Config: map[string]string{"parent": uplinkNetworks[s.Name()]}}, - Name: "UPLINK", - Type: "physical", - }) - if err != nil { - return err - } - - network := api.NetworkPut{Config: map[string]string{}, Description: "Uplink for OVN networks"} - for gateway, ipRange := range networkConfig { - ip, _, err := net.ParseCIDR(gateway) - if err != nil { - return err - } - - if ip.To4() != nil { - network.Config["ipv4.gateway"] = gateway - network.Config["ipv4.ovn.ranges"] = ipRange - } else { - network.Config["ipv6.gateway"] = gateway - } - } - - err = client.CreateNetwork(api.NetworksPost{ - NetworkPut: network, - Name: "UPLINK", - Type: "physical", - }) - if err != nil { - return err - } - - err = client.CreateNetwork(api.NetworksPost{ - NetworkPut: api.NetworkPut{Config: map[string]string{"network": "UPLINK"}, Description: "Default OVN network"}, - Name: "default", - Type: "ovn", - }) - if err != nil { - return err - } - } - - return nil -} - -// Configure sets up the LXD storage pool (either remote ceph or local zfs), and adds the root and network devices to -// the default profile. 
-func (s *LXDService) Configure(bootstrap bool, localPoolTargets map[string]string, remotePoolTargets map[string]string, ovnConfig string, networkTargets map[string]string, uplinkNetworks map[string]string, networkConfig map[string]string) error { - c, err := s.client("") - if err != nil { - return err - } - - for peer, secret := range localPoolTargets { - err = s.AddLocalVolumes(peer, secret) - if err != nil { - return err - } - } - - if bootstrap { - err = s.SetConfig(s.Name(), "", map[string]string{"network.ovn.northbound_connection": ovnConfig}) - if err != nil { - return err - } - - for peer, secret := range networkTargets { - err = s.SetConfig(peer, secret, map[string]string{"network.ovn.northbound_connection": ovnConfig}) - if err != nil { - return err - } - - if uplinkNetworks[peer] != "" { - client, err := s.client(secret) - if err != nil { - return err - } - - err = client.UseTarget(peer).CreateNetwork(api.NetworksPost{ - NetworkPut: api.NetworkPut{Config: map[string]string{"parent": uplinkNetworks[peer]}}, - Name: "UPLINK", - Type: "physical", - }) - if err != nil { - return err - } - } else { - client, err := s.client(secret) - if err != nil { - return err - } - - err = client.UseTarget(peer).CreateNetwork(api.NetworksPost{Name: "lxdfan0", Type: "bridge"}) - if err != nil { - return err - } - } - } - - err = s.SetupNetwork(uplinkNetworks, networkConfig) - if err != nil { - return err - } - - profile := api.ProfilesPost{ProfilePut: api.ProfilePut{Devices: map[string]map[string]string{}}, Name: "default"} - if uplinkNetworks[s.Name()] != "" { - profile.Devices["eth0"] = map[string]string{"name": "eth0", "network": "default", "type": "nic"} - } else { - profile.Devices["eth0"] = map[string]string{"name": "eth0", "network": "lxdfan0", "type": "nic"} - } - - if len(localPoolTargets) > 0 { - err = s.AddLocalVolumes(s.Name(), "") - if err != nil { - return err - } - - profile.Devices["root"] = map[string]string{"path": "/", "pool": "local", "type": "disk"} - } - - err = s.AddRemotePools(remotePoolTargets) - if err != nil { - return err - } - - if len(remotePoolTargets) > 0 { - storage := api.StoragePoolsPost{ - Name: "remote", - Driver: "ceph", - StoragePoolPut: api.StoragePoolPut{ - Config: map[string]string{ - "ceph.rbd.du": "false", - "ceph.rbd.features": "layering,striping,exclusive-lock,object-map,fast-diff,deep-flatten", - }, - Description: "Distributed storage on Ceph", - }, - } - - err = c.CreateStoragePool(storage) - if err != nil { - return err - } - - profile.Devices["root"] = map[string]string{"path": "/", "pool": "remote", "type": "disk"} - } - - profiles, err := c.GetProfileNames() - if err != nil { - return err - } - - if !shared.StringInSlice(profile.Name, profiles) { - err = c.CreateProfile(profile) - } else { - err = c.UpdateProfile(profile.Name, profile.ProfilePut, "") - } - - if err != nil { - return err - } - } - - return nil -} - -// SetConfig applies the new config key/value pair to the given target. -func (s *LXDService) SetConfig(target string, secret string, config map[string]string) error { - c, err := s.client(secret) - if err != nil { - return err - } - - if s.Name() != target { - c = c.UseTarget(target) - } - - server, _, err := c.GetServer() - if err != nil { - return err - } - - newServer := server.Writable() - for k, v := range config { - newServer.Config[k] = v - } - - return c.UpdateServer(newServer, "") -} - // isInitialized checks if LXD is initialized by fetching the storage pools. // If none exist, that means LXD has not yet been set up. 
 // isInitialized checks if LXD is initialized by fetching the storage pools.
 // If none exist, that means LXD has not yet been set up.
 func (s *LXDService) isInitialized(c lxd.InstanceServer) (bool, error) {
@@ -699,8 +402,8 @@ func (s *LXDService) isInitialized(c lxd.InstanceServer) (bool, error) {
 }
 
 // Restart requests LXD to shutdown, then waits until it is ready.
-func (s *LXDService) Restart(timeoutSeconds int) error {
-	c, err := s.client("")
+func (s *LXDService) Restart(ctx context.Context, timeoutSeconds int) error {
+	c, err := s.Client("")
 	if err != nil {
 		return err
 	}
@@ -715,11 +418,11 @@ func (s *LXDService) Restart(timeoutSeconds int) error {
 	}
 
 	_, _, err = c.RawQuery("PUT", "/internal/shutdown", nil, "")
-	if err != nil {
+	if err != nil && err.Error() != "Shutdown already in progress" {
 		return fmt.Errorf("Failed to send shutdown request to LXD: %w", err)
 	}
 
-	err = s.waitReady(c, timeoutSeconds)
+	err = s.waitReady(ctx, c, timeoutSeconds)
 	if err != nil {
 		return err
 	}
@@ -734,7 +437,7 @@ func (s *LXDService) Restart(timeoutSeconds int) error {
 }
 
 // waitReady repeatedly (500ms intervals) asks LXD if it is ready, up to the given timeout.
-func (s *LXDService) waitReady(c lxd.InstanceServer, timeoutSeconds int) error {
+func (s *LXDService) waitReady(ctx context.Context, c lxd.InstanceServer, timeoutSeconds int) error {
 	finger := make(chan error, 1)
 	var errLast error
 	go func() {
@@ -769,11 +472,14 @@ func (s *LXDService) waitReady(c lxd.InstanceServer, timeoutSeconds int) error {
 		}
 	}()
 
+	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer cancel()
+
 	if timeoutSeconds > 0 {
 		select {
 		case <-finger:
 			break
-		case <-time.After(time.Second * time.Duration(timeoutSeconds)):
+		case <-ctx.Done():
 			return fmt.Errorf("LXD is still not running after %ds timeout (%v)", timeoutSeconds, errLast)
 		}
 	} else {
diff --git a/microcloud/service/lxd_config.go b/microcloud/service/lxd_config.go
new file mode 100644
index 000000000..0f4d013f7
--- /dev/null
+++ b/microcloud/service/lxd_config.go
@@ -0,0 +1,180 @@
+package service
+
+import (
+	"fmt"
+	"net"
+	"strconv"
+
+	"github.com/canonical/lxd/shared/api"
+)
+
+// DefaultPendingFanNetwork returns the default Ubuntu Fan network configuration when
+// creating a pending network on a specific cluster member target.
+func (s LXDService) DefaultPendingFanNetwork() api.NetworksPost {
+	return api.NetworksPost{Name: "lxdfan0", Type: "bridge"}
+}
+
+// DefaultFanNetwork returns the default Ubuntu Fan network configuration when
+// creating the finalized network.
+func (s LXDService) DefaultFanNetwork() (api.NetworksPost, error) {
+	underlay, _, err := defaultGatewaySubnetV4()
+	if err != nil {
+		return api.NetworksPost{}, fmt.Errorf("Could not determine Fan overlay subnet: %w", err)
+	}
+
+	underlaySize, _ := underlay.Mask.Size()
+	if underlaySize != 16 && underlaySize != 24 {
+		// Override to /16 as that will almost always lead to a working Fan network.
+		underlay.Mask = net.CIDRMask(16, 32)
+		underlay.IP = underlay.IP.Mask(underlay.Mask)
+	}
+
+	return api.NetworksPost{
+		NetworkPut: api.NetworkPut{
+			Config: map[string]string{
+				"bridge.mode":         "fan",
+				"fan.underlay_subnet": underlay.String(),
+			},
+			Description: "Default Ubuntu fan powered bridge",
+		},
+		Name: "lxdfan0",
+		Type: "bridge",
+	}, nil
+}
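DefaultPendingFanNetwork and DefaultFanNetwork split network creation into LXD's two clustered phases: a pending, per-target definition on every member, then one finalized definition carrying the computed underlay. A usage sketch; the import path and the surrounding wiring are assumptions, since the real call sites live in the reworked cmd/microcloud code:

package example

import (
	lxd "github.com/canonical/lxd/client"

	"github.com/canonical/microcloud/microcloud/service" // assumed module path
)

// createFanNetwork sketches the two-phase flow against a connected client.
func createFanNetwork(client lxd.InstanceServer, members []string, s service.LXDService) error {
	for _, member := range members {
		// Phase 1: register the pending network on each cluster member.
		err := client.UseTarget(member).CreateNetwork(s.DefaultPendingFanNetwork())
		if err != nil {
			return err
		}
	}

	// Phase 2: create the cluster-wide network with the Fan underlay config.
	network, err := s.DefaultFanNetwork()
	if err != nil {
		return err
	}

	return client.CreateNetwork(network)
}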
+// DefaultPendingOVNNetwork returns the default OVN uplink network configuration when
+// creating a pending network on a specific cluster member target.
+func (s LXDService) DefaultPendingOVNNetwork(parent string) api.NetworksPost {
+	return api.NetworksPost{
+		NetworkPut: api.NetworkPut{Config: map[string]string{"parent": parent}},
+		Name:       "UPLINK",
+		Type:       "physical",
+	}
+}
+
+// DefaultOVNNetworkJoinConfig returns the default OVN uplink network configuration when
+// joining an existing cluster.
+func (s LXDService) DefaultOVNNetworkJoinConfig(parent string) api.ClusterMemberConfigKey {
+	return api.ClusterMemberConfigKey{
+		Entity: "network",
+		Name:   "UPLINK",
+		Key:    "parent",
+		Value:  parent,
+	}
+}
+
+// DefaultOVNNetwork returns the default OVN network configuration when
+// creating the finalized network.
+// Returns both the finalized uplink configuration as the first argument,
+// and the default OVN network configuration as the second argument.
+func (s LXDService) DefaultOVNNetwork(ipv4Gateway string, ipv4Range string, ipv6Gateway string) (api.NetworksPost, api.NetworksPost) {
+	finalUplinkCfg := api.NetworksPost{
+		NetworkPut: api.NetworkPut{
+			Config:      map[string]string{},
+			Description: "Uplink for OVN networks",
+		},
+		Name: "UPLINK",
+		Type: "physical",
+	}
+
+	if ipv4Gateway != "" && ipv4Range != "" {
+		finalUplinkCfg.Config["ipv4.gateway"] = ipv4Gateway
+		finalUplinkCfg.Config["ipv4.ovn.ranges"] = ipv4Range
+	}
+
+	if ipv6Gateway != "" {
+		finalUplinkCfg.Config["ipv6.gateway"] = ipv6Gateway
+	}
+
+	ovnNetwork := api.NetworksPost{
+		NetworkPut: api.NetworkPut{Config: map[string]string{"network": "UPLINK"}, Description: "Default OVN network"},
+		Name:       "default",
+		Type:       "ovn",
+	}
+
+	return finalUplinkCfg, ovnNetwork
+}
+
+// DefaultPendingZFSStoragePool returns the default local storage configuration when
+// creating a pending pool on a specific cluster member target.
+func (s LXDService) DefaultPendingZFSStoragePool(wipe bool, path string) api.StoragePoolsPost {
+	return api.StoragePoolsPost{
+		Name:   "local",
+		Driver: "zfs",
+		StoragePoolPut: api.StoragePoolPut{
+			Config:      map[string]string{"source": path, "source.wipe": strconv.FormatBool(wipe)},
+			Description: "Local storage on ZFS",
+		},
+	}
+}
+
+// DefaultZFSStoragePool returns the default local storage configuration when
+// creating the finalized pool.
+func (s LXDService) DefaultZFSStoragePool() api.StoragePoolsPost {
+	return api.StoragePoolsPost{Name: "local", Driver: "zfs"}
+}
+
+// DefaultZFSStoragePoolJoinConfig returns the default local storage configuration when
+// joining an existing cluster.
+func (s LXDService) DefaultZFSStoragePoolJoinConfig(wipe bool, path string) []api.ClusterMemberConfigKey {
+	wipeDisk := api.ClusterMemberConfigKey{
+		Entity: "storage-pool",
+		Name:   "local",
+		Key:    "source.wipe",
+		Value:  "true",
+	}
+
+	sourceTemplate := api.ClusterMemberConfigKey{
+		Entity: "storage-pool",
+		Name:   "local",
+		Key:    "source",
+	}
+
+	sourceTemplate.Value = path
+	joinConfig := []api.ClusterMemberConfigKey{sourceTemplate}
+	if wipe {
+		joinConfig = append(joinConfig, wipeDisk)
+	}
+
+	return joinConfig
+}
+
+// DefaultPendingCephStoragePool returns the default remote storage configuration when
+// creating a pending pool on a specific cluster member target.
+func (s LXDService) DefaultPendingCephStoragePool() api.StoragePoolsPost {
+	return api.StoragePoolsPost{
+		Name:   "remote",
+		Driver: "ceph",
+		StoragePoolPut: api.StoragePoolPut{
+			Config: map[string]string{
+				"source": "lxd_remote",
+			},
+		},
+	}
+}
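The pending/finalized pairs above serve the bootstrap path; the JoinConfig variants emit api.ClusterMemberConfigKey entries that LXD applies while a new member joins. A sketch of what the ZFS helper produces (the disk path and import path are hypothetical; the zero-value receiver works here because these helpers read no service state):

package main

import (
	"fmt"

	"github.com/canonical/microcloud/microcloud/service" // assumed module path
)

func main() {
	var s service.LXDService

	// Join config for a new member contributing a wiped local disk.
	for _, key := range s.DefaultZFSStoragePoolJoinConfig(true, "/dev/sdb") {
		fmt.Printf("%s %q: %s=%s\n", key.Entity, key.Name, key.Key, key.Value)
	}
	// storage-pool "local": source=/dev/sdb
	// storage-pool "local": source.wipe=true
}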
+// DefaultCephStoragePool returns the default remote storage configuration when
+// creating the finalized pool.
+func (s LXDService) DefaultCephStoragePool() api.StoragePoolsPost {
+	return api.StoragePoolsPost{
+		Name:   "remote",
+		Driver: "ceph",
+		StoragePoolPut: api.StoragePoolPut{
+			Config: map[string]string{
+				"ceph.rbd.du":       "false",
+				"ceph.rbd.features": "layering,striping,exclusive-lock,object-map,fast-diff,deep-flatten",
+			},
+			Description: "Distributed storage on Ceph",
+		},
+	}
+}
+
+// DefaultCephStoragePoolJoinConfig returns the default remote storage configuration when
+// joining an existing cluster.
+func (s LXDService) DefaultCephStoragePoolJoinConfig() api.ClusterMemberConfigKey {
+	return api.ClusterMemberConfigKey{
+		Entity: "storage-pool",
+		Name:   "remote",
+		Key:    "source",
+		Value:  "lxd_remote",
+	}
+}
diff --git a/microcloud/service/microceph.go b/microcloud/service/microceph.go
index 21817e3eb..51d43a566 100644
--- a/microcloud/service/microceph.go
+++ b/microcloud/service/microceph.go
@@ -2,6 +2,7 @@ package service
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"net/url"
 	"strings"
@@ -75,8 +76,31 @@ func (s CephService) Client(target string, secret string) (*client.Client, error
 }
 
 // Bootstrap bootstraps the MicroCeph daemon on the default port.
-func (s CephService) Bootstrap() error {
-	return s.m.NewCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), 2*time.Minute)
+func (s CephService) Bootstrap(ctx context.Context) error {
+	err := s.m.NewCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), 2*time.Minute)
+	if err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer cancel()
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("Timed out waiting for MicroCeph cluster to initialize")
+		default:
+			names, err := s.ClusterMembers(ctx)
+			if err != nil {
+				return err
+			}
+
+			if len(names) > 0 {
+				return nil
+			}
+
+			time.Sleep(time.Second)
+		}
+	}
 }
 
 // IssueToken issues a token for the given peer.
@@ -85,7 +109,7 @@ func (s CephService) IssueToken(peer string) (string, error) {
 }
 
 // Join joins a cluster with the given token.
-func (s CephService) Join(joinConfig JoinConfig) error {
+func (s CephService) Join(ctx context.Context, joinConfig JoinConfig) error {
 	err := s.m.JoinCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), joinConfig.Token, 5*time.Minute)
 	if err != nil {
 		return err
@@ -97,7 +121,7 @@ func (s CephService) Join(ctx context.Context, joinConfig JoinConfig) error {
 	}
 
 	for _, disk := range joinConfig.CephConfig {
-		err := cephClient.AddDisk(context.Background(), c, &disk)
+		err := cephClient.AddDisk(ctx, c, &disk)
 		if err != nil {
 			return err
 		}
@@ -107,13 +131,13 @@ func (s CephService) Join(ctx context.Context, joinConfig JoinConfig) error {
 }
 
 // ClusterMembers returns a map of cluster member names and addresses.
-func (s CephService) ClusterMembers() (map[string]string, error) {
+func (s CephService) ClusterMembers(ctx context.Context) (map[string]string, error) {
 	client, err := s.Client("", "")
 	if err != nil {
 		return nil, err
 	}
 
-	members, err := client.GetClusterMembers(context.Background())
+	members, err := client.GetClusterMembers(ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/microcloud/service/microcloud.go b/microcloud/service/microcloud.go
index 63f6e02a9..6a0715e88 100644
--- a/microcloud/service/microcloud.go
+++ b/microcloud/service/microcloud.go
@@ -69,8 +69,31 @@ func (s *CloudService) StartCloud(service *Handler, endpoints []rest.Endpoint) e
 }
 
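MicroCeph above, and MicroCloud and MicroOVN below, all gain the same post-bootstrap loop: poll ClusterMembers until at least one member is reported or a one-minute context deadline fires. The shared shape, pulled out into a standalone sketch with a simulated members function:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForMembers polls until the members function reports a non-empty
// cluster or the one-minute deadline expires, mirroring the loops added
// to the three Bootstrap methods.
func waitForMembers(ctx context.Context, members func(context.Context) (map[string]string, error)) error {
	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("Timed out waiting for the cluster to initialize")
		default:
			names, err := members(ctx)
			if err != nil {
				return err
			}

			if len(names) > 0 {
				return nil
			}

			time.Sleep(time.Second)
		}
	}
}

func main() {
	calls := 0
	err := waitForMembers(context.Background(), func(ctx context.Context) (map[string]string, error) {
		calls++
		if calls < 3 {
			return map[string]string{}, nil // not clustered yet
		}

		return map[string]string{"member01": "10.0.0.1:7443"}, nil
	})

	fmt.Println(err) // <nil>, after roughly two seconds
}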
 // Bootstrap bootstraps the MicroCloud daemon on the default port.
-func (s CloudService) Bootstrap() error {
-	return s.client.NewCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), 2*time.Minute)
+func (s CloudService) Bootstrap(ctx context.Context) error {
+	err := s.client.NewCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), 2*time.Minute)
+	if err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer cancel()
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("Timed out waiting for MicroCloud cluster to initialize")
+		default:
+			names, err := s.ClusterMembers(ctx)
+			if err != nil {
+				return err
+			}
+
+			if len(names) > 0 {
+				return nil
+			}
+
+			time.Sleep(time.Second)
+		}
+	}
 }
 
 // IssueToken issues a token for the given peer.
@@ -79,7 +102,7 @@ func (s CloudService) IssueToken(peer string) (string, error) {
 }
 
 // Join joins a cluster with the given token.
-func (s CloudService) Join(joinConfig JoinConfig) error {
+func (s CloudService) Join(ctx context.Context, joinConfig JoinConfig) error {
 	return s.client.JoinCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), joinConfig.Token, 5*time.Minute)
 }
 
@@ -117,13 +140,13 @@ func (s CloudService) RequestJoin(ctx context.Context, secrets map[string]string
 }
 
 // ClusterMembers returns a map of cluster member names and addresses.
-func (s CloudService) ClusterMembers() (map[string]string, error) {
+func (s CloudService) ClusterMembers(ctx context.Context) (map[string]string, error) {
 	client, err := s.client.LocalClient()
 	if err != nil {
 		return nil, err
 	}
 
-	members, err := client.GetClusterMembers(context.Background())
+	members, err := client.GetClusterMembers(ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/microcloud/service/microovn.go b/microcloud/service/microovn.go
index 658fbe664..80d08f65b 100644
--- a/microcloud/service/microovn.go
+++ b/microcloud/service/microovn.go
@@ -2,6 +2,7 @@ package service
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"net/url"
 	"strings"
@@ -53,8 +54,31 @@ func (s OVNService) Client() (*client.Client, error) {
 }
 
 // Bootstrap bootstraps the MicroOVN daemon on the default port.
-func (s OVNService) Bootstrap() error {
-	return s.m.NewCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), 2*time.Minute)
+func (s OVNService) Bootstrap(ctx context.Context) error {
+	err := s.m.NewCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), 2*time.Minute)
+	if err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer cancel()
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("Timed out waiting for MicroOVN cluster to initialize")
+		default:
+			names, err := s.ClusterMembers(ctx)
+			if err != nil {
+				return err
+			}
+
+			if len(names) > 0 {
+				return nil
+			}
+
+			time.Sleep(time.Second)
+		}
+	}
 }
 
 // IssueToken issues a token for the given peer.
@@ -63,18 +87,18 @@ func (s OVNService) IssueToken(peer string) (string, error) {
 }
 
 // Join joins a cluster with the given token.
-func (s OVNService) Join(joinConfig JoinConfig) error {
+func (s OVNService) Join(ctx context.Context, joinConfig JoinConfig) error {
 	return s.m.JoinCluster(s.name, util.CanonicalNetworkAddress(s.address, s.port), joinConfig.Token, 5*time.Minute)
 }
 
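Threading the caller's context through Join, here and in the MicroCeph and LXD changes earlier, means one deadline can bound an entire join sequence and cancelling the daemon's context aborts whichever call is in flight. An illustrative model, with invented names rather than the patch's own types:

package main

import (
	"context"
	"fmt"
	"time"
)

// joiner mirrors the new Join signatures: the context comes from the caller
// instead of each service creating context.Background() internally.
type joiner interface {
	Join(ctx context.Context, token string) error
}

type fakeService struct{ name string }

func (f fakeService) Join(ctx context.Context, token string) error {
	select {
	case <-time.After(100 * time.Millisecond): // simulated join work
		return nil
	case <-ctx.Done():
		return fmt.Errorf("%s: %w", f.name, ctx.Err())
	}
}

func joinAll(ctx context.Context, services []joiner, token string) error {
	// One deadline for the whole sequence; shutdown cancels mid-join.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	for _, s := range services {
		err := s.Join(ctx, token)
		if err != nil {
			return err
		}
	}

	return nil
}

func main() {
	fmt.Println(joinAll(context.Background(), []joiner{fakeService{"microceph"}, fakeService{"microovn"}}, "token"))
}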
 // ClusterMembers returns a map of cluster member names and addresses.
-func (s OVNService) ClusterMembers() (map[string]string, error) {
+func (s OVNService) ClusterMembers(ctx context.Context) (map[string]string, error) {
 	client, err := s.Client()
 	if err != nil {
 		return nil, err
 	}
 
-	members, err := client.GetClusterMembers(context.Background())
+	members, err := client.GetClusterMembers(ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/microcloud/service/service_handler.go b/microcloud/service/service_handler.go
index d04ad85d6..1ef31c542 100644
--- a/microcloud/service/service_handler.go
+++ b/microcloud/service/service_handler.go
@@ -84,7 +84,7 @@ func (s *Handler) Start(state *state.State) error {
 		return nil
 	}
 
-	err := s.Services[types.LXD].(*LXDService).Restart(30)
+	err := s.Services[types.LXD].(*LXDService).Restart(state.Context, 30)
 	if err != nil {
 		logger.Error("Failed to restart LXD", logger.Ctx{"error": err})
 	}
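With state.Context now flowing into Restart, waitReady races its readiness probe against both the timeout and daemon shutdown. A reduced, runnable model of that select (the probe here just sleeps; the real one polls LXD at 500ms intervals):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitReady resolves when the probe finishes or the context expires,
// whichever comes first; cancellation and timeout share one code path.
func waitReady(ctx context.Context, probe func() error, timeout time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- probe() }()

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return fmt.Errorf("Service is still not ready after %s (%w)", timeout, ctx.Err())
	}
}

func main() {
	err := waitReady(context.Background(), func() error {
		time.Sleep(200 * time.Millisecond) // simulated readiness probe
		return nil
	}, 5*time.Second)

	fmt.Println(err) // <nil>
}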