All files / src/channel-gateway forwarding.ts

100% Statements 11/11
100% Branches 7/7
100% Functions 2/2
100% Lines 11/11

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61                                                                      2x           16x 7x   11x 11x     3x 2x 2x   1x         2x 8x    
/**
 * Forward policy for the channel-gateway downstream link (issue #483, Phase 2).
 *
 * Pure control flow, kept out of the MCP adapter so it is unit-testable:
 *
 *  - A call that arrives while the link is down is BUFFERED (not failed): it was
 *    never sent, so replaying it after reconnect cannot double-execute. The
 *    buffer is bounded — once `maxQueued` calls are already waiting, further
 *    calls are rejected (backpressure) instead of growing without limit.
 *  - A call that was SENT and then failed is retried once after reconnect only
 *    if it is a read (idempotent). Writes are never blind-retried: the request
 *    may have executed before the link dropped, and a retry would double it.
 */
 
/** Injected hooks; the gateway supplies real transport, tests supply fakes. */
export interface ForwardPolicy {
  /** True if `name` is an idempotent read (safe to retry). */
  isRead: (name: string) => boolean;
  /** Current downstream link health. */
  isConnected: () => boolean;
  /**
   * Wait for the link to come back, bounded. Resolves once connected; rejects on
   * timeout or when the bounded buffer is full. The gateway counts in-flight
   * waiters here to enforce `maxQueued`.
   */
  waitForConnection: () => Promise<void>;
  /** Send one call over the (assumed live) downstream link. */
  call: (name: string, args: unknown) => Promise<unknown>;
}
 
/**
 * Forward one call under the read-safe / write-no-retry policy with bounded
 * buffering across a reconnect. Throws if the buffer is full (via
 * `waitForConnection`) or a non-retryable call fails.
 */
export async function forwardWithPolicy(
  policy: ForwardPolicy,
  name: string,
  args: unknown,
): Promise<unknown> {
  // Not yet sent: safe to buffer until the link returns, for reads and writes.
  if (!policy.isConnected()) {
    await policy.waitForConnection();
  }
  try {
    return await policy.call(name, args);
  } catch (err) {
    // Sent but failed: only idempotent reads may be replayed.
    if (policy.isRead(name)) {
      await policy.waitForConnection();
      return await policy.call(name, args);
    }
    throw err;
  }
}
 
/** A tool call is an idempotent read by name prefix. */
export function isReadCall(name: string): boolean {
  return name.startsWith('browse_') || name.startsWith('get_') || name.startsWith('list_');
}