All files / src/channel-gateway watch.ts

97.61% Statements 82/84
90.9% Branches 50/55
95% Functions 19/20
100% Lines 67/67

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256                                  3x                     3x                                                                             3x 41x 107x 55x 9x 8x 8x 8x       3x 40x       3x       38x 38x 102x 102x   38x       3x 101x                     3x 20x   18x 17x 17x 17x 17x               15x 15x 15x         15x     8x                                                         5x             3x 29x       29x   38x               23x         16x         7x                 14x 14x 13x 13x 13x 11x           1x         7x 7x       13x 13x 13x 13x   13x   37x 37x       2x 2x   35x 35x 35x   35x 33x   35x   35x 28x   27x        
/**
 * CI watch core for the channel-gateway (issue #483).
 *
 * Pure mechanism, validated end-to-end against a live instance: detect a
 * watchable CI resource by the SHAPE of a tool result, start a poll timer from
 * the moment its id is known, and emit an event on every state-snapshot change
 * until the pipeline reaches a terminal state. Network (poll) and delivery
 * (emit) are injected, so this module is unit-testable without MCP or Channels.
 */
 
/** CI resource kinds the gateway knows how to watch. */
export type CiResourceKind = 'pipeline' | 'job' | 'deployment';
 
/**
 * Statuses that mean "still in flight" — a watch keeps polling while any is
 * present. `blocked` covers a deployment waiting on a manual approval gate.
 */
export const NON_FINAL: ReadonlySet<string> = new Set([
  'created',
  'pending',
  'running',
  'waiting_for_resource',
  'preparing',
  'scheduled',
  'blocked',
]);
 
/** Terminal status sets per resource kind (GitLab CI vocabulary). */
export const TERMINAL: Record<CiResourceKind, ReadonlySet<string>> = {
  pipeline: new Set(['success', 'failed', 'canceled', 'skipped']),
  job: new Set(['success', 'failed', 'canceled', 'skipped', 'manual']),
  deployment: new Set(['success', 'failed', 'canceled', 'skipped']),
};
 
/** A resource to watch: kind + the project and id needed to re-query it. */
export interface WatchTarget {
  kind: CiResourceKind;
  projectId: string;
  id: number;
}
 
/** One job's state as seen during a poll. */
export interface JobState {
  id: number;
  name: string;
  stage: string;
  status: string;
}
 
/** A single transition of one job, included in the emitted event. */
export interface JobTransition {
  name: string;
  from: string | null;
  to: string;
}
 
/** What the gateway pushes to the channel on each change. */
export interface WatchEvent {
  target: WatchTarget;
  /** Aggregate pipeline state derived from the jobs. */
  pipelineState: string;
  jobs: JobState[];
  transitions: JobTransition[];
  terminal: boolean;
}
 
/** Derive a pipeline-level state from its jobs (no separate pipeline GET needed). */
export function aggregateState(jobs: readonly JobState[]): string {
  if (jobs.length === 0) return 'pending';
  const states = new Set(jobs.map((j) => j.status));
  for (const s of states) if (NON_FINAL.has(s)) return 'running';
  if (states.has('failed')) return 'failed';
  Iif (states.has('canceled')) return 'canceled';
  Iif (states.has('skipped') && states.size === 1) return 'skipped';
  return 'success';
}
 
/** True once the aggregate pipeline state can no longer change. */
export function isTerminal(pipelineState: string): boolean {
  return TERMINAL.pipeline.has(pipelineState);
}
 
/** Compute per-job transitions between the previous and current snapshot. */
export function diffJobs(
  prev: ReadonlyMap<string, string>,
  jobs: readonly JobState[],
): JobTransition[] {
  const out: JobTransition[] = [];
  for (const j of jobs) {
    const before = prev.get(j.name) ?? null;
    if (before !== j.status) out.push({ name: j.name, from: before, to: j.status });
  }
  return out;
}
 
/** Build a name -> status map for snapshot comparison. */
export function snapshot(jobs: readonly JobState[]): Map<string, string> {
  return new Map(jobs.map((j) => [j.name, j.status]));
}
 
/**
 * Detect whether a tool result is a watchable CI resource, by shape rather than
 * by a hardcoded tool-name whitelist. Returns a target only when the resource
 * is still non-final (a result already terminal needs no watch).
 *
 * @param projectId - resolved from the originating call's args
 * @param result    - the verbatim downstream tool result (object or array)
 */
export function detectWatchable(projectId: string, result: unknown): WatchTarget | null {
  if (!projectId) return null;
  // A pipeline-shaped object: numeric id + status + pipeline markers (ref/sha/source).
  if (result && typeof result === 'object' && !Array.isArray(result)) {
    const r = result as Record<string, unknown>;
    const id = r.id;
    const status = r.status;
    if (typeof id === 'number' && typeof status === 'string') {
      // Only two kinds are re-queryable from an id: a deployment (polled via
      // its environment's deployments) and a pipeline (polled via its jobs).
      // A bare job is intentionally NOT watchable here — its id is a job id,
      // not a pipeline id, and the poller re-queries by pipeline id, so a job
      // would be polled as if it were a pipeline. Watch the job's pipeline
      // instead. Requiring pipeline markers (ref/sha/source) also keeps a
      // generic `{id, status}` object from arming a false-positive watch.
      const isDeployment = 'environment' in r && 'deployable' in r;
      const isPipeline = 'ref' in r || 'sha' in r || 'source' in r;
      const kind: CiResourceKind | null = isDeployment
        ? 'deployment'
        : isPipeline
          ? 'pipeline'
          : null;
      if (kind && !TERMINAL[kind].has(status)) return { kind, projectId, id };
    }
  }
  return null;
}
 
/** Injected dependencies — real impls hit MCP; tests pass fakes. */
export interface WatchDeps {
  /** Re-query the jobs of a watched pipeline. */
  pollJobs: (target: WatchTarget) => Promise<JobState[]>;
  /** Deliver an event to the channel. */
  emit: (event: WatchEvent) => void;
  /**
   * Sink for a poll failure. A background watch must never reject (it is run
   * detached), so a failed `pollJobs` ends the watch and is reported here
   * instead of becoming an unhandled rejection. Defaults to a no-op.
   */
  onError?: (target: WatchTarget, error: unknown) => void;
  /** Sleep helper (overridable in tests). */
  sleep?: (ms: number) => Promise<void>;
  /** Clock (overridable in tests). */
  now?: () => number;
}
 
/** Tunables for a single watch. */
export interface WatchOptions {
  /** Poll interval in ms (default 10s). */
  pollMs?: number;
  /** Hard cap on total watch duration in ms, guards an endless `running` (default 1h). */
  maxDurationMs?: number;
}
 
const defaultSleep = (ms: number): Promise<void> => new Promise((r) => setTimeout(r, ms));
 
/**
 * Manages active watches, deduplicated by `${projectId}#${kind}#${id}`. Starting
 * a watch that already exists is a no-op (the channel-gateway calls this on every
 * intercepted CI-status result; re-queries must not spawn duplicate timers).
 */
export class WatchManager {
  private readonly active = new Map<string, AbortController>();
  private readonly deps: Required<WatchDeps>;
 
  constructor(deps: WatchDeps) {
    this.deps = {
      sleep: defaultSleep,
      now: () => Date.now(),
      onError: () => {},
      ...deps,
    };
  }
 
  /** Stable dedup key for a target. */
  static key(t: WatchTarget): string {
    return `${t.projectId}#${t.kind}#${t.id}`;
  }
 
  /** Number of currently active watches (for tests/introspection). */
  get size(): number {
    return this.active.size;
  }
 
  /** True if a watch for this target is already running. */
  has(target: WatchTarget): boolean {
    return this.active.has(WatchManager.key(target));
  }
 
  /**
   * Start watching a target. No-op if already watched. Returns a promise that
   * resolves when the watch ends (terminal, cap, or cancellation) — callers may
   * ignore it (fire-and-forget) or await it in tests.
   */
  watch(target: WatchTarget, opts: WatchOptions = {}): Promise<void> {
    const key = WatchManager.key(target);
    if (this.active.has(key)) return Promise.resolve();
    const ctrl = new AbortController();
    this.active.set(key, ctrl);
    return this.run(target, opts, ctrl.signal).finally(() => {
      this.active.delete(key);
    });
  }
 
  /** Cancel a specific watch (e.g. on session end for one resource). */
  cancel(target: WatchTarget): void {
    this.active.get(WatchManager.key(target))?.abort();
  }
 
  /** Cancel every active watch (session end / shutdown). */
  cancelAll(): void {
    for (const ctrl of this.active.values()) ctrl.abort();
    this.active.clear();
  }
 
  private async run(target: WatchTarget, opts: WatchOptions, signal: AbortSignal): Promise<void> {
    const pollMs = opts.pollMs ?? 10_000;
    const maxDurationMs = opts.maxDurationMs ?? 3_600_000;
    const started = this.deps.now();
    let prev = new Map<string, string>();
 
    while (!signal.aborted) {
      let jobs: JobState[];
      try {
        jobs = await this.deps.pollJobs(target);
      } catch (error) {
        // A poll failure (downstream lost, transient API error) must not crash
        // the process via an unhandled rejection — report it and end the watch.
        this.deps.onError(target, error);
        return;
      }
      const pipelineState = aggregateState(jobs);
      const transitions = diffJobs(prev, jobs);
      const terminal = isTerminal(pipelineState);
 
      if (transitions.length > 0 || terminal) {
        this.deps.emit({ target, pipelineState, jobs, transitions, terminal });
      }
      prev = snapshot(jobs);
 
      if (terminal) return;
      if (this.deps.now() - started >= maxDurationMs) return;
 
      await this.deps.sleep(pollMs);
    }
  }
}