/**
* Channel-gateway MCP adapter (issue #483).
*
* One process that is BOTH:
* - a downstream MCP client to gitlab-mcp (the real tool catalog), and
* - an upstream channel-protocol MCP server to Claude Code (re-exposes that
* catalog verbatim AND pushes CI events into the session).
*
* The forward-path hook (detect non-final CI result -> arm a watch) lives in
* {@link Interceptor}; this file is the transport wiring: connect/reconnect with
* backoff, read-safe / write-no-retry forwarding, and channel delivery.
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
type CallToolResult,
} from '@modelcontextprotocol/sdk/types.js';
import { Interceptor } from './interceptor';
import { formatEvent } from './format';
import { forwardWithPolicy, isReadCall } from './forwarding';
import type { WatchEvent } from './watch';
/**
 * Claude Code channel push method (Channels research preview).
 *
 * Used as the `method` of every notification this gateway pushes upstream:
 * both watch events and downstream link-health changes.
 */
const CHANNEL_NOTIFICATION = 'notifications/claude/channel';
/**
 * Settings accepted by the gateway. Only the downstream launch command and its
 * arguments are mandatory; every tuning knob is optional.
 */
export interface GatewayConfig {
  /** Program used to spawn the downstream gitlab-mcp process, e.g. `node`. */
  downstreamCommand: string;

  /** Argument vector handed to that program, e.g. `['dist/src/main.js', 'stdio']`. */
  downstreamArgs: string[];

  /** Environment variables for the downstream process (token, API url, gates). */
  downstreamEnv?: Record<string, string>;

  /** Milliseconds between watch polls (default 10s). */
  pollMs?: number;

  /** Upper bound, in milliseconds, on the delay between reconnect attempts (default 30s). */
  maxBackoffMs?: number;

  /** How many calls may be buffered while the downstream link is down (default 100). */
  maxQueued?: number;

  /** Milliseconds a buffered call waits for a reconnect before it fails (default 30s). */
  connectTimeoutMs?: number;

  /** Name of the gateway server; it becomes the channel `source` attribute. */
  name?: string;

  /** Version reported by the gateway server. */
  version?: string;
}
/**
 * Transport wiring for the channel gateway.
 *
 * Owns the upstream MCP {@link Server} (stdio, towards the session), the
 * downstream MCP {@link Client} (a child process reached over stdio), and the
 * {@link Interceptor} that every tool call is routed through.
 */
export class ChannelGateway {
  private readonly server: Server;
  private client: Client;
  /** Transport of the most recent downstream connect attempt. */
  private transport?: StdioClientTransport;
  private readonly interceptor: Interceptor;
  /** True only while a downstream session is established. */
  private connected = false;
  /** True while the background reconnect loop started by a link drop is running. */
  private reconnecting = false;
  /** Set by {@link stop}; suppresses reconnects and releases waiters. */
  private closing = false;
  /** Number of calls currently parked in {@link waitForConnection}. */
  private pendingWaiters = 0;

  /**
   * Builds the upstream server, a placeholder downstream client and the
   * interceptor, and registers the request handlers. Nothing is connected
   * until {@link start} is called.
   *
   * @param config Launch parameters for the downstream plus optional tuning.
   */
  constructor(private readonly config: GatewayConfig) {
    this.server = new Server(
      { name: config.name ?? 'gitlab-ci-gateway', version: config.version ?? '0.1.0' },
      {
        capabilities: {
          tools: {}, // re-exposes the downstream catalog
          experimental: { 'claude/channel': {} }, // allowed to push channel events
        },
        instructions:
          'Forwards the full gitlab-mcp tool catalog. When a CI pipeline/job is ' +
          'still running, it is watched in the background and a <channel> event ' +
          'is pushed on each state change and on completion.',
      },
    );
    this.client = this.newClient();
    this.interceptor = new Interceptor({
      forward: (name, args) => this.forward(name, args),
      emit: (event) => this.emit(event),
      pollMs: config.pollMs,
    });
    this.registerHandlers();
  }

  /** Connect downstream, then serve the channel protocol over stdio. */
  async start(): Promise<void> {
    await this.connectDownstream();
    await this.server.connect(new StdioServerTransport());
  }

  /** Stop watches and tear down the downstream link. */
  async stop(): Promise<void> {
    this.closing = true;
    this.interceptor.shutdown();
    // Best-effort: a failure to close must not make stop() reject.
    await this.transport?.close().catch(() => {});
  }

  /**
   * Creates a fresh downstream client. It reports the same name and version
   * as the upstream server so both sides of the gateway identify consistently.
   */
  private newClient(): Client {
    return new Client({
      name: this.config.name ?? 'gitlab-ci-gateway',
      version: this.config.version ?? '0.1.0',
    });
  }

  /** Registers the upstream ListTools and CallTool handlers. */
  private registerHandlers(): void {
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      // The catalog read must honour the same reconnect/buffer policy as
      // CallTool: a ListTools that lands mid-reconnect should wait for the
      // link (bounded) and replay once, not throw against a dead client.
      const { tools } = (await forwardWithPolicy(
        {
          isRead: () => true, // listing the catalog is an idempotent read
          isConnected: () => this.connected,
          waitForConnection: () => this.waitForConnection(),
          call: () => this.client.listTools(),
        },
        'tools/list',
        undefined,
      )) as Awaited<ReturnType<Client['listTools']>>;
      return { tools };
    });

    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args } = request.params;
      return (await this.interceptor.handleCall(name, args ?? {})) as CallToolResult;
    });
  }

  /**
   * Establish the downstream session, retrying with exponential backoff.
   *
   * The delay starts at 500 ms and doubles after each failed attempt, capped
   * at `maxBackoffMs` (default 30s). Returns without connecting once
   * {@link stop} has been called.
   */
  private async connectDownstream(): Promise<void> {
    const maxBackoff = this.config.maxBackoffMs ?? 30_000;
    let backoff = 500;
    for (;;) {
      // stop() may have landed while we were backing off: don't open a new link.
      if (this.closing) return;
      try {
        const transport = new StdioClientTransport({
          command: this.config.downstreamCommand,
          args: this.config.downstreamArgs,
          env: this.config.downstreamEnv,
        });
        // Bind the handler to this transport so a late close from an earlier
        // attempt cannot be mistaken for a close of the current link.
        transport.onclose = (): void => this.handleDownstreamClose(transport);
        this.transport = transport;
        this.client = this.newClient();
        await this.client.connect(transport);
        if (this.closing) {
          // stop() arrived during the connect handshake: tear the just-opened
          // link back down instead of reviving a session the caller closed.
          await transport.close().catch(() => {});
          return;
        }
        this.connected = true;
        // Only on a reconnect (not the initial connect): announce link restored.
        if (this.reconnecting) this.notifyLink('restored');
        return;
      } catch {
        if (this.closing) return;
        await this.sleep(backoff);
        backoff = Math.min(backoff * 2, maxBackoff);
      }
    }
  }

  /**
   * Reacts to a downstream transport closing.
   *
   * A reconnect loop is started only when the link that closed was the
   * current, established one. Closes are ignored when they come from a
   * superseded transport, arrive during shutdown, arrive while a reconnect
   * loop is already running, or belong to an attempt that never connected
   * (the connect loop that made that attempt retries on its own).
   *
   * @param source The transport whose `onclose` fired.
   */
  private handleDownstreamClose(source: StdioClientTransport): void {
    if (source !== this.transport) return;
    const wasConnected = this.connected;
    this.connected = false;
    if (this.closing || this.reconnecting || !wasConnected) return;
    this.reconnecting = true;
    // Link health as a signal: tell the session the downstream dropped so the
    // agent knows in-flight reads will retry and writes will surface errors.
    this.notifyLink('lost');
    void this.connectDownstream().finally(() => {
      this.reconnecting = false;
    });
  }

  /**
   * Await a live downstream link, bounded. Enforces `maxQueued` (backpressure)
   * and a connect timeout so a permanently-down downstream fails calls instead
   * of hanging or buffering without limit.
   *
   * @throws Error when `maxQueued` callers are already waiting, or when the
   *   link is still down after `connectTimeoutMs` or after stop() was called.
   */
  private async waitForConnection(): Promise<void> {
    if (this.connected) return;
    const maxQueued = this.config.maxQueued ?? 100;
    if (this.pendingWaiters >= maxQueued) {
      throw new Error(`downstream unavailable: request buffer full (${maxQueued})`);
    }
    this.pendingWaiters++;
    try {
      const deadline = Date.now() + (this.config.connectTimeoutMs ?? 30_000);
      // Poll the flag; the loop also ends as soon as stop() sets `closing`.
      while (!this.connected && !this.closing && Date.now() < deadline) {
        await this.sleep(100);
      }
      if (!this.connected) {
        throw new Error('downstream unavailable: reconnect timed out');
      }
    } finally {
      // Always release the buffer slot, whether we connected or gave up.
      this.pendingWaiters--;
    }
  }

  /** Forward one call under the read-safe / write-no-retry + bounded-buffer policy. */
  private forward(name: string, args: unknown): Promise<unknown> {
    return forwardWithPolicy(
      {
        isRead: isReadCall,
        isConnected: () => this.connected,
        waitForConnection: () => this.waitForConnection(),
        call: (n, a) => this.callDownstream(n, a),
      },
      name,
      args,
    );
  }

  /** Invokes one tool on the current downstream client; missing args become `{}`. */
  private async callDownstream(name: string, args: unknown): Promise<unknown> {
    return await this.client.callTool({
      name,
      arguments: (args ?? {}) as Record<string, unknown>,
    });
  }

  /** Push a watch event into the running session as a <channel> event. */
  private emit(event: WatchEvent): void {
    const { content, meta } = formatEvent(event);
    // Custom Channels method, carried by the SDK's open notification shape.
    // Fire-and-forget: a push that fails (e.g. upstream not connected) is
    // dropped rather than surfacing as an unhandled rejection.
    void this.server
      .notification({ method: CHANNEL_NOTIFICATION, params: { content, meta } })
      .catch(() => {});
  }

  /** Push a downstream link-health change into the session as a <channel> event. */
  private notifyLink(state: 'lost' | 'restored'): void {
    const content =
      state === 'lost'
        ? 'gitlab-mcp link lost - reconnecting; reads will retry, writes will surface errors'
        : 'gitlab-mcp link restored';
    // Best-effort, same as emit(): never let a failed push reject unobserved.
    void this.server
      .notification({
        method: CHANNEL_NOTIFICATION,
        params: { content, meta: { kind: 'link', state } },
      })
      .catch(() => {});
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((r) => setTimeout(r, ms));
  }
}
|