All files / src/channel-gateway interceptor.ts

96.77% Statements 30/31
100% Branches 18/18
90% Functions 9/10
96.42% Lines 27/28

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115              2x             2x                             2x 14x 13x 13x 3x   3x     2x         22x 22x 22x 22x 9x     1x                 11x                 9x 2x           2x   7x         6x                 13x 10x 10x         6x               10x         7x      
/**
 * Gateway interceptor (issue #483): the single forward-path hook. Every tool
 * call is forwarded verbatim to the downstream gitlab-mcp; when the result is a
 * non-final CI resource, a watch is registered (deduplicated) that polls jobs
 * and emits channel events. Kept free of MCP wiring so it is unit-testable with
 * a fake `forward` — the MCP adapter (gateway.ts) supplies the real transport.
 */
import {
  detectWatchable,
  WatchManager,
  type JobState,
  type WatchEvent,
  type WatchTarget,
} from './watch';
import { parseDeployments, parseJobs, parseToolResult } from './format';
 
/** Injected I/O: downstream forwarding and channel delivery. */
/**
 * Injected I/O for the {@link Interceptor}: downstream forwarding and channel
 * delivery. Supplying these from outside keeps the interceptor free of MCP
 * wiring, so tests can pass in fakes.
 */
export interface InterceptorDeps {
  /**
   * Forward a tool call to the downstream gitlab-mcp and return its raw result.
   * The interceptor uses this both for the agent's own call and for the
   * `browse_pipelines` / `browse_environments` re-queries it issues when
   * polling a watched resource.
   */
  forward: (name: string, args: unknown) => Promise<unknown>;
  /**
   * Deliver a watch event to the channel. The function is handed to the
   * WatchManager as-is (not re-bound), so it should not rely on `this`.
   */
  emit: (event: WatchEvent) => void;
  /**
   * Poll interval for watches in ms. When omitted (or null/undefined) the
   * interceptor falls back to 10_000 ms (10s).
   */
  pollMs?: number;
  /**
   * Hard cap per watch in ms. When omitted (or null/undefined) the interceptor
   * falls back to 3_600_000 ms (1h).
   */
  maxDurationMs?: number;
}
 
/** Pull the project identifier out of a tool call's args (path or numeric id). */
/**
 * Read the `project_id` field from a tool call's arguments.
 *
 * @param args - Raw tool-call arguments; may be any value.
 * @returns The id as a string: string values are returned untouched, numeric
 *   values are stringified. Returns `''` when `args` is not a non-null object
 *   or when `project_id` is absent or of any other type.
 */
export function extractProjectId(args: unknown): string {
  // Guard first: only non-null objects can carry a project_id field.
  if (typeof args !== 'object' || args === null) {
    return '';
  }

  const candidate = (args as Record<string, unknown>).project_id;
  switch (typeof candidate) {
    case 'string':
      return candidate;
    case 'number':
      return String(candidate);
    default:
      return '';
  }
}
 
/**
 * Forward-path hook of the gateway: relays every tool call downstream and,
 * when the response describes a watchable CI resource, starts a deduplicated
 * background watch whose events are pushed through `deps.emit`.
 */
export class Interceptor {
  private readonly watches: WatchManager;
  private readonly pollMs: number;
  private readonly maxDurationMs: number;

  /**
   * @param deps - Injected forwarding/emit functions plus optional timing
   *   overrides. Missing timings default to a 10s poll and a 1h cap.
   */
  constructor(private readonly deps: InterceptorDeps) {
    this.pollMs = deps.pollMs ?? 10_000;
    this.maxDurationMs = deps.maxDurationMs ?? 3_600_000;
    this.watches = new WatchManager({
      pollJobs: (target) => this.pollJobs(target),
      emit: deps.emit,
      onError: (target, error) => {
        // Poll errors are only reported on stderr; nothing is rethrown here.
        const key = WatchManager.key(target);
        process.stderr.write(
          `[channel-gateway] watch ${key} stopped on poll error: ${String(error)}\n`,
        );
      },
    });
  }

  /** How many watches the manager currently holds (introspection/tests). */
  get activeWatches(): number {
    return this.watches.size;
  }

  /**
   * Fetch the current state of a watched resource via the same downstream
   * `forward` used for agent calls, normalized to JobState[].
   *
   * - Any non-deployment target: asks `browse_pipelines` for the pipeline's jobs.
   * - Deployment target: lists the project's deployments (newest id first) and
   *   keeps only the entry whose id equals the watched id, so the result holds
   *   that deployment as a pseudo-job (or is empty if it is not in the listing).
   */
  private async pollJobs(target: WatchTarget): Promise<JobState[]> {
    if (target.kind !== 'deployment') {
      const jobsResult = await this.deps.forward('browse_pipelines', {
        action: 'jobs',
        project_id: target.projectId,
        pipeline_id: target.id,
      });
      return parseJobs(jobsResult);
    }

    const listing = await this.deps.forward('browse_environments', {
      action: 'list_deployments',
      project_id: target.projectId,
      order_by: 'id',
      sort: 'desc',
    });
    const deployments = parseDeployments(listing);
    return deployments.filter((entry) => entry.id === target.id);
  }

  /**
   * Relay one tool call downstream and hand back its result untouched. If the
   * result is detected as a watchable resource that is not already being
   * watched, a background watch is started as a side effect.
   *
   * @param name - Tool name, passed through to `forward` as-is.
   * @param args - Tool arguments, passed through to `forward` as-is; also the
   *   source of the project id used for watch detection.
   * @returns Exactly the value produced by `forward`.
   */
  async handleCall(name: string, args: unknown): Promise<unknown> {
    const result = await this.deps.forward(name, args);

    const projectId = extractProjectId(args);
    const target = detectWatchable(projectId, parseToolResult(result));

    // Nothing to watch, or an equivalent watch already exists: just relay.
    if (!target || this.watches.has(target)) {
      return result;
    }

    // Fire-and-forget: the watch is intentionally not awaited so it can outlive
    // this call. Poll errors are reported via the onError hook wired in the
    // constructor; this catch is the final safety net for any other rejection
    // (e.g. an emit that throws), so the detached promise can never become an
    // unhandled rejection and take the gateway down.
    void this.watches
      .watch(target, { pollMs: this.pollMs, maxDurationMs: this.maxDurationMs })
      .catch((reason: unknown) => {
        const key = WatchManager.key(target);
        process.stderr.write(`[channel-gateway] watch ${key} failed: ${String(reason)}\n`);
      });

    return result;
  }

  /** Stop every active watch (session end / shutdown). */
  shutdown(): void {
    this.watches.cancelAll();
  }
}