rest-pipeline-js 1.3.7: плагины, stream-шаги, fluent builder и метрики pipeline

04.04.2026
rest-pipeline-js 1.3.7: плагины, stream-шаги, fluent builder и метрики pipeline

В предыдущих версиях мы занимались ядром: retry, кэш, SWR, interceptors, авторизация через 401, DAG-переходы, вложенные пайплайны. Всё это работает. Но чем богаче становится API, тем больнее с ним работать, если он неудобен в сборке.

В 1.3.7 упор сделан на опыт разработчика и расширяемость. Новый fluent builder, короткая фабричная функция, валидатор конфига, метрики, плагины, персистентное состояние — и два более сложных сценария: stream-шаги для SSE и поддержка нативного fetch.

Обратная совместимость сохранена полностью. Все новые фичи опциональны.


DX: писать pipeline стало проще

createPipeline() — меньше вложенности

new PipelineOrchestrator({ config: { stages: [...] } }) — рабочий паттерн, но многословный. Особенно когда нужно быстро набросать сценарий без лишней церемонии.

javascript Copy
import { createPipeline } from "rest-pipeline-js";

const orchestrator = createPipeline(
  [
    { key: "fetchUser",    request: async () => fetchUser() },
    { key: "processData",  request: async ({ prev }) => process(prev) },
    { key: "save",         request: async ({ prev }) => save(prev) },
  ],
  {
    httpConfig:      { baseURL: "https://api.example.com" },
    sharedData:      { userId: 42 },
    pipelineOptions: { continueOnError: false },
    metrics:         { onStepDuration: ({ stepKey, durationMs }) => log(stepKey, durationMs) },
  },
);

await orchestrator.run();

Та же PipelineOrchestrator под капотом. Просто без вложенности.


pipe() — fluent builder

Для тех, кто предпочитает строить пайплайн цепочкой вызовов:

javascript Copy
import { pipe } from "rest-pipeline-js";

const orchestrator = pipe()
  .step({ key: "auth", request: async () => getToken() })
  .step({
    key: "fetchUser",
    condition: ({ prev }) => !!prev,
    request: async ({ prev }) => fetchUser(prev),
  })
  .parallel([
    { key: "loadPosts",  request: async () => fetchPosts() },
    { key: "loadNotifs", request: async () => fetchNotifications() },
  ])
  .step({
    key: "render",
    request: async ({ allResults }) => render(allResults),
  })
  .build({
    httpConfig: { baseURL: "https://api.example.com" },
  });

Доступные методы builder'а:

Метод Описание
.step(stage) Добавить обычный шаг
.parallel(stages, options?) Добавить параллельную группу
.subPipeline(item) Вложенный pipeline как шаг
.stream(stage) Stream-шаг (AsyncIterable)
.build(options?) Создать PipelineOrchestrator
.toConfig(options?) Получить только PipelineConfig без оркестратора

.toConfig() полезен, когда конфиг нужно передать в другое место или сериализовать — без создания лишнего экземпляра.


validatePipelineConfig() — ошибки до запуска

Раньше неверный конфиг обнаруживался в рантайме: дубликат ключа — в середине выполнения, пустой массив stages — при первом run().

Теперь можно проверить конфиг заранее:

javascript Copy
import { validatePipelineConfig } from "rest-pipeline-js";

const { valid, errors } = validatePipelineConfig({
  stages: [
    { key: "step1", request: async () => data },
    { key: "step1", request: async () => other }, // дубликат!
    { key: "",      request: async () => null   }, // пустой ключ!
    { key: "step3", retryCount: -1              }, // отрицательное значение!
  ],
});

if (!valid) {
  console.error(errors);
  // [
  //   '[root] duplicate stage key: "step1"',
  //   '[root] stage key must be a non-empty string (got: "")',
  //   '[root] stage "step3": retryCount must be a non-negative number',
  // ]
}

Что проверяется:

  • Дублирующиеся ключи (в том числе внутри параллельных групп)
  • Пустые или не-строковые ключи
  • Пустой массив stages
  • Некорректные типы полей: request, condition, retryCount, timeoutMs
  • Рекурсивно — вложенные subPipeline-конфиги

Валидатор только читает конфиг и возвращает { valid, errors }. Никакого side-effect, никаких исключений.


Типобезопасные ключи шагов

PipelineOrchestrator теперь принимает generic-параметр с ключами шагов:

typescript Copy
const orchestrator = new PipelineOrchestrator<"fetchUser" | "processData" | "save">({
  config: { stages: [ /* ... */ ] },
});

// IDE подскажет допустимые варианты:
orchestrator.on("step:fetchUser:success", (event) => { ... });
orchestrator.rerunStep("processData");
orchestrator.subscribeStepProgress("save", (status) => { ... });

// Опечатка → ошибка компиляции:
orchestrator.on("step:fetchUzer:success", handler); // ❌ TypeScript не пропустит

Если generic не указать — принимается любая строка, поведение как раньше.


usePipelineStageResult — хук для одного шага

Раньше единственный способ следить за результатом конкретного шага — подписаться на subscribeStageResults и фильтровать по ключу руками. Теперь есть отдельные хуки:

vue Copy
<!-- Vue -->
<script setup>
import { usePipelineStageResultVue } from "rest-pipeline-js/vue";

const userResult = usePipelineStageResultVue(orchestrator, "fetchUser");
// userResult.value?.status === "success"
// userResult.value?.data — данные шага
// userResult.value?.error — ошибка шага
</script>

<template>
  <div v-if="userResult?.status === 'loading'">Загрузка...</div>
  <div v-else-if="userResult?.status === 'error'">{{ userResult.error.message }}</div>
  <div v-else-if="userResult?.status === 'success'">{{ userResult.data }}</div>
</template>
jsx Copy
// React
import { usePipelineStageResultReact } from "rest-pipeline-js/react";

function UserCard() {
  const result = usePipelineStageResultReact(orchestrator, "fetchUser");

  if (!result || result.status === "pending") return <Skeleton />;
  if (result.status === "error")             return <ErrorView error={result.error} />;
  return <User data={result.data} />;
}

Хук реактивно обновляется при каждом изменении — и только для нужного шага.


Метрики pipeline

Раньше MetricsHandler работал только для HTTP-запросов внутри шагов. Что происходило с самим pipeline — сколько он шёл, какой шаг занял больше всего времени — наружу не выходило.

Теперь в PipelineConfig есть поле metrics:

javascript Copy
const orchestrator = new PipelineOrchestrator({
  config: {
    stages: [ /* ... */ ],
    metrics: {
      onPipelineStart: ({ timestamp }) => {
        console.log("Pipeline started at", new Date(timestamp).toISOString());
        perfTimer.start("pipeline");
      },
      onPipelineEnd: ({ durationMs, success, stageResults }) => {
        analytics.track("pipeline_complete", { durationMs, success });
        if (!success) {
          const failed = Object.entries(stageResults)
            .filter(([, r]) => r.status === "error")
            .map(([key]) => key);
          Sentry.captureMessage(`Pipeline failed at: ${failed.join(", ")}`);
        }
      },
      onStepDuration: ({ stepKey, durationMs, status }) => {
        // Логируем каждый шаг — видно, где тормозит
        datadog.histogram("pipeline.step.duration", durationMs, { step: stepKey, status });
      },
    },
  },
});

Три коллбэка — не более:

  • onPipelineStart({ timestamp }) — в начале run()
  • onPipelineEnd({ durationMs, success, stageResults }) — при завершении
  • onStepDuration({ stepKey, durationMs, status }) — после каждого шага

Всё это не влияет на выполнение pipeline. Если коллбэк бросает исключение — оно не перехватывается. Это намеренно: метрики не должны тихо глотать ошибки.


PipelineLogEventType

Все типы событий в логах теперь собраны в один union:

typescript Copy
import type { PipelineLogEventType } from "rest-pipeline-js";

function handleLog(event: { type: PipelineLogEventType; stepKey?: string; error?: any }) {
  switch (event.type) {
    case "step:success":    console.log("✓", event.stepKey); break;
    case "step:error":      console.error("✗", event.stepKey, event.error); break;
    case "pipeline:retry":  console.warn("↺ retry", (event as any).attempt); break;
  }
}

orchestrator.on("log", handleLog);

Полный список: step:start, step:success, step:error, step:skipped, rerunStep:start, rerunStep:success, rerunStep:error, pipeline:retry, pipeline:error, subPipeline:start, subPipeline:success, subPipeline:error, subPipeline:exception, stream:start, stream:success, stream:error.


Плагинная система

Паттерн «подписаться на события оркестратора и что-то делать» встречается часто: логирование, аналитика, трекинг ошибок. Раньше это всегда был повторяющийся boilerplate в каждом проекте.

Плагины дают контракт для таких расширений:

javascript Copy
// Плагин логирования — пакуем один раз, переиспользуем везде
const loggingPlugin = {
  name: "logging",
  install(orchestrator) {
    const offLog = orchestrator.on("log", (event) => {
      if (event.type === "step:start")   console.log(`→ [${event.stepKey}]`);
      if (event.type === "step:success") console.log(`✓ [${event.stepKey}]`);
      if (event.type === "step:error")   console.error(`✗ [${event.stepKey}]`, event.error);
    });

    // Возвращаем cleanup — вызовется при orchestrator.destroy()
    return () => offLog();
  },
};

const orchestrator = new PipelineOrchestrator({
  config: {
    stages: [ /* ... */ ],
    options: {
      plugins: [loggingPlugin, analyticsPlugin, sentryPlugin],
    },
  },
});

// Когда оркестратор больше не нужен:
orchestrator.destroy();

Интерфейс PipelinePlugin:

typescript Copy
type PipelinePlugin = {
  name: string;
  install(orchestrator: PipelineOrchestrator): void | (() => void);
};

install() получает полный экземпляр оркестратора. Можно делать всё что угодно: подписываться на события, патчить sharedData, вызывать pause() по условию. Если возвращает функцию — она попадает в destroy().

Сами плагины — просто объекты. Никаких базовых классов, никакой магии.


Персистентное состояние

exportState() и importState() уже были — позволяли вручную сохранять и восстанавливать снимок. Но «вручную» — это ключевое слово: нужно было самому решить, когда сохранять, где хранить, когда загружать.

Теперь это автоматизируется через адаптер:

javascript Copy
const localStorageAdapter = {
  save: (state) => localStorage.setItem("pipeline:state", JSON.stringify(state)),
  load: () => {
    const raw = localStorage.getItem("pipeline:state");
    return raw ? JSON.parse(raw) : null;
  },
};

const orchestrator = new PipelineOrchestrator({
  config: {
    stages: [ /* ... */ ],
    options: { persistAdapter: localStorageAdapter },
  },
});

// run() загружает сохранённое состояние в начале
// После каждого успешного шага — сохраняет текущее
await orchestrator.run();

Практический сценарий: длинный многошаговый процесс, пользователь случайно обновил страницу. При следующем открытии pipeline продолжается с того места, где остановился, а не с нуля.

Адаптер может быть асинхронным — полезно для IndexedDB, sessionStorage или серверного хранилища:

typescript Copy
type PipelineStateAdapter = {
  save(state: PipelineExportedState): void | Promise<void>;
  load(): PipelineExportedState | null | Promise<PipelineExportedState | null>;
};

Если load() вернул null — pipeline стартует с чистого листа. Если save() выбросил ошибку — pipeline не прерывается, ошибка тихо игнорируется. Это намеренное поведение: персистентность не должна ломать основной сценарий.


Stream-шаги: SSE и AsyncIterable

До этой версии все шаги pipeline были одноразовыми: запрос → ответ → следующий шаг. Потоки данных — SSE, WebSocket, длинные стримы — не вписывались в этот паттерн.

Теперь есть StreamStageConfig. Поле stream принимает функцию, возвращающую AsyncIterable<T>. Оркестратор итерирует её и собирает все чанки в массив — это и есть результат шага.

javascript Copy
const orchestrator = createPipeline([
  {
    key: "auth",
    request: async () => await getToken(),
  },
  {
    key: "liveData",
    // stream получает те же параметры, что и обычный request
    stream: async function* ({ prev, sharedData }) {
      // prev — результат шага "auth" (токен)
      const source = new EventSource(`/api/events?token=${prev}`);

      try {
        yield* eventSourceToAsyncIterable(source);
      } finally {
        source.close();
      }
    },
    // Вызывается для каждого чанка немедленно — не ждёт завершения потока
    onChunk: (chunk, sharedData) => {
      sharedData.buffer = (sharedData.buffer ?? "") + chunk;
      updateUI(sharedData.buffer);
    },
  },
  {
    key: "finalize",
    request: async ({ allResults }) => {
      // allResults.liveData.data — массив всех накопленных чанков
      return allResults.liveData.data.join("");
    },
  },
]);

Несколько важных деталей реализации:

onChunk — это side-effect, не трансформация. Результатом шага всегда будет полный массив чанков. Если нужно агрегировать по-своему — делай это через sharedData в onChunk, потом читай в следующем шаге.

Abort работает корректно. После каждого чанка проверяется сигнал отмены. Если orchestrator.abort() вызван во время стрима — итерация прекращается, шаг завершается с ошибкой.

Stream-шаги поддерживают continueOnError. Если поток упал на середине — можно продолжить pipeline дальше, как с любым другим шагом.

Стандартные события сохраняются. step:start, step:success, step:error — всё работает. Плюс специфические: stream:start, stream:success, stream:error в лог-событиях.


HttpAdapter: fetch вместо axios

Библиотека была жёстко привязана к axios. В edge-окружениях — Cloudflare Workers, Deno, Bun — это создаёт проблемы: axios может быть недоступен или работать некорректно.

Теперь в HttpConfig есть поле adapter:

javascript Copy
// Передаём нативный fetch как адаптер
const fetchAdapter = {
  async request(config) {
    const url = `${config.baseURL ?? ""}${config.url ?? ""}`;
    const res = await fetch(url, {
      method:  config.method ?? "GET",
      body:    config.data ? JSON.stringify(config.data) : undefined,
      headers: { "Content-Type": "application/json", ...config.headers },
      signal:  config.signal,
    });
    const data = await res.json();
    return {
      data,
      status:     res.status,
      statusText: res.statusText,
      headers:    Object.fromEntries(res.headers.entries()),
    };
  },
};

const client = createRestClient({
  baseURL: "https://api.example.com",
  adapter: fetchAdapter,
  // Все остальные фичи работают поверх адаптера без изменений:
  auth:           { getToken: async () => token },
  interceptors:   { request: [addCorrelationId] },
  sanitizeHeaders: true,
  retry:          { attempts: 3, delayMs: 500, backoffMultiplier: 2 },
});

Интерфейс намеренно минимальный:

typescript Copy
type HttpAdapter = {
  request<T = unknown>(
    config: RestRequestConfig & { baseURL?: string }
  ): Promise<ApiResponse<T>>;
};

Ошибки из адаптера обрабатываются стандартно через toApiError(). 401-retry тоже работает — проверяется поле status на объекте ошибки вместо axios.isAxiosError.

Если adapter не указан — используется axios как раньше. Замена полностью прозрачна для остального кода.


Что ещё добавлено

getStageResults() — синхронный снимок результатов всех шагов. Не нужно подписываться, если нужно просто прочитать текущее состояние в определённый момент:

javascript Copy
const results = orchestrator.getStageResults();
console.log(results.fetchUser?.data);
console.log(results.processData?.status);

orchestrator.destroy() — вызывает cleanup-функции всех установленных плагинов. Вызывай при размонтировании компонента или уничтожении модуля, чтобы не оставлять висящих подписок.


Новые типы

  • PipelineMetrics — интерфейс для config.metrics
  • PipelinePlugin — контракт плагина
  • PipelineStateAdapter — контракт адаптера персистентности
  • StreamStageConfig<T> — тип stream-шага; добавлен в union PipelineItem
  • HttpAdapter — контракт HTTP-адаптера; добавлен в HttpConfig
  • PipelineLogEventType — union всех имён лог-событий

Обратная совместимость

Все изменения полностью обратно совместимы:

  • createPipeline() и pipe() — дополнение, не замена конструктора
  • PipelineOrchestrator<TKeys> — generic по умолчанию string, старый код не ломается
  • Метрики, плагины, persistAdapter — опциональные поля в конфиге, без них поведение не меняется
  • adapter в HttpConfig — если не указан, используется axios
  • Stream-шаги — новый тип в PipelineItem, существующий код не затронут

Релиз 1.3.7 уже на npm:

bash Copy
npm i rest-pipeline-js@1.3.7

Репозиторий: github.com/macrulezru/pipeline-js
Документация: всё в README
npm: npmjs.com/package/rest-pipeline-js

Читать далее

03.04.2026

rest-pipeline-js 1.3.6: DAG-переходы, вложенные пайплайны, SWR-кэш и перехватчики

Прошлый релиз добавил параллельные шаги, глобальный middleware и паузу. Но оставалось несколько вещей, которые в пайплайне выглядели как белые пятна.

Например, как сделать нелинейный сценарий? Как переиспользовать цепочку шагов внутри другой цепочки? А в HTTP-клиенте — как обновлять данные в фоне, не заставляя пользователя ждать? И почему до сих пор нет нормальных перехватчиков?

Версия 1.3.6 закрывает всё это.

Метки
rest-pipeline-jspipeline-orchestratordag-transitionsstale-while-revalidatehttp-interceptors
02.04.2026

rest-pipeline-js 1.3.5: Retry-After, авторизация через 401 и безопасные метрики

Вы когда-нибудь ловили себя на мысли, что ваш HTTP-клиент всё ещё не умеет правильно читать заголовок Retry-After? Или что токен авторизации приходится обновлять вручную в каждом компоненте? В версии 1.3.5 эти проблемы остались в прошлом.

Метки
rest-pipeline-jstypescripthttp-clientretry-afterauthenticationlogging
31.03.2026

css-magic-gradient 1.2.0 — гармонии, палитры, WCAG по всей длине и canvas-экспорт

Версия 1.2.0 библиотеки css-magic-gradient: расширенные цветовые гармонии, генераторы тинтов и шейдов, переработанная доступность с проверкой по всем точкам градиента, CSS-переменные, экспорт в canvas и 9 новых хуков для Vue и React.

Метки
css-градиентыtypescriptreactvuewcagcolor-harmony