rest-pipeline-js 1.3.7: плагины, stream-шаги, fluent builder и метрики pipeline

В предыдущих версиях мы занимались ядром: retry, кэш, SWR, interceptors, авторизация через 401, DAG-переходы, вложенные пайплайны. Всё это работает. Но чем богаче становится API, тем больнее с ним работать, если он неудобен в сборке.
В 1.3.7 упор сделан на опыт разработчика и расширяемость. Новый fluent builder, короткая фабричная функция, валидатор конфига, метрики, плагины, персистентное состояние — и два более сложных сценария: stream-шаги для SSE и поддержка нативного fetch.
Обратная совместимость сохранена полностью. Все новые фичи опциональны.
DX: писать pipeline стало проще
createPipeline() — меньше вложенности
new PipelineOrchestrator({ config: { stages: [...] } }) — рабочий паттерн, но многословный. Особенно когда нужно быстро набросать сценарий без лишней церемонии.
javascript
import { createPipeline } from "rest-pipeline-js";
const orchestrator = createPipeline(
[
{ key: "fetchUser", request: async () => fetchUser() },
{ key: "processData", request: async ({ prev }) => process(prev) },
{ key: "save", request: async ({ prev }) => save(prev) },
],
{
httpConfig: { baseURL: "https://api.example.com" },
sharedData: { userId: 42 },
pipelineOptions: { continueOnError: false },
metrics: { onStepDuration: ({ stepKey, durationMs }) => log(stepKey, durationMs) },
},
);
await orchestrator.run();
Та же PipelineOrchestrator под капотом. Просто без вложенности.
pipe() — fluent builder
Для тех, кто предпочитает строить пайплайн цепочкой вызовов:
javascript
import { pipe } from "rest-pipeline-js";
const orchestrator = pipe()
.step({ key: "auth", request: async () => getToken() })
.step({
key: "fetchUser",
condition: ({ prev }) => !!prev,
request: async ({ prev }) => fetchUser(prev),
})
.parallel([
{ key: "loadPosts", request: async () => fetchPosts() },
{ key: "loadNotifs", request: async () => fetchNotifications() },
])
.step({
key: "render",
request: async ({ allResults }) => render(allResults),
})
.build({
httpConfig: { baseURL: "https://api.example.com" },
});
Доступные методы builder'а:
| Метод | Описание |
|---|---|
.step(stage) |
Добавить обычный шаг |
.parallel(stages, options?) |
Добавить параллельную группу |
.subPipeline(item) |
Вложенный pipeline как шаг |
.stream(stage) |
Stream-шаг (AsyncIterable) |
.build(options?) |
Создать PipelineOrchestrator |
.toConfig(options?) |
Получить только PipelineConfig без оркестратора |
.toConfig() полезен, когда конфиг нужно передать в другое место или сериализовать — без создания лишнего экземпляра.
validatePipelineConfig() — ошибки до запуска
Раньше неверный конфиг обнаруживался в рантайме: дубликат ключа — в середине выполнения, пустой массив stages — при первом run().
Теперь можно проверить конфиг заранее:
javascript
import { validatePipelineConfig } from "rest-pipeline-js";
const { valid, errors } = validatePipelineConfig({
stages: [
{ key: "step1", request: async () => data },
{ key: "step1", request: async () => other }, // дубликат!
{ key: "", request: async () => null }, // пустой ключ!
{ key: "step3", retryCount: -1 }, // отрицательное значение!
],
});
if (!valid) {
console.error(errors);
// [
// '[root] duplicate stage key: "step1"',
// '[root] stage key must be a non-empty string (got: "")',
// '[root] stage "step3": retryCount must be a non-negative number',
// ]
}
Что проверяется:
- Дублирующиеся ключи (в том числе внутри параллельных групп)
- Пустые или не-строковые ключи
- Пустой массив
stages - Некорректные типы полей:
request,condition,retryCount,timeoutMs - Рекурсивно — вложенные
subPipeline-конфиги
Валидатор только читает конфиг и возвращает { valid, errors }. Никакого side-effect, никаких исключений.
Типобезопасные ключи шагов
PipelineOrchestrator теперь принимает generic-параметр с ключами шагов:
typescript
const orchestrator = new PipelineOrchestrator<"fetchUser" | "processData" | "save">({
config: { stages: [ /* ... */ ] },
});
// IDE подскажет допустимые варианты:
orchestrator.on("step:fetchUser:success", (event) => { ... });
orchestrator.rerunStep("processData");
orchestrator.subscribeStepProgress("save", (status) => { ... });
// Опечатка → ошибка компиляции:
orchestrator.on("step:fetchUzer:success", handler); // ❌ TypeScript не пропустит
Если generic не указать — принимается любая строка, поведение как раньше.
usePipelineStageResult — хук для одного шага
Раньше единственный способ следить за результатом конкретного шага — подписаться на subscribeStageResults и фильтровать по ключу руками. Теперь есть отдельные хуки:
vue
<!-- Vue -->
<script setup>
import { usePipelineStageResultVue } from "rest-pipeline-js/vue";
const userResult = usePipelineStageResultVue(orchestrator, "fetchUser");
// userResult.value?.status === "success"
// userResult.value?.data — данные шага
// userResult.value?.error — ошибка шага
</script>
<template>
<div v-if="userResult?.status === 'loading'">Загрузка...</div>
<div v-else-if="userResult?.status === 'error'">{{ userResult.error.message }}</div>
<div v-else-if="userResult?.status === 'success'">{{ userResult.data }}</div>
</template>
jsx
// React
import { usePipelineStageResultReact } from "rest-pipeline-js/react";
function UserCard() {
const result = usePipelineStageResultReact(orchestrator, "fetchUser");
if (!result || result.status === "pending") return <Skeleton />;
if (result.status === "error") return <ErrorView error={result.error} />;
return <User data={result.data} />;
}
Хук реактивно обновляется при каждом изменении — и только для нужного шага.
Метрики pipeline
Раньше MetricsHandler работал только для HTTP-запросов внутри шагов. Что происходило с самим pipeline — сколько он шёл, какой шаг занял больше всего времени — наружу не выходило.
Теперь в PipelineConfig есть поле metrics:
javascript
const orchestrator = new PipelineOrchestrator({
config: {
stages: [ /* ... */ ],
metrics: {
onPipelineStart: ({ timestamp }) => {
console.log("Pipeline started at", new Date(timestamp).toISOString());
perfTimer.start("pipeline");
},
onPipelineEnd: ({ durationMs, success, stageResults }) => {
analytics.track("pipeline_complete", { durationMs, success });
if (!success) {
const failed = Object.entries(stageResults)
.filter(([, r]) => r.status === "error")
.map(([key]) => key);
Sentry.captureMessage(`Pipeline failed at: ${failed.join(", ")}`);
}
},
onStepDuration: ({ stepKey, durationMs, status }) => {
// Логируем каждый шаг — видно, где тормозит
datadog.histogram("pipeline.step.duration", durationMs, { step: stepKey, status });
},
},
},
});
Три коллбэка — не более:
onPipelineStart({ timestamp })— в началеrun()onPipelineEnd({ durationMs, success, stageResults })— при завершенииonStepDuration({ stepKey, durationMs, status })— после каждого шага
Всё это не влияет на выполнение pipeline. Если коллбэк бросает исключение — оно не перехватывается. Это намеренно: метрики не должны тихо глотать ошибки.
PipelineLogEventType
Все типы событий в логах теперь собраны в один union:
typescript
import type { PipelineLogEventType } from "rest-pipeline-js";
function handleLog(event: { type: PipelineLogEventType; stepKey?: string; error?: any }) {
switch (event.type) {
case "step:success": console.log("✓", event.stepKey); break;
case "step:error": console.error("✗", event.stepKey, event.error); break;
case "pipeline:retry": console.warn("↺ retry", (event as any).attempt); break;
}
}
orchestrator.on("log", handleLog);
Полный список: step:start, step:success, step:error, step:skipped, rerunStep:start, rerunStep:success, rerunStep:error, pipeline:retry, pipeline:error, subPipeline:start, subPipeline:success, subPipeline:error, subPipeline:exception, stream:start, stream:success, stream:error.
Плагинная система
Паттерн «подписаться на события оркестратора и что-то делать» встречается часто: логирование, аналитика, трекинг ошибок. Раньше это всегда был повторяющийся boilerplate в каждом проекте.
Плагины дают контракт для таких расширений:
javascript
// Плагин логирования — пакуем один раз, переиспользуем везде
const loggingPlugin = {
name: "logging",
install(orchestrator) {
const offLog = orchestrator.on("log", (event) => {
if (event.type === "step:start") console.log(`→ [${event.stepKey}]`);
if (event.type === "step:success") console.log(`✓ [${event.stepKey}]`);
if (event.type === "step:error") console.error(`✗ [${event.stepKey}]`, event.error);
});
// Возвращаем cleanup — вызовется при orchestrator.destroy()
return () => offLog();
},
};
const orchestrator = new PipelineOrchestrator({
config: {
stages: [ /* ... */ ],
options: {
plugins: [loggingPlugin, analyticsPlugin, sentryPlugin],
},
},
});
// Когда оркестратор больше не нужен:
orchestrator.destroy();
Интерфейс PipelinePlugin:
typescript
type PipelinePlugin = {
name: string;
install(orchestrator: PipelineOrchestrator): void | (() => void);
};
install() получает полный экземпляр оркестратора. Можно делать всё что угодно: подписываться на события, патчить sharedData, вызывать pause() по условию. Если возвращает функцию — она попадает в destroy().
Сами плагины — просто объекты. Никаких базовых классов, никакой магии.
Персистентное состояние
exportState() и importState() уже были — позволяли вручную сохранять и восстанавливать снимок. Но «вручную» — это ключевое слово: нужно было самому решить, когда сохранять, где хранить, когда загружать.
Теперь это автоматизируется через адаптер:
javascript
const localStorageAdapter = {
save: (state) => localStorage.setItem("pipeline:state", JSON.stringify(state)),
load: () => {
const raw = localStorage.getItem("pipeline:state");
return raw ? JSON.parse(raw) : null;
},
};
const orchestrator = new PipelineOrchestrator({
config: {
stages: [ /* ... */ ],
options: { persistAdapter: localStorageAdapter },
},
});
// run() загружает сохранённое состояние в начале
// После каждого успешного шага — сохраняет текущее
await orchestrator.run();
Практический сценарий: длинный многошаговый процесс, пользователь случайно обновил страницу. При следующем открытии pipeline продолжается с того места, где остановился, а не с нуля.
Адаптер может быть асинхронным — полезно для IndexedDB, sessionStorage или серверного хранилища:
typescript
type PipelineStateAdapter = {
save(state: PipelineExportedState): void | Promise<void>;
load(): PipelineExportedState | null | Promise<PipelineExportedState | null>;
};
Если load() вернул null — pipeline стартует с чистого листа. Если save() выбросил ошибку — pipeline не прерывается, ошибка тихо игнорируется. Это намеренное поведение: персистентность не должна ломать основной сценарий.
Stream-шаги: SSE и AsyncIterable
До этой версии все шаги pipeline были одноразовыми: запрос → ответ → следующий шаг. Потоки данных — SSE, WebSocket, длинные стримы — не вписывались в этот паттерн.
Теперь есть StreamStageConfig. Поле stream принимает функцию, возвращающую AsyncIterable<T>. Оркестратор итерирует её и собирает все чанки в массив — это и есть результат шага.
javascript
const orchestrator = createPipeline([
{
key: "auth",
request: async () => await getToken(),
},
{
key: "liveData",
// stream получает те же параметры, что и обычный request
stream: async function* ({ prev, sharedData }) {
// prev — результат шага "auth" (токен)
const source = new EventSource(`/api/events?token=${prev}`);
try {
yield* eventSourceToAsyncIterable(source);
} finally {
source.close();
}
},
// Вызывается для каждого чанка немедленно — не ждёт завершения потока
onChunk: (chunk, sharedData) => {
sharedData.buffer = (sharedData.buffer ?? "") + chunk;
updateUI(sharedData.buffer);
},
},
{
key: "finalize",
request: async ({ allResults }) => {
// allResults.liveData.data — массив всех накопленных чанков
return allResults.liveData.data.join("");
},
},
]);
Несколько важных деталей реализации:
onChunk — это side-effect, не трансформация. Результатом шага всегда будет полный массив чанков. Если нужно агрегировать по-своему — делай это через sharedData в onChunk, потом читай в следующем шаге.
Abort работает корректно. После каждого чанка проверяется сигнал отмены. Если orchestrator.abort() вызван во время стрима — итерация прекращается, шаг завершается с ошибкой.
Stream-шаги поддерживают continueOnError. Если поток упал на середине — можно продолжить pipeline дальше, как с любым другим шагом.
Стандартные события сохраняются. step:start, step:success, step:error — всё работает. Плюс специфические: stream:start, stream:success, stream:error в лог-событиях.
HttpAdapter: fetch вместо axios
Библиотека была жёстко привязана к axios. В edge-окружениях — Cloudflare Workers, Deno, Bun — это создаёт проблемы: axios может быть недоступен или работать некорректно.
Теперь в HttpConfig есть поле adapter:
javascript
// Передаём нативный fetch как адаптер
const fetchAdapter = {
async request(config) {
const url = `${config.baseURL ?? ""}${config.url ?? ""}`;
const res = await fetch(url, {
method: config.method ?? "GET",
body: config.data ? JSON.stringify(config.data) : undefined,
headers: { "Content-Type": "application/json", ...config.headers },
signal: config.signal,
});
const data = await res.json();
return {
data,
status: res.status,
statusText: res.statusText,
headers: Object.fromEntries(res.headers.entries()),
};
},
};
const client = createRestClient({
baseURL: "https://api.example.com",
adapter: fetchAdapter,
// Все остальные фичи работают поверх адаптера без изменений:
auth: { getToken: async () => token },
interceptors: { request: [addCorrelationId] },
sanitizeHeaders: true,
retry: { attempts: 3, delayMs: 500, backoffMultiplier: 2 },
});
Интерфейс намеренно минимальный:
typescript
type HttpAdapter = {
request<T = unknown>(
config: RestRequestConfig & { baseURL?: string }
): Promise<ApiResponse<T>>;
};
Ошибки из адаптера обрабатываются стандартно через toApiError(). 401-retry тоже работает — проверяется поле status на объекте ошибки вместо axios.isAxiosError.
Если adapter не указан — используется axios как раньше. Замена полностью прозрачна для остального кода.
Что ещё добавлено
getStageResults() — синхронный снимок результатов всех шагов. Не нужно подписываться, если нужно просто прочитать текущее состояние в определённый момент:
javascript
const results = orchestrator.getStageResults();
console.log(results.fetchUser?.data);
console.log(results.processData?.status);
orchestrator.destroy() — вызывает cleanup-функции всех установленных плагинов. Вызывай при размонтировании компонента или уничтожении модуля, чтобы не оставлять висящих подписок.
Новые типы
PipelineMetrics— интерфейс дляconfig.metricsPipelinePlugin— контракт плагинаPipelineStateAdapter— контракт адаптера персистентностиStreamStageConfig<T>— тип stream-шага; добавлен в unionPipelineItemHttpAdapter— контракт HTTP-адаптера; добавлен вHttpConfigPipelineLogEventType— union всех имён лог-событий
Обратная совместимость
Все изменения полностью обратно совместимы:
createPipeline()иpipe()— дополнение, не замена конструктораPipelineOrchestrator<TKeys>— generic по умолчаниюstring, старый код не ломается- Метрики, плагины,
persistAdapter— опциональные поля в конфиге, без них поведение не меняется adapterвHttpConfig— если не указан, используется axios- Stream-шаги — новый тип в
PipelineItem, существующий код не затронут
Релиз 1.3.7 уже на npm:
bash
npm i rest-pipeline-js@1.3.7
Репозиторий: github.com/macrulezru/pipeline-js
Документация: всё в README
npm: npmjs.com/package/rest-pipeline-js