Files
portainer/pkg/libstack/swarm/swarm.go

906 lines
26 KiB
Go

package swarm
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/portainer/portainer/api/filesystem"
"github.com/portainer/portainer/pkg/libstack"
"github.com/containerd/errdefs"
"github.com/distribution/reference"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/compose/convert"
composeloader "github.com/docker/cli/cli/compose/loader"
"github.com/docker/cli/cli/compose/schema"
composetypes "github.com/docker/cli/cli/compose/types"
configtypes "github.com/docker/cli/cli/config/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
dockerregistry "github.com/docker/docker/registry"
"github.com/rs/zerolog/log"
)
// Options holds connection and credential settings for swarm operations.
// It is embedded by DeployOptions and RemoveOptions.
type Options struct {
// ProjectName is the stack name; deployStack uses it as the namespace for stack resources.
ProjectName string
// Host is forwarded to libstack.DockerCliOptions when the Docker CLI is set up.
Host string
// Env holds KEY=VALUE entries that are added to the compose loader environment.
Env []string
// WorkingDir is the compose working directory; relative env_file paths are joined onto it.
WorkingDir string
// Registries holds registry credentials; they are passed to the Docker CLI and
// matched against service images to build per-service registry auth.
Registries []configtypes.AuthConfig
}
// DeployOptions extends Options with deployment-specific settings.
type DeployOptions struct {
Options
// RemoveOrphans removes services that exist in the stack but are absent from
// the incoming compose config before the new config is deployed.
RemoveOrphans bool
// PullImage controls how image digests are resolved on deploy:
// true - query the registry on both create and update (ResolveImageAlways)
// false - never query the registry; on update, the image already deployed is
// reused when the service's image label matches the requested image (ResolveImageNever)
PullImage bool
// ForceRecreate increments ForceUpdate on every existing service so that
// Docker re-schedules all tasks even when nothing in the spec has changed.
// When false, the existing ForceUpdate value is carried over unchanged.
ForceRecreate bool
}
// RemoveOptions extends Options with removal settings.
// It currently adds no fields of its own; Remove reads only Host and Registries from it.
type RemoveOptions struct {
Options
}
// Deployer is the interface for in-process Docker Swarm stack management.
// SwarmDeployer is the implementation provided by this file.
type Deployer interface {
// Deploy creates or updates the stack described by the compose files at filePaths.
Deploy(ctx context.Context, filePaths []string, options DeployOptions) error
// Remove deletes the resources belonging to the stack named projectName.
Remove(ctx context.Context, projectName string, options RemoveOptions) error
// Validate loads and parses the compose files and reports whether they are valid.
Validate(ctx context.Context, filePaths []string, options Options) error
// WaitForStatus blocks until the stack reaches status or waiting is aborted.
WaitForStatus(ctx context.Context, projectName string, options Options, status libstack.Status) libstack.WaitResult
}
// SwarmDeployer manages Swarm stacks by talking to the Docker API in-process.
// It carries no state of its own.
type SwarmDeployer struct{}

// NewSwarmDeployer returns a ready-to-use SwarmDeployer.
func NewSwarmDeployer() *SwarmDeployer {
	return new(SwarmDeployer)
}
// Deploy creates the stack described by the compose files at filePaths, or updates
// it when it already exists. The work is delegated to deployStack, which runs with
// a Docker CLI configured from options.Host and options.Registries.
func (d *SwarmDeployer) Deploy(ctx context.Context, filePaths []string, options DeployOptions) error {
	cliOptions := libstack.DockerCliOptions{
		Host:       options.Host,
		Registries: options.Registries,
	}

	run := func(cliCtx context.Context, dockerCLI *command.DockerCli) error {
		return deployStack(cliCtx, dockerCLI, filePaths, options)
	}

	return libstack.WithCli(ctx, cliOptions, run)
}
// Validate parses the compose file(s) at filePaths using options.WorkingDir and
// options.Env, and returns the load error if they are invalid. The context is unused
// because validation never contacts the Docker API.
func (d *SwarmDeployer) Validate(_ context.Context, filePaths []string, options Options) error {
	if _, err := getConfig(filePaths, options.WorkingDir, options.Env); err != nil {
		return err
	}

	return nil
}
// Remove deletes every service, secret, config and network labelled as belonging to
// the stack projectName, then waits for the stack's tasks to terminate.
//
// If nothing is found for the stack, it logs that fact and returns nil. Removal is
// attempted for every resource kind even when an earlier kind fails; all failures
// are returned together as one joined error, and the task wait is skipped.
func (d *SwarmDeployer) Remove(ctx context.Context, projectName string, options RemoveOptions) error {
	cliOptions := libstack.DockerCliOptions{Host: options.Host, Registries: options.Registries}

	return libstack.WithCli(ctx, cliOptions, func(ctx context.Context, dockerCLI *command.DockerCli) error {
		apiClient := dockerCLI.Client()

		// Collect everything that belongs to the stack before deleting anything.
		services, err := getStackServices(ctx, apiClient, projectName)
		if err != nil {
			return err
		}

		secrets, err := getStackSecrets(ctx, apiClient, projectName)
		if err != nil {
			return err
		}

		configs, err := getStackConfigs(ctx, apiClient, projectName)
		if err != nil {
			return err
		}

		networks, err := getStackNetworks(ctx, apiClient, projectName)
		if err != nil {
			return err
		}

		if len(services) == 0 && len(secrets) == 0 && len(configs) == 0 && len(networks) == 0 {
			log.Info().Str("stack", projectName).Msg("nothing found in stack")

			return nil
		}

		// Arguments are evaluated left to right, so the removal order is services,
		// secrets, configs, networks. errors.Join drops nil results and yields nil
		// when every step succeeded.
		removalErr := errors.Join(
			removeServices(ctx, apiClient, services),
			removeSecrets(ctx, apiClient, secrets),
			removeConfigs(ctx, apiClient, configs),
			removeNetworks(ctx, apiClient, networks),
		)
		if removalErr != nil {
			return removalErr
		}

		// Mirror `docker stack rm --detach=false`: only return once all tasks have
		// reached a terminal state.
		return waitOnTasks(ctx, apiClient, projectName)
	})
}
// deployStack holds the core stack deployment logic. It reimplements
// `docker stack deploy` in-process on top of the docker/cli compose loader and
// convert packages.
// Reference: https://github.com/docker/cli/blob/v28.5.2/cli/command/stack/swarm/deploy_composefile.go
//
// Steps, in order: verify the node is a swarm manager, load the compose config,
// optionally prune orphan services, validate external networks, create networks,
// secrets and configs, then create or update the services. The first failing step
// aborts the deployment and its error is returned.
func deployStack(ctx context.Context, dockerCLI *command.DockerCli, filePaths []string, options DeployOptions) error {
	apiClient := dockerCLI.Client()

	info, err := apiClient.Info(ctx)
	if err != nil {
		return fmt.Errorf("failed to get docker info: %w", err)
	}

	if !info.Swarm.ControlAvailable {
		return errors.New(`this node is not a swarm manager. Use "docker swarm init" or "docker swarm join" to connect this node to swarm and try again`)
	}

	config, err := getConfig(filePaths, options.WorkingDir, options.Env)
	if err != nil {
		return fmt.Errorf("failed to load compose file: %w", err)
	}

	namespace := convert.NewNamespace(options.ProjectName)

	// Orphans are pruned before anything is deployed to avoid name conflicts
	// during rolling updates.
	if options.RemoveOrphans {
		wanted := make(map[string]struct{}, len(config.Services))
		for i := range config.Services {
			wanted[config.Services[i].Name] = struct{}{}
		}

		if err := pruneServices(ctx, apiClient, namespace, wanted); err != nil {
			return err
		}
	}

	declaredNetworks := getServicesDeclaredNetworks(config.Services)
	networks, externalNetworks := convert.Networks(namespace, config.Networks, declaredNetworks)

	if err := validateExternalNetworks(ctx, apiClient, externalNetworks); err != nil {
		return err
	}

	if err := createNetworks(ctx, apiClient, namespace, networks); err != nil {
		return err
	}

	secrets, err := convert.Secrets(namespace, config.Secrets)
	if err != nil {
		return err
	}

	if err := createSecrets(ctx, apiClient, secrets); err != nil {
		return err
	}

	configs, err := convert.Configs(namespace, config.Configs)
	if err != nil {
		return err
	}

	if err := createConfigs(ctx, apiClient, configs); err != nil {
		return err
	}

	services, err := convert.Services(ctx, namespace, config, apiClient)
	if err != nil {
		return err
	}

	return deployServices(ctx, apiClient, options.Registries, services, namespace, options.PullImage, options.ForceRecreate)
}
// getStackFilter builds the label filter that selects resources tagged with the
// stack namespace label for the given namespace.
func getStackFilter(namespace string) filters.Args {
	stackLabel := filters.KeyValuePair{
		Key:   "label",
		Value: convert.LabelNamespace + "=" + namespace,
	}

	return filters.NewArgs(stackLabel)
}
// getStackServices lists the services labelled as belonging to the stack namespace.
func getStackServices(ctx context.Context, apiClient client.APIClient, namespace string) ([]swarm.Service, error) {
	listOptions := swarm.ServiceListOptions{Filters: getStackFilter(namespace)}

	return apiClient.ServiceList(ctx, listOptions)
}
// getStackNetworks lists the networks labelled as belonging to the stack namespace.
func getStackNetworks(ctx context.Context, apiClient client.APIClient, namespace string) ([]network.Summary, error) {
	listOptions := network.ListOptions{Filters: getStackFilter(namespace)}

	return apiClient.NetworkList(ctx, listOptions)
}
// getStackSecrets lists the secrets labelled as belonging to the stack namespace.
func getStackSecrets(ctx context.Context, apiClient client.APIClient, namespace string) ([]swarm.Secret, error) {
	listOptions := swarm.SecretListOptions{Filters: getStackFilter(namespace)}

	return apiClient.SecretList(ctx, listOptions)
}
// getStackConfigs lists the configs labelled as belonging to the stack namespace.
func getStackConfigs(ctx context.Context, apiClient client.APIClient, namespace string) ([]swarm.Config, error) {
	listOptions := swarm.ConfigListOptions{Filters: getStackFilter(namespace)}

	return apiClient.ConfigList(ctx, listOptions)
}
// getStackTasks lists the tasks labelled as belonging to the stack namespace.
func getStackTasks(ctx context.Context, apiClient client.APIClient, namespace string) ([]swarm.Task, error) {
	listOptions := swarm.TaskListOptions{Filters: getStackFilter(namespace)}

	return apiClient.TaskList(ctx, listOptions)
}
// getServicesDeclaredNetworks returns the set of network names referenced by the
// given services. A service that declares no networks contributes "default".
func getServicesDeclaredNetworks(services []composetypes.ServiceConfig) map[string]struct{} {
	declared := map[string]struct{}{}

	for i := range services {
		attached := services[i].Networks
		if len(attached) == 0 {
			declared["default"] = struct{}{}

			continue
		}

		for networkName := range attached {
			declared[networkName] = struct{}{}
		}
	}

	return declared
}
// validateExternalNetworks checks that every user-defined external network exists
// and has scope "swarm". It returns an error for the first network that is missing,
// cannot be inspected, or has a different scope.
func validateExternalNetworks(ctx context.Context, apiClient client.NetworkAPIClient, externalNetworks []string) error {
	for _, name := range externalNetworks {
		// Networks that are not user defined always exist on all nodes as
		// local-scoped networks, so there is nothing to inspect.
		if !container.NetworkMode(name).IsUserDefined() {
			continue
		}

		inspected, err := apiClient.NetworkInspect(ctx, name, network.InspectOptions{})
		if errdefs.IsNotFound(err) {
			return fmt.Errorf("network %q is declared as external, but could not be found. You need to create a swarm-scoped network before the stack is deployed", name)
		}

		if err != nil {
			return err
		}

		if inspected.Scope != "swarm" {
			return fmt.Errorf("network %q is declared as external, but it is not in the right scope: %q instead of \"swarm\"", name, inspected.Scope)
		}
	}

	return nil
}
// createNetworks creates each network in networks that is not already present among
// the stack's existing networks. A network without an explicit driver is created
// with the "overlay" driver. Existing networks are left untouched.
func createNetworks(
	ctx context.Context,
	apiClient client.APIClient,
	namespace convert.Namespace,
	networks map[string]network.CreateOptions,
) error {
	stackNetworks, err := getStackNetworks(ctx, apiClient, namespace.Name())
	if err != nil {
		return err
	}

	alreadyCreated := make(map[string]struct{}, len(stackNetworks))
	for i := range stackNetworks {
		alreadyCreated[stackNetworks[i].Name] = struct{}{}
	}

	for name, createOpts := range networks {
		if _, found := alreadyCreated[name]; found {
			continue
		}

		// createOpts is a per-iteration copy, so defaulting the driver here does
		// not modify the caller's map.
		if createOpts.Driver == "" {
			createOpts.Driver = "overlay"
		}

		log.Info().Str("network", name).Msg("creating network")

		_, err := apiClient.NetworkCreate(ctx, name, createOpts)
		if err != nil {
			return fmt.Errorf("failed to create network %s: %w", name, err)
		}
	}

	return nil
}
// createSecrets makes sure every secret spec exists in the swarm: a secret that is
// already present is updated with the new spec, a missing one is created. Any other
// inspect error is returned as-is and stops processing.
func createSecrets(ctx context.Context, apiClient client.APIClient, secrets []swarm.SecretSpec) error {
	for _, spec := range secrets {
		current, _, err := apiClient.SecretInspectWithRaw(ctx, spec.Name)
		if err != nil {
			if !errdefs.IsNotFound(err) {
				return err
			}

			log.Info().Str("secret", spec.Name).Msg("creating secret")

			if _, createErr := apiClient.SecretCreate(ctx, spec); createErr != nil {
				return fmt.Errorf("failed to create secret %s: %w", spec.Name, createErr)
			}

			continue
		}

		if updateErr := apiClient.SecretUpdate(ctx, current.ID, current.Version, spec); updateErr != nil {
			return fmt.Errorf("failed to update secret %s: %w", spec.Name, updateErr)
		}
	}

	return nil
}
// createConfigs makes sure every config spec exists in the swarm: a config that is
// already present is updated with the new spec, a missing one is created. Any other
// inspect error is returned as-is and stops processing.
func createConfigs(ctx context.Context, apiClient client.APIClient, configs []swarm.ConfigSpec) error {
	for _, spec := range configs {
		current, _, err := apiClient.ConfigInspectWithRaw(ctx, spec.Name)
		if err != nil {
			if !errdefs.IsNotFound(err) {
				return err
			}

			log.Info().Str("config", spec.Name).Msg("creating config")

			if _, createErr := apiClient.ConfigCreate(ctx, spec); createErr != nil {
				return fmt.Errorf("failed to create config %s: %w", spec.Name, createErr)
			}

			continue
		}

		if updateErr := apiClient.ConfigUpdate(ctx, current.ID, current.Version, spec); updateErr != nil {
			return fmt.Errorf("failed to update config %s: %w", spec.Name, updateErr)
		}
	}

	return nil
}
// encodeRegistryAuth looks up the credentials whose ServerAddress matches the
// registry domain of image and returns them in the encoded form produced by
// registrytypes.EncodeAuthConfig. Images on "docker.io" are matched against
// dockerregistry.IndexServer instead.
//
// It returns an empty string and a nil error when no registry matches, and an error
// when the image reference cannot be parsed or the credentials cannot be encoded.
// Only the first matching registry is considered.
func encodeRegistryAuth(image string, registries []configtypes.AuthConfig) (string, error) {
	named, err := reference.ParseNormalizedNamed(image)
	if err != nil {
		return "", fmt.Errorf("failed to parse image reference %q: %w", image, err)
	}

	registryDomain := reference.Domain(named)
	if registryDomain == "docker.io" {
		registryDomain = dockerregistry.IndexServer
	}

	for i := range registries {
		creds := registries[i]
		if creds.ServerAddress != registryDomain {
			continue
		}

		authConfig := registrytypes.AuthConfig{
			Username:      creds.Username,
			Password:      creds.Password,
			ServerAddress: creds.ServerAddress,
			Auth:          creds.Auth,
			IdentityToken: creds.IdentityToken,
			RegistryToken: creds.RegistryToken,
		}

		encoded, err := registrytypes.EncodeAuthConfig(authConfig)
		if err != nil {
			return "", fmt.Errorf("failed to encode auth for registry %s: %w", registryDomain, err)
		}

		return encoded, nil
	}

	return "", nil
}
// deployServices creates or updates one swarm service per entry in services.
//
// A service whose scoped name already exists in the stack is updated; any other is
// created. pullImage decides whether the registry is queried for the image digest.
// forceRecreate bumps ForceUpdate on existing services; otherwise the current value
// is kept. The first failure aborts the loop and is returned.
func deployServices(
	ctx context.Context,
	apiClient client.APIClient,
	registries []configtypes.AuthConfig,
	services map[string]swarm.ServiceSpec,
	namespace convert.Namespace,
	pullImage bool,
	forceRecreate bool,
) error {
	stackServices, err := getStackServices(ctx, apiClient, namespace.Name())
	if err != nil {
		return err
	}

	deployed := make(map[string]swarm.Service, len(stackServices))
	for _, current := range stackServices {
		deployed[current.Spec.Name] = current
	}

	for internalName, spec := range services {
		scopedName := namespace.Scope(internalName)
		image := spec.TaskTemplate.ContainerSpec.Image

		encodedAuth, err := encodeRegistryAuth(image, registries)
		if err != nil {
			return fmt.Errorf("failed to encode registry auth for image %s: %w", image, err)
		}

		current, found := deployed[scopedName]
		if !found {
			log.Info().Str("service", scopedName).Msg("creating service")

			createOpts := swarm.ServiceCreateOptions{
				EncodedRegistryAuth: encodedAuth,
				QueryRegistry:       pullImage,
			}

			if _, err := apiClient.ServiceCreate(ctx, spec, createOpts); err != nil {
				return fmt.Errorf("failed to create service %s: %w", scopedName, err)
			}

			continue
		}

		log.Info().Str("service", scopedName).Str("id", current.ID).Msg("updating service")

		// pullImage=true → ResolveImageAlways: the registry is queried on every
		// update so a redeploy can repull even when the tag is unchanged.
		updateOpts := swarm.ServiceUpdateOptions{
			EncodedRegistryAuth: encodedAuth,
			QueryRegistry:       pullImage,
		}

		// pullImage=false → ResolveImageNever: when the requested image matches the
		// image label on the existing service, keep the image already deployed.
		if !pullImage && image == current.Spec.Labels[convert.LabelImage] {
			spec.TaskTemplate.ContainerSpec.Image = current.Spec.TaskTemplate.ContainerSpec.Image
		}

		// Carrying ForceUpdate over unchanged keeps tasks from being re-deployed
		// when nothing else changed; bumping it forces a re-schedule.
		forceUpdate := current.Spec.TaskTemplate.ForceUpdate
		if forceRecreate {
			forceUpdate++
		}
		spec.TaskTemplate.ForceUpdate = forceUpdate

		response, err := apiClient.ServiceUpdate(ctx, current.ID, current.Version, spec, updateOpts)
		if err != nil {
			return fmt.Errorf("failed to update service %s: %w", scopedName, err)
		}

		for _, warning := range response.Warnings {
			log.Warn().Str("service", scopedName).Msg(warning)
		}
	}

	return nil
}
// pruneServices removes the stack's existing services whose descoped name is not a
// key of incoming. It must run before the new config is deployed to avoid name
// conflicts.
func pruneServices(
	ctx context.Context,
	apiClient client.APIClient,
	namespace convert.Namespace,
	incoming map[string]struct{},
) error {
	stackServices, err := getStackServices(ctx, apiClient, namespace.Name())
	if err != nil {
		return fmt.Errorf("failed to list services for pruning: %w", err)
	}

	orphans := make([]swarm.Service, 0, len(stackServices))
	for _, candidate := range stackServices {
		composeName := namespace.Descope(candidate.Spec.Name)
		if _, wanted := incoming[composeName]; wanted {
			continue
		}

		orphans = append(orphans, candidate)
	}

	if err := removeServices(ctx, apiClient, orphans); err != nil {
		return fmt.Errorf("failed to prune orphan services: %w", err)
	}

	return nil
}
// removeServices removes the given services in ascending name order. The slice is
// sorted in place. Removal continues past failures; all failures are returned as a
// single joined error, or nil when every removal succeeded.
func removeServices(ctx context.Context, apiClient client.APIClient, services []swarm.Service) error {
	byName := func(a, b int) bool {
		return services[a].Spec.Name < services[b].Spec.Name
	}
	sort.Slice(services, byName)

	var failures []error

	for i := range services {
		target := &services[i]

		log.Info().Str("service", target.Spec.Name).Msg("removing service")

		err := apiClient.ServiceRemove(ctx, target.ID)
		if err == nil {
			continue
		}

		failures = append(failures, fmt.Errorf("failed to remove service %s: %w", target.Spec.Name, err))
	}

	return errors.Join(failures...)
}
// removeNetworks removes each given network. Removal continues past failures; all
// failures are returned as a single joined error, or nil when none occurred.
func removeNetworks(ctx context.Context, apiClient client.APIClient, networks []network.Summary) error {
	var failures []error

	for i := range networks {
		target := &networks[i]

		log.Info().Str("network", target.Name).Msg("removing network")

		err := apiClient.NetworkRemove(ctx, target.ID)
		if err == nil {
			continue
		}

		failures = append(failures, fmt.Errorf("failed to remove network %s: %w", target.Name, err))
	}

	return errors.Join(failures...)
}
// removeSecrets removes each given secret. Removal continues past failures; all
// failures are returned as a single joined error, or nil when none occurred.
func removeSecrets(ctx context.Context, apiClient client.APIClient, secrets []swarm.Secret) error {
	var failures []error

	for i := range secrets {
		target := &secrets[i]

		log.Info().Str("secret", target.Spec.Name).Msg("removing secret")

		err := apiClient.SecretRemove(ctx, target.ID)
		if err == nil {
			continue
		}

		failures = append(failures, fmt.Errorf("failed to remove secret %s: %w", target.Spec.Name, err))
	}

	return errors.Join(failures...)
}
// removeConfigs removes each given config. Removal continues past failures; all
// failures are returned as a single joined error, or nil when none occurred.
func removeConfigs(ctx context.Context, apiClient client.APIClient, configs []swarm.Config) error {
	var failures []error

	for i := range configs {
		target := &configs[i]

		log.Info().Str("config", target.Spec.Name).Msg("removing config")

		err := apiClient.ConfigRemove(ctx, target.ID)
		if err == nil {
			continue
		}

		failures = append(failures, fmt.Errorf("failed to remove config %s: %w", target.Spec.Name, err))
	}

	return errors.Join(failures...)
}
// taskStateOrdinal assigns each known task state its position in the task lifecycle,
// starting at 1. It mirrors docker/cli's unexported numberedStates map
// (cli/command/stack/swarm/remove.go); the Docker SDK does not export terminal-state
// checking utilities, so the table is duplicated here. States missing from the table
// look up as 0.
var taskStateOrdinal = func() map[swarm.TaskState]int {
	// Listed in lifecycle order; the ordinal is the 1-based position in this list.
	lifecycle := []swarm.TaskState{
		swarm.TaskStateNew,
		swarm.TaskStateAllocated,
		swarm.TaskStatePending,
		swarm.TaskStateAssigned,
		swarm.TaskStateAccepted,
		swarm.TaskStatePreparing,
		swarm.TaskStateReady,
		swarm.TaskStateStarting,
		swarm.TaskStateRunning,
		swarm.TaskStateComplete,
		swarm.TaskStateShutdown,
		swarm.TaskStateFailed,
		swarm.TaskStateRejected,
	}

	ordinals := make(map[swarm.TaskState]int, len(lifecycle))
	for position, state := range lifecycle {
		ordinals[state] = position + 1
	}

	return ordinals
}()
// isTerminalState reports whether state comes after "running" in taskStateOrdinal.
// States that are not in the table have ordinal 0 and are therefore not terminal.
func isTerminalState(state swarm.TaskState) bool {
	runningOrdinal := taskStateOrdinal[swarm.TaskStateRunning]

	return taskStateOrdinal[state] > runningOrdinal
}
// getConfig reads, parses and validates the compose file(s) at filePaths and returns
// the merged configuration.
//
// Unsupported and deprecated compose properties are logged as warnings and do not
// fail the load. An error is returned when the files cannot be read or parsed, when
// they contain forbidden properties, or when a service has a missing or malformed
// image reference.
func getConfig(filePaths []string, workingDir string, env []string) (*composetypes.Config, error) {
	details, err := getConfigDetails(filePaths, workingDir, env)
	if err != nil {
		return nil, fmt.Errorf("failed to load compose file: %w", err)
	}

	// Keep the raw dictionaries around for the unsupported/deprecated property checks.
	rawConfigs := make([]map[string]any, len(details.ConfigFiles))
	for i := range details.ConfigFiles {
		rawConfigs[i] = details.ConfigFiles[i].Config
	}

	parsed, err := composeloader.Load(details)
	if err != nil {
		var forbidden *composeloader.ForbiddenPropertiesError
		if errors.As(err, &forbidden) {
			return nil, fmt.Errorf("compose file contains unsupported options: %v", forbidden.Properties)
		}

		return nil, fmt.Errorf("failed to parse compose file: %w", err)
	}

	unsupported := composeloader.GetUnsupportedProperties(rawConfigs...)
	if len(unsupported) > 0 {
		log.Warn().Strs("properties", unsupported).Msg("ignoring unsupported compose properties")
	}

	deprecated := composeloader.GetDeprecatedProperties(rawConfigs...)
	if len(deprecated) > 0 {
		log.Warn().Interface("properties", deprecated).Msg("ignoring deprecated compose properties")
	}

	for i := range parsed.Services {
		service := &parsed.Services[i]

		if service.Image == "" {
			return nil, fmt.Errorf("invalid image reference for service %s: no image specified", service.Name)
		}

		_, parseErr := reference.ParseAnyReference(service.Image)
		if parseErr != nil {
			return nil, fmt.Errorf("invalid image reference for service %s: %w", service.Name, parseErr)
		}
	}

	return parsed, nil
}
// getConfigDetails reads and YAML-parses every compose file in filePaths and
// assembles the loader input: working directory, parsed files, schema version and
// environment.
//
// The schema version is taken from the first file. The environment is filled from
// libstack.PortainerEnvVars() first and then from env, so entries in env win on
// duplicate keys. Entries with an empty key are skipped; an entry without "=" is
// stored with an empty value. At least one file path is required.
func getConfigDetails(filePaths []string, workingDir string, env []string) (composetypes.ConfigDetails, error) {
	var details composetypes.ConfigDetails

	if len(filePaths) == 0 {
		return details, errors.New("at least one compose file must be specified")
	}

	details.WorkingDir = workingDir
	details.ConfigFiles = make([]composetypes.ConfigFile, 0, len(filePaths))

	for _, composePath := range filePaths {
		content, err := os.ReadFile(composePath)
		if err != nil {
			return details, err
		}

		rawConfig, err := composeloader.ParseYAML(content)
		if err != nil {
			return details, err
		}

		resolveEnvFilePaths(rawConfig, workingDir)

		parsedFile := composetypes.ConfigFile{Filename: composePath, Config: rawConfig}
		details.ConfigFiles = append(details.ConfigFiles, parsedFile)
	}

	// Take the first file version (2 files can't have different version)
	firstFile := details.ConfigFiles[0]
	details.Version = schema.Version(firstFile.Config)

	environment := make(map[string]string)
	details.Environment = environment

	setEnv := func(entry string) {
		key, value, _ := strings.Cut(entry, "=")
		if key == "" {
			return
		}

		environment[key] = value
	}

	for _, entry := range libstack.PortainerEnvVars() {
		setEnv(entry)
	}

	for _, entry := range env {
		setEnv(entry)
	}

	return details, nil
}
// resolveEnvFilePaths rewrites, in place, every relative env_file entry of every
// service in rawConfig so that it is joined onto workingDir. Both the single-string
// and the list form of env_file are handled. Absolute paths, non-string list items
// and malformed service entries are left unchanged.
func resolveEnvFilePaths(rawConfig map[string]any, workingDir string) {
	services, isMap := rawConfig["services"].(map[string]any)
	if !isMap {
		return
	}

	for _, rawService := range services {
		service, isMap := rawService.(map[string]any)
		if !isMap {
			continue
		}

		// A missing env_file key yields nil, which matches neither case below.
		switch envFile := service["env_file"].(type) {
		case string:
			if filepath.IsAbs(envFile) {
				continue
			}

			service["env_file"] = filesystem.JoinPaths(workingDir, envFile)
		case []any:
			for idx := range envFile {
				entry, isString := envFile[idx].(string)
				if !isString || filepath.IsAbs(entry) {
					continue
				}

				envFile[idx] = filesystem.JoinPaths(workingDir, entry)
			}
		}
	}
}
// WaitForStatus blocks until the services of the stack projectName aggregate to the
// requested status, a service reports a failed task, or the caller's context is
// cancelled. The Docker API is polled roughly once per second; the first poll
// happens after the first interval has elapsed.
//
// The returned WaitResult carries the requested status. ErrorMsg is set when the
// context is cancelled or when a failed task reports an error message. If
// libstack.WithCli itself returns an error and no message has been recorded yet,
// Status is set to StatusError and ErrorMsg to the error text.
//
// Failures to list services or to read the status of a single service are logged
// and retried on the next poll rather than aborting the wait.
func (d *SwarmDeployer) WaitForStatus(
	ctx context.Context,
	projectName string,
	options Options,
	status libstack.Status,
) libstack.WaitResult {
	waitResult := libstack.WaitResult{Status: status}

	// WithCli replaces the context with Background internally, so we capture the
	// caller's context here to preserve cancellation.
	callerCtx := ctx

	err := libstack.WithCli(
		ctx,
		libstack.DockerCliOptions{Host: options.Host, Registries: options.Registries},
		func(_ context.Context, dockerCLI *command.DockerCli) error {
			apiClient := dockerCLI.Client()

			for {
				// Wait out the poll interval, but return immediately when the caller
				// cancels instead of sleeping through the cancellation.
				select {
				case <-callerCtx.Done():
					waitResult.ErrorMsg = "failed to wait for status: " + callerCtx.Err().Error()

					return nil
				case <-time.After(time.Second):
				}

				services, err := getStackServices(callerCtx, apiClient, projectName)
				if err != nil {
					log.Warn().Str("project_name", projectName).Err(err).Msg("failed to list stack services")

					continue
				}

				if len(services) == 0 && status == libstack.StatusRemoved {
					return nil
				}

				var serviceStatuses []libstack.Status

				for _, svc := range services {
					svcStatus, errorMessage, err := getServiceStatus(callerCtx, apiClient, svc)
					if err != nil {
						log.Warn().
							Str("project_name", projectName).
							Str("service_name", svc.Spec.Name).
							Err(err).
							Msg("failed to get service status")

						continue
					}

					// A failed task with an error message ends the wait at once.
					if errorMessage != "" {
						waitResult.ErrorMsg = errorMessage

						return nil
					}

					serviceStatuses = append(serviceStatuses, svcStatus)
				}

				// Aggregate once per poll and reuse the value for both the check
				// and the log line.
				current := aggregateStatus(serviceStatuses)
				if current == status {
					return nil
				}

				log.Debug().
					Str("project_name", projectName).
					Str("required_status", string(status)).
					Str("status", string(current)).
					Msg("waiting for status")
			}
		})
	if err != nil && waitResult.ErrorMsg == "" {
		waitResult.Status = libstack.StatusError
		waitResult.ErrorMsg = err.Error()
	}

	return waitResult
}
// aggregateStatus collapses per-service statuses into one stack status. An empty
// input yields StatusRemoved; otherwise the statuses are counted and the decision is
// delegated to libstack.AggregateStatusCounts.
func aggregateStatus(statuses []libstack.Status) libstack.Status {
	total := len(statuses)
	if total == 0 {
		return libstack.StatusRemoved
	}

	statusCounts := map[libstack.Status]int{}
	for i := range statuses {
		statusCounts[statuses[i]] += 1
	}

	log.Debug().Interface("statusCounts", statusCounts).Msg("check_status")

	return libstack.AggregateStatusCounts(statusCounts, total)
}
// getServiceStatus derives a libstack status for a single service from its tasks.
//
// It returns the status, the error text of a failed task (empty otherwise), and an
// error when the tasks or nodes cannot be listed.
//
// The expected number of running tasks is the replica count for replicated services
// and the number of nodes for global services. When that number is non-zero and
// matches the count of running tasks, the service is StatusRunning. Otherwise the
// status is taken from the first task in the returned list only:
// running → StatusRunning, pending/starting → StatusStarting, remove →
// StatusRemoving, failed → StatusError (with the task's error text), any other state
// → StatusUnknown. A service without tasks is StatusUnknown.
func getServiceStatus(
	ctx context.Context,
	apiClient client.APIClient,
	svc swarm.Service,
) (libstack.Status, string, error) {
	tasks, err := apiClient.TaskList(ctx, swarm.TaskListOptions{
		Filters: filters.NewArgs(filters.KeyValuePair{Key: "service", Value: svc.ID}),
	})
	if err != nil {
		return "", "", fmt.Errorf("failed to list tasks for service %s: %w", svc.Spec.Name, err)
	}

	expectedRunningTaskCount := 0

	// Replicas is a pointer and may be unset; guard it so a spec without an explicit
	// replica count cannot cause a nil dereference.
	if replicated := svc.Spec.Mode.Replicated; replicated != nil && replicated.Replicas != nil {
		expectedRunningTaskCount = int(*replicated.Replicas)
	}

	if svc.Spec.Mode.Global != nil {
		nodes, err := apiClient.NodeList(ctx, swarm.NodeListOptions{})
		if err != nil {
			return "", "", fmt.Errorf("failed to list nodes: %w", err)
		}

		expectedRunningTaskCount = len(nodes)
	}

	if expectedRunningTaskCount != 0 {
		runningTaskCount := 0

		for _, task := range tasks {
			if task.Status.State == swarm.TaskStateRunning {
				runningTaskCount++
			}
		}

		if runningTaskCount == expectedRunningTaskCount {
			return libstack.StatusRunning, "", nil
		}
	}

	if len(tasks) == 0 {
		return libstack.StatusUnknown, "", nil
	}

	// Only the first task decides the fallback status; this is written as an explicit
	// first-element check rather than a loop that always exits on its first pass.
	first := tasks[0]

	switch first.Status.State {
	case swarm.TaskStateRunning:
		return libstack.StatusRunning, "", nil
	case swarm.TaskStatePending, swarm.TaskStateStarting:
		return libstack.StatusStarting, "", nil
	case swarm.TaskStateRemove:
		return libstack.StatusRemoving, "", nil
	case swarm.TaskStateFailed:
		return libstack.StatusError, first.Status.Err, nil
	default:
		return libstack.StatusUnknown, "", nil
	}
}
// waitOnTasks polls, about once per second, until every task belonging to the
// namespace is in a terminal state or no tasks remain.
//
// It returns nil once that condition holds, the context's error when ctx is
// cancelled (including while waiting between polls), or a wrapped error when the
// tasks cannot be listed.
func waitOnTasks(ctx context.Context, apiClient client.APIClient, namespace string) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		tasks, err := getStackTasks(ctx, apiClient, namespace)
		if err != nil {
			return fmt.Errorf("failed to get tasks: %w", err)
		}

		// An empty task list leaves allTerminal true, so "no tasks" also ends the wait.
		allTerminal := true

		for _, task := range tasks {
			if !isTerminalState(task.Status.State) {
				allTerminal = false

				break
			}
		}

		if allTerminal {
			return nil
		}

		// Pause between polls, but wake up immediately on cancellation instead of
		// sleeping through it.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
}