Пайплайны и базы данных в Kubernetes: как сделать развёртывание умным

Если вам нужно запустить несколько batch-задач по цепочке или дождаться, пока внешняя система создаст нужный Secret, обычный YAML не поможет. Статья-перевод разбирает реальные сценарии и показывает, как описать их через код с конкретными примерами.

Обложка: Пайплайны и базы данных в Kubernetes: как сделать развёртывание умным

От переводчика: в статье разбирается, почему современные инструменты развёртывания (Helm, Kustomize, Argo CD) не справляются с оркестрацией сложных приложений, где важны порядок действий и взаимозависимости компонентов. В ней будут конкретные примеры проблем и способы их решения с полным рабочим кодом. Статья является переводом материала Дэвида Демаре-Мишо (David Desmarais-Michaud) «Kubernetes Orchestration is More Than a Bag of YAML».

Эту статью будет проще читать, если вы знакомы с основами Yoke и Air Traffic Controller.

Если вкратце, Yoke позволяет определять пакеты как распространяемые программы, скомпилированные в WASM, которые называются Flights («рейсы»). Air Traffic Controller («диспетчер воздушного движения») расширяет API Kubernetes через Airways («воздушные трассы») — специальный ресурс, который создаёт CRD и связывает его с конкретным Flight.

Этот материал как раз о том, как Air Traffic Controller открывает простор для мощной и гибкой оркестрации ресурсов.

За пределами плоского YAML: как выглядит реальная оркестрация в Kubernetes

Если вам доводилось управлять современными сложными приложениями в Kubernetes, то наверняка знакомо чувство тревоги, когда ждёшь, пока все ресурсы перейдут в healthy-статус. Например, база данных должна быть готова до запуска приложения, серия пакетных задач (batch jobs) должна выполниться в строгой последовательности, а сервис может зависеть от секрета, которым управляет совершенно другая система. Как описать эти взаимоотношения? Сегодня, по большому счёту, никак.

Годами Helm и Kustomize были основными инструментами для управления Kubernetes-приложениями. Но они работают как генераторы манифестов: вы описываете список YAML-файлов, отправляете их в API и надеетесь, что всё сработает. Для простых stateless-приложений это нормально, но когда нужны порядок выполнения, координация между компонентами или управление состоянием — такой подход не годится.

Вот тут и встаёт вопрос оркестрации: способности осмысленно управлять жизненным циклом компонентов приложения, а не просто создавать их, надеясь на удачу.

Пробел в оркестрации

Причина этого пробела кроется в истории. Инструменты вроде Helm и Kustomize — это, по сути, шаблонизаторы на стороне клиента. Они генерируют манифесты, и на этом их работа заканчивается. Даже серверные GitOps-решения вроде Argo CD или FluxCD придерживаются той же концепции, ведь они преимущественно используют те же инструменты для генерации манифестов, которые потом синхронизируют за вас.

Конечно, есть обходные пути. Вы наверняка сталкивались с хуками pre/post-install в Helm или волнами синхронизации (sync waves) в Argo CD. И хотя это полезные фичи, проблема решается лишь частично. Они срабатывают только при установке или обновлении, но не работают на протяжении всей жизни приложения.

Пара слов о YAML и итоговой согласованности

Мы осознаем, что предложение отойти от использования YAML воспринимается неоднозначно. Для многих в сообществе «голые» манифесты — единственный «правильный» способ взаимодействия с API Kubernetes. Такая привязанность привела к тому, что для решения сложных проблем с зависимостями все стали активно полагаться на один базовый паттерн — итоговую согласованность (eventual consistency).

Идея в том, чтобы выкатить всё разом и просто надеяться, что рано или поздно контроллеры всё приведут к желаемому состоянию, зависимости подтянутся и кластер сам… заведётся. Все мы видели это на практике: под уходит в CrashLoopBackOff и висит в нём, пока какая-нибудь другая система наконец не создаст нужный ему ConfigMap или Secret.

Сразу поясним: итоговая согласованность — это фундаментальная часть Kubernetes и очень мощная идея. Проблема в том, что мы слишком долго использовали её как единственный инструмент для управления комплексными воркфлоу. Нам приходилось на неё полагаться, потому что имеющиеся инструменты не давали иного выбора: что имеем, тем и пользуемся. Наши YAML-ориентированные утилиты могли описать лишь то, что мы хотим получить в итоге, но не то, как к этому прийти.

Дилемма оператора

Классический ответ на запрос о создании умных и гибких стратегий развёртывания всегда был один: «Пишите свой оператор».

И хотя создание оператора — мощный и правильный подход, это ещё и масштабная задача. Здесь и высокий порог входа, и постоянные расходы на разработку и поддержку, которые далеко не всегда оправданы. Неужели нет золотой середины?

Новая модель: логика приложения как код

Вот тут-то на сцену и выходят Yoke и Air Traffic Controller (ATC) с новым образом мышления. Вместо того чтобы генерировать статичный набор YAML-файлов, Yoke описывает пакеты в виде исполняемого кода.

Это позволяет сфокусироваться на логике оркестрации приложения, написав простую [WASM] программу, работающую по знакомой схеме:

  1. Прочитать входные параметры из кастомного ресурса.
  2. Принять решения, основываясь на текущем состоянии кластера.
  3. Обновить статус кастомного ресурса, указав прогресс или другую информацию.
  4. Создать те ресурсы, которые должны быть в кластере прямо сейчас.

И да, это очень похоже на классический цикл согласования (reconciliation loop) в контроллере Kubernetes — так и задумано.

ATC — это контроллер, чья единственная работа — приводить ресурсы в кластере к желаемому состоянию. WASM-модуль представляет собой ядро логики и выступает посредником для цикла согласования, управляя желаемым состоянием (ресурсами) пакетов и избавляя вас от необходимости создавать и поддерживать собственный оператор с нуля.

Как это работает на практике

По своей сути, «Flight» в Yoke — это просто программа (скомпилированная в WASM; аналог Helm-чартов), которая считывает кастомный ресурс (Custom Resource) из stdin и записывает желаемое состояние системы в stdout.

Но поскольку ATC перезапускает её в рамках цикла управления (control loop), она становится динамической и реагирующей. Код выполняется каждый раз, когда:

  • ресурс, которым он управляет (например, Job или Deployment), обновляется или удаляется;
  • внешний ресурс, который он отслеживает (например, Secret от другого инструмента), создаётся, обновляется или удаляется. 

Так код реагирует на изменения в кластере в реальном времени.

Примеры

Пример 1: пайплайн для последовательного запуска задач

Предположим, вам нужно создать ресурс Pipeline для запуска трёх задач друг за другом: job-one, job-two, job-three. При этом каждая следующая задача должна стартовать только после успешного завершения предыдущей.

С Yoke эту логику можно описать прямо в коде.

Сначала определим Go-тип для кастомного ресурса Pipeline:

			type Pipeline struct {
  metav1.TypeMeta
  metav1.ObjectMeta `json:"metadata"`
  Spec              struct {
    // ... Опишите свою спецификацию!  ...
  } `json:"spec,omitzero"`
  Status struct {
    // Поля статуса.
    // В нашем примере будем использовать простое сообщение msg, но оно может быть любым.
    Msg string `json:"msg"`
  }
}
		

Затем добавим соответствующее определение Airway, которое указывает ATC, как им управлять:

			v1alpha1.Airway{
  TypeMeta: metav1.TypeMeta{
    APIVersion: v1alpha1.AirwayGVR().GroupVersion().Identifier(),
    Kind:       v1alpha1.KindAirway,
  },
  ObjectMeta: metav1.ObjectMeta{
    Name: "pipelines.examples.com",
  },
  Spec: v1alpha1.AirwaySpec{
    WasmURLs: v1alpha1.WasmURLs{
      // URL, где будет храниться WASM-модуль.
      Flight: "oci://registry/repo:tag",
    },
    // Разрешаем доступ к кластеру, чтобы получать состояние.
    ClusterAccess: true,
    // Airway должен быть динамическим, чтобы перепроверять состояние при обновлении/создании субресурсов.
    Mode: v1alpha1.AirwayModeDynamic,
    Template: apiextensionsv1.CustomResourceDefinitionSpec{
      Group: "examples.com",
      Names: apiextensionsv1.CustomResourceDefinitionNames{
        Plural:   "pipelines",
        Singular: "pipeline",
        Kind:     "Pipeline",
      },
      Scope: apiextensionsv1.NamespaceScoped,
      Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
        {
          Name:    "v1",
          Served:  true,
          Storage: true,
          Schema: &apiextensionsv1.CustomResourceValidation{
            // Пишем openAPI-определение на основе нашего CustomResource.
            OpenAPIV3Schema: openapi.SchemaFrom(reflect.TypeFor[Pipeline]()),
          },
        },
      },
    },
  },
}
		

Основная магия содержится в WASM-модуле. Рассмотрим её пошагово:

			// run содержит основную логику оркестрации для пайплайна.
func run() error {
    var pipeline Pipeline
    if err := yaml.NewYAMLToJSONDecoder(os.Stdin).Decode(&pipeline); err != nil {
        return fmt.Errorf("failed to decode stdin into pipeline: %w", err)
    }

    // Слайс resources будет хранить желаемое состояние приложения.
    // Добавляем сюда и сам пайплайн, чтобы можно было обновлять его статус.
    resources := flight.Resources{&pipeline}

    for _, job := range []*batchv1.Job{
        {
            TypeMeta:   metav1.TypeMeta{APIVersion: batchv1.SchemeGroupVersion.Identifier(), Kind: "Job"},
            ObjectMeta: metav1.ObjectMeta{Name: pipeline.Name + "-one"},
            Spec:       batchv1.JobSpec{ /* ... здесь должна быть спецификация Job ... */ },
        },
        {
            TypeMeta:   metav1.TypeMeta{APIVersion: batchv1.SchemeGroupVersion.Identifier(), Kind: "Job"},
            ObjectMeta: metav1.ObjectMeta{Name: pipeline.Name + "-two"},
            Spec:       batchv1.JobSpec{ /* ... здесь должна быть спецификация Job ... */ },
        },
        {
            TypeMeta:   metav1.TypeMeta{APIVersion: batchv1.SchemeGroupVersion.Identifier(), Kind: "Job"},
            ObjectMeta: metav1.ObjectMeta{Name: pipeline.Name + "-three"},
            Spec:       batchv1.JobSpec{ /* ... здесь должна быть спецификация Job ... */ },
        },
    } {
        // Добавляем текущую задачу в желаемое состояние пакета.
        resources = append(resources, job)

        // Проверяем, завершилась ли текущая задача, посмотрев её актуальное состояние в кластере.
        // Если нет — выводим текущее состояние в stdout и выходим.
        // ATC перезапустит проверку, когда статус Job изменится.
        ok, err := hasJobCompleted(&pipeline, job)
        if err != nil {
            return fmt.Errorf("failed to check job completion for %s: %w", job.Name, err)
        }
        if !ok {
            // Задача не выполнена или завершилась ошибкой; статус уже установлен в hasJobCompleted.
            // Выходим здесь и ждём следующей реконсиляции.
            return json.NewEncoder(os.Stdout).Encode(resources)
        }
    }

    // Если удалось пройти весь цикл, значит, все задачи выполнены.
    pipeline.Status.Msg = "all jobs have completed"
    return json.NewEncoder(os.Stdout).Encode(resources)
}

// isJobStatus — это вспомогательный инструмент для проверки наличия определённого типа условий в статусе задачи.
func isJobStatus(job *batchv1.Job, typ batchv1.JobConditionType) bool {
    return job != nil && slices.ContainsFunc(job.Status.Conditions, func(condition batchv1.JobCondition) bool {
        return condition.Type == typ
    })
}

// hasJobCompleted проверяет текущее состояние задачи в кластере.
func hasJobCompleted(pipeline *Pipeline, job *batchv1.Job) (ok bool, err error) {
    // k8s.LookupResource извлекает текущее состояние ресурса из API Kubernetes.
    live, err := k8s.LookupResource(job)
    if err != nil && !k8s.IsErrNotFound(err) {
        return false, fmt.Errorf("failed to lookup job %s: %w", job.Name, err)
    }

    // Сначала проверяем на сбой.
    if isJobStatus(live, batchv1.JobFailed) {
        pipeline.Status.Msg = fmt.Sprintf("job %s failed", job.Name)
        return false, nil // 'ok' is false, indicating we should stop.
    }

    // Проверяем выполнение.
    if !isJobStatus(live, batchv1.JobComplete) {
        pipeline.Status.Msg = fmt.Sprintf("waiting for job %s to complete", job.Name)
        return false, nil // 'ok' is false, not complete yet.
    }

    // Задача выполнена!
    return true, nil
}
		

Эта простая Go-программа как раз и реализует логику оркестрации. Она создаёт задачу, проверяет её статус и движется дальше, только когда та завершится. А цикл управления от ATC берёт на себя всю рутину по «ожиданию» и «перезапускам».

Пример 2: координация с внешними ресурсами

Рассмотрим другой распространённый сценарий: приложению требуется база данных. Вы используете Crossplane для инициализации CloudSQLInstance, который в итоге создаёт Secret, содержащий данные для подключения. Deployment не запустится до тех пор, пока этот Secret не будет существовать.

Сегодня вы, скорее всего, просто выкатываете всё сразу и ждёте, когда появится секрет (под приложения находится в состоянии CrashLoopBackOff). Но можно поступить умнее.

Давайте опишем ресурс App, который будет управлять этим процессом:

			type App struct {
  metav1.TypeMeta
  metav1.ObjectMeta `json:"metadata"`
  Spec              struct {
    // Параметры вашего ресурса
  } `json:"spec"`
  Status struct {
    // В этом примере для статуса будем использовать простое сообщение.
    // Но тут может быть что угодно!
    Msg string `json:"msg"`
  } `json:"status,omitzero"`
}
		

Airway нужно будет настроить так, чтобы он имел доступ к кластеру, а если точнее — чтобы мог искать ресурсы типа Secret.

			v1alpha1.Airway{
  TypeMeta: metav1.TypeMeta{
    APIVersion: v1alpha1.AirwayGVR().GroupVersion().Identifier(),
    Kind:       v1alpha1.KindAirway,
  },
  ObjectMeta: metav1.ObjectMeta{
    Name: "pipelines.examples.com",
  },
  Spec: v1alpha1.AirwaySpec{
    WasmURLs: v1alpha1.WasmURLs{
      // URL, где будет храниться WASM-модуль.
      Flight: "oci://registry/repo:tag",
    },
    // Разрешаем доступ к кластеру, чтобы получать состояние.
    ClusterAccess: true,
    // Явно даём разрешение на поиск Secret'ов.
    // Любая попытка найти ресурс, не указанный здесь, будет заблокирована.
    ResourceAccessMatchers: []string{"Secret"},
    // Airway должен быть динамическим, чтобы перепроверять состояние при обновлении/создании субресурсов..
    Mode: v1alpha1.AirwayModeDynamic,
    Template: apiextensionsv1.CustomResourceDefinitionSpec{
      Group: "examples.com",
      Names: apiextensionsv1.CustomResourceDefinitionNames{
        Plural:   "apps",
        Singular: "app",
        Kind:     "App",
      },
      Scope: apiextensionsv1.NamespaceScoped,
      Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
        {
          Name:    "v1",
          Served:  true,
          Storage: true,
          Schema: &apiextensionsv1.CustomResourceValidation{
            // Пишем openAPI-определение на основе нашего CustomResource.
            OpenAPIV3Schema: openapi.SchemaFrom(reflect.TypeFor[App]()),
          },
        },
      },
    },
  },
}
		

Логика в нашем WASM-модуле будет такой:

			// run содержит ядро логики оркестрации для приложения.
func run() error {
    var app App
    if err := yaml.NewYAMLToJSONDecoder(os.Stdin).Decode(&app); err != nil {
        return fmt.Errorf("failed to decode stdin into App instance: %v", err)
    }

    // Включаем само приложение в конечный результат, чтобы установить его статус.
    resources := flight.Resources{&app}

    // 1. Задаём инстанс базы данных с помощью провайдера Crossplane.
    database := databasev1beta1.CloudSQLInstance{
        TypeMeta: metav1.TypeMeta{
            APIVersion: databasev1beta1.SchemeGroupVersion.Identifier(),
            Kind:       "CloudSQLInstance",
        },
        ObjectMeta: metav1.ObjectMeta{Name: app.Name},
        Spec: databasev1beta1.CloudSQLInstanceSpec{
            ForProvider: databasev1beta1.CloudSQLInstanceParameters{ /* ... ваши параметры ... */ },
            ResourceSpec: commonv1.ResourceSpec{
                WriteConnectionSecretToReference: &commonv1.SecretReference{
                    Name:      app.Name,
                    Namespace: app.Namespace,
                },
            },
        },
    }
    resources = append(resources, &database)

    // 2. Ищем секрет, который создаст Crossplane.
    secretIdentifier := k8s.ResourceIdentifier{
        Name:       database.Spec.WriteConnectionSecretToReference.Name,
        Namespace:  database.Spec.WriteConnectionSecretToReference.Namespace,
        ApiVersion: "v1",
        Kind:       "Secret",
    }

    secret, err := k8s.Lookup[corev1.Secret](secretIdentifier)
    if err != nil {
        if k8s.IsErrNotFound(err) {
            // Секрета ещё не существует. Обновим статус и подождём.
            // ATC автоматически перезапустит код, когда секрет будет создан.
            app.Status.Msg = "Waiting for connection secret to be created"
            return json.NewEncoder(os.Stdout).Encode(resources)
        }
        return fmt.Errorf("failed to fetch connection secret: %w", err)
    }

    // 3. Секрет существует! Теперь можно создать развёртывание, которое его использует.
    deployment := &appsv1.Deployment{
        TypeMeta: metav1.TypeMeta{
            APIVersion: appsv1.SchemeGroupVersion.Identifier(),
            Kind:       "Deployment",
        },
        ObjectMeta: metav1.ObjectMeta{Name: app.Name},
        Spec: appsv1.DeploymentSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            // ... другие поля контейнера ...
                            EnvFrom: []corev1.EnvFromSource{
                                {
                                    SecretRef: &corev1.SecretEnvSource{
                                        LocalObjectReference: corev1.LocalObjectReference{Name: secret.Name},
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }
    resources = append(resources, deployment)

    // 4. В заключение сообщим о статусе нашего развёртывания.
    live, err := k8s.LookupResource(deployment)
    if err != nil && !k8s.IsErrNotFound(err) {
        return fmt.Errorf("failed to lookup deployment: %w", err)
    }

    if live != nil && slices.ContainsFunc(live.Status.Conditions, func(cond appsv1.DeploymentCondition) bool {
        return cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue
    }) {
        app.Status.Msg = "application deployed and ready"
    } else {
        app.Status.Msg = "waiting for application to become ready"
    }

    return json.NewEncoder(os.Stdout).Encode(resources)
}
		

Эта логика описывает зависимость: создать базу данных, дождаться секрета и только после этого развернуть приложение. Больше никаких подов в состоянии CrashLoopBackOff, только чистая оркестрация с отслеживанием состояния!

Заключительные мысли

Применяя подход «логика приложения как код», Yoke предлагает компромисс между статическими YAML-шаблонами и полнофункциональными операторами. Он позволяет кодифицировать сложные, активно реагирующие стратегии развёртывания с отслеживанием состояния, используя привычные языки программирования и инструменты, делая настоящую оркестрацию доступной для каждого.