All files / src/channel-gateway gateway.ts

95.55% Statements 86/90
82.05% Branches 32/39
89.65% Functions 26/29
98.79% Lines 82/83

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239                        1x 1x 1x 1x 1x         1x 1x 1x       1x                                             1x         15x 15x 15x 15x   15x 15x                         15x 15x 7x 2x     15x         11x 11x         5x 5x 5x       29x       15x       3x     3x 2x 2x         2x   15x 5x 5x           13x 13x 13x   15x 14x 14x         14x 14x 14x 12x     1x 1x   11x   11x 11x   2x 2x 2x           2x 2x 2x     2x 2x 2x                   5x 5x 5x 2x   3x 3x 3x 3x 2x   3x 2x     3x           7x     7x 3x 4x               4x               2x   2x           4x     4x             4x      
/**
 * Channel-gateway MCP adapter (issue #483).
 *
 * One process that is BOTH:
 *   - a downstream MCP client to gitlab-mcp (the real tool catalog), and
 *   - an upstream channel-protocol MCP server to Claude Code (re-exposes that
 *     catalog verbatim AND pushes CI events into the session).
 *
 * The forward-path hook (detect non-final CI result -> arm a watch) lives in
 * {@link Interceptor}; this file is the transport wiring: connect/reconnect with
 * backoff, read-safe / write-no-retry forwarding, and channel delivery.
 */
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  type CallToolResult,
} from '@modelcontextprotocol/sdk/types.js';
import { Interceptor } from './interceptor';
import { formatEvent } from './format';
import { forwardWithPolicy, isReadCall } from './forwarding';
import type { WatchEvent } from './watch';
 
/**
 * Claude Code channel push method (Channels research preview).
 *
 * Used as the `method` of every notification this file sends upstream through
 * `server.notification`: watch events and downstream link-health changes.
 */
const CHANNEL_NOTIFICATION = 'notifications/claude/channel';
 
/**
 * Construction options for the channel gateway.
 *
 * Only the downstream launch command and its arguments are required; every
 * other knob is optional and falls back to the default noted on the field.
 */
export interface GatewayConfig {
  // --- downstream process -------------------------------------------------

  /** Program used to spawn the downstream gitlab-mcp, e.g. `node`. */
  downstreamCommand: string;
  /** Argument vector for that program, e.g. `['dist/src/main.js', 'stdio']`. */
  downstreamArgs: string[];
  /** Environment handed to the downstream process (e.g. token, API url, gates). */
  downstreamEnv?: Record<string, string>;

  // --- gateway identity ---------------------------------------------------

  /**
   * Name the gateway announces itself with (becomes the channel `source`
   * attribute). Defaults to `'gitlab-ci-gateway'`.
   */
  name?: string;
  /** Version the gateway server announces. Defaults to `'0.1.0'`. */
  version?: string;

  // --- timing and buffering -----------------------------------------------

  /**
   * Watch poll interval in ms, passed through to the interceptor
   * (documented default 10s; the default itself is applied outside this type).
   */
  pollMs?: number;
  /** Upper bound, in ms, on the delay between reconnect attempts (default 30s). */
  maxBackoffMs?: number;
  /** Cap on calls held back while the downstream link is down (default 100). */
  maxQueued?: number;
  /** Longest a held-back call waits for the link, in ms, before failing (default 30s). */
  connectTimeoutMs?: number;
}
 
/**
 * Transport wiring for the channel gateway.
 *
 * Owns two MCP endpoints: an upstream {@link Server} (served over stdio by
 * {@link ChannelGateway.start}) and a downstream {@link Client} that talks to a
 * spawned child process. Tool calls arriving upstream are handed to the
 * {@link Interceptor}, which forwards them downstream through
 * `forwardWithPolicy`; watch events and link-health changes are pushed upstream
 * as channel notifications.
 */
export class ChannelGateway {
  private readonly server: Server;
  /** Downstream client; replaced with a fresh instance on every connect attempt. */
  private client: Client;
  /** Transport of the most recent connect attempt (undefined before the first). */
  private transport?: StdioClientTransport;
  private readonly interceptor: Interceptor;
  /** True only while a downstream session is established. */
  private connected = false;
  /** True while a reconnect loop started by a dropped link is running. */
  private reconnecting = false;
  /** Set by `stop()`; suppresses further connect attempts and reconnects. */
  private closing = false;
  /** Number of callers currently parked in `waitForConnection`. */
  private pendingWaiters = 0;

  /**
   * Builds the upstream server, an initial (unconnected) downstream client and
   * the interceptor, then registers the request handlers. Nothing is connected
   * until {@link ChannelGateway.start} is called.
   *
   * @param config Gateway options; see {@link GatewayConfig}.
   */
  constructor(private readonly config: GatewayConfig) {
    this.server = new Server(
      { name: config.name ?? 'gitlab-ci-gateway', version: config.version ?? '0.1.0' },
      {
        capabilities: {
          tools: {}, // re-exposes the downstream catalog
          experimental: { 'claude/channel': {} }, // allowed to push channel events
        },
        instructions:
          'Forwards the full gitlab-mcp tool catalog. When a CI pipeline/job is ' +
          'still running, it is watched in the background and a <channel> event ' +
          'is pushed on each state change and on completion.',
      },
    );
    this.client = this.newClient();
    this.interceptor = new Interceptor({
      forward: (name, args) => this.forward(name, args),
      emit: (event) => this.emit(event),
      pollMs: config.pollMs,
    });
    this.registerHandlers();
  }

  /** Connect downstream, then serve the channel protocol over stdio. */
  async start(): Promise<void> {
    await this.connectDownstream();
    await this.server.connect(new StdioServerTransport());
  }

  /** Stop watches and tear down the downstream link. */
  async stop(): Promise<void> {
    // Flag first so a close event raised by the teardown does not trigger a reconnect.
    this.closing = true;
    this.interceptor.shutdown();
    // Best-effort: a failure to close the transport is not actionable here.
    await this.transport?.close().catch(() => {});
  }

  /**
   * Creates a fresh downstream client carrying the gateway's identity.
   * Uses the configured version (same fallback as the upstream server) so both
   * sides of the gateway report the same name/version pair.
   */
  private newClient(): Client {
    return new Client({
      name: this.config.name ?? 'gitlab-ci-gateway',
      version: this.config.version ?? '0.1.0',
    });
  }

  /** Registers the upstream `tools/list` and `tools/call` handlers. */
  private registerHandlers(): void {
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      // The catalog read must honour the same reconnect/buffer policy as
      // CallTool: a ListTools that lands mid-reconnect should wait for the
      // link (bounded) and replay once, not throw against a dead client.
      const { tools } = (await forwardWithPolicy(
        {
          isRead: () => true, // listing the catalog is an idempotent read
          isConnected: () => this.connected,
          waitForConnection: () => this.waitForConnection(),
          call: () => this.client.listTools(),
        },
        'tools/list',
        undefined,
      )) as Awaited<ReturnType<Client['listTools']>>;
      return { tools };
    });
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args } = request.params;
      // Missing arguments are normalised to an empty object before interception.
      return (await this.interceptor.handleCall(name, args ?? {})) as CallToolResult;
    });
  }

  /**
   * Establish the downstream session, retrying with exponential backoff.
   *
   * Each attempt spawns a new transport and client. The delay starts at 500 ms
   * and doubles after every failure, capped at `maxBackoffMs` (default 30s).
   * Resolves once connected, or as soon as `stop()` has been requested.
   */
  private async connectDownstream(): Promise<void> {
    const maxBackoff = this.config.maxBackoffMs ?? 30_000;
    let backoff = 500;
    for (;;) {
      // stop() may have landed while we were backing off: don't open a new link.
      if (this.closing) return;
      try {
        this.transport = new StdioClientTransport({
          command: this.config.downstreamCommand,
          args: this.config.downstreamArgs,
          env: this.config.downstreamEnv,
        });
        this.transport.onclose = (): void => this.handleDownstreamClose();
        this.client = this.newClient();
        await this.client.connect(this.transport);
        if (this.closing) {
          // stop() arrived during the connect handshake: tear the just-opened
          // link back down instead of reviving a session the caller closed.
          await this.transport.close().catch(() => {});
          return;
        }
        this.connected = true;
        // Only on a reconnect (not the initial connect): announce link restored.
        if (this.reconnecting) this.notifyLink('restored');
        return;
      } catch {
        // A failed attempt is expected while the downstream is unavailable;
        // back off and retry unless the gateway is shutting down.
        if (this.closing) return;
        await this.sleep(backoff);
        backoff = Math.min(backoff * 2, maxBackoff);
      }
    }
  }

  /**
   * Reacts to the downstream transport closing: marks the link down and, if a
   * session had actually been established, announces the loss and starts one
   * background reconnect loop.
   */
  private handleDownstreamClose(): void {
    const wasConnected = this.connected;
    this.connected = false;
    // No new loop when shutting down or when a reconnect loop is already running.
    if (this.closing || this.reconnecting) return;
    // A close reported while no session was established comes from a connect
    // attempt that connectDownstream()'s own retry loop is already handling;
    // starting a second loop here would race it over `transport`/`client`.
    if (!wasConnected) return;
    this.reconnecting = true;
    // Link health as a signal: tell the session the downstream dropped so the
    // agent knows in-flight reads will retry and writes will surface errors.
    this.notifyLink('lost');
    void this.connectDownstream().finally(() => {
      this.reconnecting = false;
    });
  }

  /**
   * Await a live downstream link, bounded. Enforces `maxQueued` (backpressure)
   * and a connect timeout so a permanently-down downstream fails calls instead
   * of hanging or buffering without limit.
   *
   * @throws Error if `maxQueued` callers are already waiting, or if the link is
   *   still down when the timeout elapses or the gateway starts closing.
   */
  private async waitForConnection(): Promise<void> {
    if (this.connected) return;
    const maxQueued = this.config.maxQueued ?? 100;
    if (this.pendingWaiters >= maxQueued) {
      throw new Error(`downstream unavailable: request buffer full (${maxQueued})`);
    }
    this.pendingWaiters++;
    try {
      const deadline = Date.now() + (this.config.connectTimeoutMs ?? 30_000);
      // Poll every 100 ms until the link is up, stop() is requested, or time runs out.
      while (!this.connected && !this.closing && Date.now() < deadline) {
        await this.sleep(100);
      }
      if (!this.connected) {
        throw new Error('downstream unavailable: reconnect timed out');
      }
    } finally {
      // Always release the slot so a failed waiter does not shrink the buffer.
      this.pendingWaiters--;
    }
  }

  /** Forward one call under the read-safe / write-no-retry + bounded-buffer policy. */
  private forward(name: string, args: unknown): Promise<unknown> {
    return forwardWithPolicy(
      {
        isRead: isReadCall,
        isConnected: () => this.connected,
        waitForConnection: () => this.waitForConnection(),
        call: (n, a) => this.callDownstream(n, a),
      },
      name,
      args,
    );
  }

  /** Invokes one tool on the current downstream client; nullish args become `{}`. */
  private async callDownstream(name: string, args: unknown): Promise<unknown> {
    return await this.client.callTool({
      name,
      arguments: (args ?? {}) as Record<string, unknown>,
    });
  }

  /** Push a watch event into the running session as a <channel> event. */
  private emit(event: WatchEvent): void {
    const { content, meta } = formatEvent(event);
    // Custom Channels method, carried by the SDK's open notification shape.
    // Fire-and-forget: a push that cannot be delivered (e.g. the upstream
    // session is not connected) must not surface as an unhandled rejection.
    void this.server
      .notification({ method: CHANNEL_NOTIFICATION, params: { content, meta } })
      .catch(() => {});
  }

  /** Push a downstream link-health change into the session as a <channel> event. */
  private notifyLink(state: 'lost' | 'restored'): void {
    const content =
      state === 'lost'
        ? 'gitlab-mcp link lost - reconnecting; reads will retry, writes will surface errors'
        : 'gitlab-mcp link restored';
    // Fire-and-forget, same as emit(): link-health pushes are advisory, so a
    // delivery failure is deliberately ignored rather than left unhandled.
    void this.server
      .notification({
        method: CHANNEL_NOTIFICATION,
        params: { content, meta: { kind: 'link', state } },
      })
      .catch(() => {});
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((r) => setTimeout(r, ms));
  }
}