335 lines
10 KiB
Go
335 lines
10 KiB
Go
package postinit
|
|
|
|
import (
	"cmp"
	"context"
	"errors"
	"fmt"
	"slices"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	portainer "github.com/portainer/portainer/api"
	"github.com/portainer/portainer/api/dataservices"
	dockerClient "github.com/portainer/portainer/api/docker/client"
	"github.com/portainer/portainer/api/internal/endpointutils"
	"github.com/portainer/portainer/api/internal/registryutils"
	"github.com/portainer/portainer/api/kubernetes/cli"
	"github.com/portainer/portainer/api/logs"
	"github.com/portainer/portainer/api/pendingactions/actions"
	"github.com/portainer/portainer/pkg/endpoints"

	"github.com/rs/zerolog/log"
)
|
|
|
|
type PostInitMigrator struct {
|
|
kubeFactory *cli.ClientFactory
|
|
dockerFactory *dockerClient.ClientFactory
|
|
dataStore dataservices.DataStore
|
|
assetsPath string
|
|
kubernetesDeployer portainer.KubernetesDeployer
|
|
}
|
|
|
|
func NewPostInitMigrator(
|
|
kubeFactory *cli.ClientFactory,
|
|
dockerFactory *dockerClient.ClientFactory,
|
|
dataStore dataservices.DataStore,
|
|
assetsPath string,
|
|
kubernetesDeployer portainer.KubernetesDeployer,
|
|
) *PostInitMigrator {
|
|
return &PostInitMigrator{
|
|
kubeFactory: kubeFactory,
|
|
dockerFactory: dockerFactory,
|
|
dataStore: dataStore,
|
|
assetsPath: assetsPath,
|
|
kubernetesDeployer: kubernetesDeployer,
|
|
}
|
|
}
|
|
|
|
// PostInitMigrate will run all post-init migrations, which require docker/kube clients for all edge or non-edge environments
|
|
func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
|
|
var environments []portainer.Endpoint
|
|
|
|
if err := postInitMigrator.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
|
var err error
|
|
if environments, err = tx.Endpoint().ReadAll(func(endpoint portainer.Endpoint) bool {
|
|
return endpoints.HasDirectConnectivity(&endpoint)
|
|
}); err != nil {
|
|
return fmt.Errorf("failed to retrieve environments: %w", err)
|
|
}
|
|
|
|
var pendingActions []portainer.PendingAction
|
|
if pendingActions, err = tx.PendingActions().ReadAll(func(action portainer.PendingAction) bool {
|
|
return action.Action == actions.PostInitMigrateEnvironment
|
|
}); err != nil {
|
|
return fmt.Errorf("failed to retrieve pending actions: %w", err)
|
|
}
|
|
|
|
// Sort for the binary search in createPostInitMigrationPendingAction()
|
|
slices.SortFunc(pendingActions, func(a, b portainer.PendingAction) int {
|
|
return cmp.Compare(a.EndpointID, b.EndpointID)
|
|
})
|
|
|
|
for _, environment := range environments {
|
|
if !endpoints.IsEdgeEndpoint(&environment) {
|
|
continue
|
|
}
|
|
|
|
// Edge environments will run after the server starts, in pending actions
|
|
log.Info().
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("adding pending action 'PostInitMigrateEnvironment' for environment")
|
|
|
|
if err := postInitMigrator.createPostInitMigrationPendingAction(tx, environment.ID, pendingActions); err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("error creating pending action for environment")
|
|
}
|
|
}
|
|
|
|
return err
|
|
}); err != nil {
|
|
log.Error().Err(err).Msg("error running post-init migrations")
|
|
|
|
return err
|
|
}
|
|
|
|
for _, environment := range environments {
|
|
if endpoints.IsEdgeEndpoint(&environment) {
|
|
continue
|
|
}
|
|
|
|
// Non-edge environments will run before the server starts.
|
|
if err := postInitMigrator.MigrateEnvironment(&environment); err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("error running post-init migrations for non-edge environment")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// try to create a post init migration pending action. If it already exists, do nothing
|
|
// this function exists for readability, not reusability
|
|
// pending actions must be passed in ascending order by endpoint ID
|
|
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(tx dataservices.DataStoreTx, environmentID portainer.EndpointID, pendingActions []portainer.PendingAction) error {
|
|
action := portainer.PendingAction{
|
|
EndpointID: environmentID,
|
|
Action: actions.PostInitMigrateEnvironment,
|
|
}
|
|
|
|
if _, found := slices.BinarySearchFunc(pendingActions, environmentID, func(e portainer.PendingAction, id portainer.EndpointID) int {
|
|
return cmp.Compare(e.EndpointID, id)
|
|
}); found {
|
|
log.Debug().
|
|
Str("action", action.Action).
|
|
Int("endpoint_id", int(action.EndpointID)).
|
|
Msg("pending action already exists for environment, skipping...")
|
|
|
|
return nil
|
|
}
|
|
|
|
return tx.PendingActions().Create(&action)
|
|
}
|
|
|
|
// MigrateEnvironment runs migrations on a single environment
|
|
func (migrator *PostInitMigrator) MigrateEnvironment(environment *portainer.Endpoint) error {
|
|
log.Info().
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("executing post init migration for environment")
|
|
|
|
switch {
|
|
case endpointutils.IsKubernetesEndpoint(environment):
|
|
// get the kubeclient for the environment, and skip all kube migrations if there's an error
|
|
kubeclient, err := migrator.kubeFactory.GetPrivilegedKubeClient(environment)
|
|
if err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("error creating kubeclient for environment")
|
|
|
|
return err
|
|
}
|
|
|
|
// If one environment fails, it is logged and the next migration runs. The error is returned at the end and handled by pending actions
|
|
var latestErr error
|
|
kubernetesMigrations := []func() error{
|
|
func() error { return migrator.MigrateIngresses(*environment, kubeclient) },
|
|
func() error { return migrator.MigrateRegistrySASecrets(*environment, kubeclient) },
|
|
}
|
|
|
|
for _, migration := range kubernetesMigrations {
|
|
if err := migration(); err != nil {
|
|
latestErr = err
|
|
}
|
|
}
|
|
|
|
return latestErr
|
|
case endpointutils.IsDockerEndpoint(environment):
|
|
// get the docker client for the environment, and skip all docker migrations if there's an error
|
|
dockerClient, err := migrator.dockerFactory.CreateClient(environment, "", nil)
|
|
if err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("error creating docker client for environment")
|
|
|
|
return err
|
|
}
|
|
defer logs.CloseAndLogErr(dockerClient)
|
|
|
|
if err := migrator.MigrateGPUs(*environment, dockerClient); err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("error migrating GPUs for environment")
|
|
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (migrator *PostInitMigrator) MigrateRegistrySASecrets(environment portainer.Endpoint, kubeclient *cli.KubeClient) error {
|
|
if !environment.PostInitMigrations.MigrateRegistrySASecrets {
|
|
return nil
|
|
}
|
|
|
|
log.Debug().
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("migrating registry SA secrets for environment")
|
|
|
|
return migrator.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
|
env, err := tx.Endpoint().Endpoint(environment.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !env.PostInitMigrations.MigrateRegistrySASecrets {
|
|
return nil
|
|
}
|
|
|
|
registries, err := tx.Registry().ReadAll()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, registry := range registries {
|
|
access, ok := registry.RegistryAccesses[env.ID]
|
|
if !ok || len(access.Namespaces) == 0 {
|
|
continue
|
|
}
|
|
|
|
secretName := registryutils.RegistrySecretName(registry.ID)
|
|
for _, namespace := range access.Namespaces {
|
|
if err := kubeclient.AddImagePullSecretToServiceAccount(namespace, "default", secretName); err != nil {
|
|
log.Warn().
|
|
Err(err).
|
|
Int("endpoint_id", int(env.ID)).
|
|
Str("namespace", namespace).
|
|
Str("secret", secretName).
|
|
Msg("failed to add imagePullSecret to service account during registry SA secret migration")
|
|
}
|
|
}
|
|
}
|
|
|
|
env.PostInitMigrations.MigrateRegistrySASecrets = false
|
|
return tx.Endpoint().UpdateEndpoint(env.ID, env)
|
|
})
|
|
}
|
|
|
|
func (migrator *PostInitMigrator) MigrateIngresses(environment portainer.Endpoint, kubeclient *cli.KubeClient) error {
|
|
// Early exit if we do not need to migrate!
|
|
if !environment.PostInitMigrations.MigrateIngresses {
|
|
return nil
|
|
}
|
|
|
|
log.Debug().
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("migrating ingresses for environment")
|
|
|
|
if err := migrator.kubeFactory.MigrateEndpointIngresses(&environment, migrator.dataStore, kubeclient); err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("error migrating ingresses for environment")
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// MigrateGPUs will check all docker endpoints for containers with GPUs and set EnableGPUManagement to true if any are found
|
|
// If there's an error getting the containers, we'll log it and move on
|
|
func (migrator *PostInitMigrator) MigrateGPUs(e portainer.Endpoint, dockerClient *client.Client) error {
|
|
return migrator.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
|
environment, err := tx.Endpoint().Endpoint(e.ID)
|
|
if err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(e.ID)).
|
|
Msg("error getting environment")
|
|
|
|
return err
|
|
}
|
|
|
|
// Early exit if we do not need to migrate!
|
|
if !environment.PostInitMigrations.MigrateGPUs {
|
|
return nil
|
|
}
|
|
|
|
log.Debug().
|
|
Int("endpoint_id", int(e.ID)).
|
|
Msg("migrating GPUs for environment")
|
|
|
|
// Get all containers
|
|
containers, err := dockerClient.ContainerList(context.Background(), container.ListOptions{All: true})
|
|
if err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("failed to list containers for environment")
|
|
|
|
return err
|
|
}
|
|
|
|
// Check for a gpu on each container. If even one GPU is found, set EnableGPUManagement to true for the whole environment
|
|
containersLoop:
|
|
for _, container := range containers {
|
|
// https://www.sobyte.net/post/2022-10/go-docker/ has nice documentation on the docker client with GPUs
|
|
containerDetails, err := dockerClient.ContainerInspect(context.Background(), container.ID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("failed to inspect container")
|
|
|
|
continue
|
|
}
|
|
|
|
deviceRequests := containerDetails.HostConfig.DeviceRequests
|
|
for _, deviceRequest := range deviceRequests {
|
|
if deviceRequest.Driver == "nvidia" {
|
|
environment.EnableGPUManagement = true
|
|
|
|
break containersLoop
|
|
}
|
|
}
|
|
}
|
|
|
|
// Set the MigrateGPUs flag to false so we don't run this again
|
|
environment.PostInitMigrations.MigrateGPUs = false
|
|
if err := tx.Endpoint().UpdateEndpoint(environment.ID, environment); err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Int("endpoint_id", int(environment.ID)).
|
|
Msg("error updating EnableGPUManagement flag for environment")
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|