From ba88fa50f9d38f5f92fcea0a74e960ad1edc55bc Mon Sep 17 00:00:00 2001 From: pi-bot-01 Date: Sat, 14 Mar 2026 13:47:40 -0700 Subject: [PATCH] feat: multi-session webhook claim system with EADDRINUSE handling - Server gracefully handles EADDRINUSE (logs notice, continues without server) - POST /claim endpoint with referral-based async handoff: 202 (pending) -> poll with referral_id -> 200/401/408 - GET /claim shows current owner and pending queue - DOS protection: 503 when MAX_PENDING_CLAIMS (10) reached - Claims expire after 5 minutes - Pi commands: /webhook:status, /webhook:claim, /webhook:release --- pi-extension/index.ts | 176 +++++++++++++++++++++-- pi-extension/webhook/server.ts | 252 ++++++++++++++++++++++++++++++++- 2 files changed, 413 insertions(+), 15 deletions(-) diff --git a/pi-extension/index.ts b/pi-extension/index.ts index 5f0a122..174927e 100644 --- a/pi-extension/index.ts +++ b/pi-extension/index.ts @@ -3,6 +3,7 @@ * * Registers Gitea tools (read + write) and optional webhook server. * Supports @mention routing for multi-bot coordination. + * Supports multi-session claim system for webhook ownership. */ import registerReadTools from "./tools/read-tools.js"; @@ -11,6 +12,8 @@ import { startWebhookServer, stopWebhookServer, startPolling, stopPolling, setSendMessage, getTrackedRepos, setRepoConfig, + setOnClaimRequested, approvePendingClaim, getClaimStatus, + SESSION_ID, WEBHOOK_PORT, WEBHOOK_HOST, } from "./webhook/server.js"; import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; @@ -42,7 +45,7 @@ export default function (pi: ExtensionAPI) { } const config = setRepoConfig(params.repo, { respondTo: mode }); return { - content: [{ type: "text", text: `✅ ${params.repo}: respondTo = ${config.respondTo}` }], + content: [{ type: "text", text: `${params.repo}: respondTo = ${config.respondTo}` }], details: { repo: params.repo, config }, }; }, @@ -59,11 +62,11 @@ export default function (pi: ExtensionAPI) { for (const [name] of webhook) { const mode = configs.get(name)?.respondTo ?? "all"; - lines.push(`✅ ${name} — webhook (respondTo: ${mode})`); + lines.push(`${name} — webhook (respondTo: ${mode})`); } for (const name of collab) { const mode = configs.get(name)?.respondTo ?? "mention"; - lines.push(`📋 ${name} — collab/notification (respondTo: ${mode})`); + lines.push(`${name} — collab/notification (respondTo: ${mode})`); } return { @@ -73,6 +76,157 @@ export default function (pi: ExtensionAPI) { }, }); + // ── Webhook claim commands ─────────────────────────────────────────────── + + pi.registerCommand("webhook:status", { + description: "Show webhook server status and claim ownership", + handler: async (_args, _ctx) => { + const status = getClaimStatus(); + + if (status.isOwner && status.pending.length === 0) { + console.log(`Webhook: you own the server (session: ${status.sessionId})`); + console.log(` port: ${WEBHOOK_PORT}, cwd: ${status.ownerCwd}`); + } else if (status.isOwner) { + console.log(`Webhook: you own the server (session: ${status.sessionId})`); + console.log(` port: ${WEBHOOK_PORT}, cwd: ${status.ownerCwd}`); + console.log(` ${status.pending.length} pending claim(s):`); + for (const c of status.pending) { + const age = Math.round((Date.now() - c.createdAt) / 1000); + console.log(` ${c.referralId} from ${c.sessionId} (${c.cwd}) — ${age}s ago`); + } + console.log(` Use /webhook:release to hand off to next in queue`); + } else { + // Not the server owner — query the server + try { + const res = await fetch(`http://127.0.0.1:${WEBHOOK_PORT}/claim`); + if (res.ok) { + const data = await res.json() as Record; + console.log(`Webhook: owned by another session`); + console.log(` owner: ${data.owner} (cwd: ${data.owner_cwd})`); + console.log(` server session: ${data.server_session}`); + const pending = data.pending as Array>; + if (pending?.length) { + console.log(` ${pending.length} pending claim(s)`); + } + console.log(` Use /webhook:claim to request ownership`); + } else { + console.log(`Webhook: no server running on port ${WEBHOOK_PORT}`); + } + } catch { + console.log(`Webhook: no server reachable on port ${WEBHOOK_PORT}`); + } + } + }, + }); + + pi.registerCommand("webhook:claim", { + description: "Request webhook ownership from the current server owner", + handler: async (_args, ctx) => { + const status = getClaimStatus(); + + if (status.isOwner) { + console.log("You already own the webhook server."); + return; + } + + // POST to the running server's /claim endpoint + const url = `http://127.0.0.1:${WEBHOOK_PORT}/claim`; + let referralId: string | undefined; + + try { + // Initial request + const initRes = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ session_id: SESSION_ID, cwd: process.cwd() }), + }); + + if (initRes.status === 503) { + console.log("Server has too many pending claims. Try again later."); + return; + } + + const initData = await initRes.json() as Record; + + if (initRes.status === 200) { + console.log("Claimed immediately (no contention)."); + return; + } + + if (initRes.status !== 202) { + console.log(`Unexpected response: ${initRes.status} ${JSON.stringify(initData)}`); + return; + } + + referralId = initData.referral_id as string; + console.log(`Claim submitted (referral: ${referralId})`); + console.log(` Current owner: ${initData.owner} (${initData.owner_cwd})`); + console.log(` Queue position: ${initData.position}`); + console.log(` Waiting for owner to /webhook:release ...`); + + // Poll every 3s for up to 5 minutes + const deadline = Date.now() + 5 * 60 * 1000; + while (Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 3000)); + + const pollRes = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ session_id: SESSION_ID, referral_id: referralId }), + }); + + if (pollRes.status === 200) { + console.log("Claim approved — you now own the webhook server."); + return; + } + if (pollRes.status === 401) { + console.log("Claim denied by the current owner."); + return; + } + if (pollRes.status === 408) { + console.log("Claim expired (timed out)."); + return; + } + // 202 = still pending, continue polling + } + + console.log("Claim timed out (client-side deadline)."); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + if (msg.includes("ECONNREFUSED")) { + console.log(`No webhook server running on port ${WEBHOOK_PORT}.`); + } else { + console.log(`Claim failed: ${msg}`); + } + } + }, + }); + + pi.registerCommand("webhook:release", { + description: "Release webhook ownership to the next pending claimer", + handler: async (_args, _ctx) => { + const status = getClaimStatus(); + + if (!status.isOwner) { + console.log("You don't own the webhook server — nothing to release."); + return; + } + + if (status.pending.length === 0) { + console.log("No pending claims to hand off to."); + return; + } + + const { approved, newOwner } = approvePendingClaim(); + if (approved) { + console.log(`Ownership transferred to ${approved.sessionId} (${approved.cwd})`); + console.log(`New owner session: ${newOwner}`); + } else { + console.log("No pending claims (they may have expired)."); + } + }, + }); + // ── Lifecycle ──────────────────────────────────────────────────────────── // GITEA_ENABLE_POLLING=1 opts in to running the webhook server + notification poller. @@ -82,7 +236,13 @@ export default function (pi: ExtensionAPI) { (!process.env.GITEA_HOOKS_URL && !process.env.OPENCLAW_HOOKS_URL); pi.on("session_start", async (_event, ctx) => { - console.log("[pi-gitea] Session started"); + console.log(`[pi-gitea] Session started (session: ${SESSION_ID})`); + + // Set up claim notification — alert the user when another session wants ownership + setOnClaimRequested((claim) => { + console.log(`[pi-gitea] Claim request from ${claim.sessionId} (${claim.cwd})`); + console.log(`[pi-gitea] Use /webhook:release to hand off, or ignore to let it expire`); + }); // Auto-detect runtime: pi-bot (persistent session) vs openclaw (hooks endpoint) if (ctx.sendUserMessage) { @@ -115,12 +275,8 @@ export default function (pi: ExtensionAPI) { } if (enablePolling) { - try { - await startWebhookServer(pi); - await startPolling(pi); - } catch (err) { - console.error("[pi-gitea] Failed to start webhook/polling:", err); - } + await startWebhookServer(pi); + await startPolling(pi); } else { console.log("[pi-gitea] Webhook server + polling disabled (openclaw mode — tools only)"); } diff --git a/pi-extension/webhook/server.ts b/pi-extension/webhook/server.ts index 5d535b2..cf94226 100644 --- a/pi-extension/webhook/server.ts +++ b/pi-extension/webhook/server.ts @@ -32,6 +32,7 @@ const STATE_FILE = "/home/pibot/.pi/agent/gitea-poll-state.json"; // ── State ──────────────────────────────────────────────────────────────────── let server: Server | null = null; +let serverOwned = false; // true if THIS process bound the port let processingQueue: Array<{ prompt: string; timestamp: number }> = []; const maxQueueDepth = 50; let isProcessing = false; @@ -39,6 +40,189 @@ let repoPollTimer: NodeJS.Timeout | null = null; let notifPollTimer: NodeJS.Timeout | null = null; let sendMessage: ((message: string) => Promise) | null = null; +// ── Claim system ───────────────────────────────────────────────────────────── +// Multiple pi sessions may start concurrently. Only one can bind the webhook +// port. The instance that binds it becomes the initial "claim owner" — webhook +// events are delivered to its sendMessage callback. +// +// Other sessions can request ownership via POST /claim: +// +// 1. Client sends POST /claim { session_id, cwd, referral_id? } +// 2. Server responds 202 { referral_id, position, owner } — queued +// 3. Client polls POST /claim { session_id, referral_id } +// 4. Server responds: +// 202 — still pending (current owner hasn't released) +// 200 — claimed (you are the new owner) +// 401 — denied (owner explicitly rejected — reserved for future use) +// 408 — timed out (claim expired) +// 5. GET /claim — returns current owner info + pending queue +// +// The current owner releases via the /webhook:release pi command, which calls +// approvePendingClaim() in-process (no HTTP needed — it's the server process). +// +// DOS protection: max pending referrals; 503 for new clients when full. + +/** Unique id for this pi process */ +const SESSION_ID = `pi-${process.pid}-${Date.now().toString(36)}`; + +/** Session id of whoever currently receives events */ +let claimOwner: string = SESSION_ID; +/** CWD of current owner (for display) */ +let claimOwnerCwd: string = process.cwd(); + +interface ClaimRequest { + referralId: string; + sessionId: string; + cwd: string; + status: "pending" | "approved" | "denied" | "expired"; + createdAt: number; +} + +const MAX_PENDING_CLAIMS = 10; +const CLAIM_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes +let pendingClaims: Map = new Map(); + +/** Notify callback — set by the extension to alert the current owner */ +let onClaimRequested: ((claim: ClaimRequest) => void) | null = null; + +export function setOnClaimRequested(fn: (claim: ClaimRequest) => void) { + onClaimRequested = fn; +} + +function generateReferralId(): string { + return `ref-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`; +} + +function purgeExpiredClaims() { + const now = Date.now(); + for (const [id, claim] of pendingClaims) { + if (now - claim.createdAt > CLAIM_TIMEOUT_MS) { + claim.status = "expired"; + pendingClaims.delete(id); + } + } +} + +/** + * Called by the server owner (via /webhook:release command) to approve the + * next pending claim. Transfers ownership to the oldest pending request. + */ +export function approvePendingClaim(): { approved: ClaimRequest | null; newOwner: string } { + purgeExpiredClaims(); + + // Find oldest pending + let oldest: ClaimRequest | null = null; + for (const claim of pendingClaims.values()) { + if (claim.status === "pending") { + if (!oldest || claim.createdAt < oldest.createdAt) { + oldest = claim; + } + } + } + + if (!oldest) { + return { approved: null, newOwner: claimOwner }; + } + + oldest.status = "approved"; + claimOwner = oldest.sessionId; + claimOwnerCwd = oldest.cwd; + console.log(`[gitea-claim] Ownership transferred to ${oldest.sessionId} (${oldest.cwd})`); + + return { approved: oldest, newOwner: claimOwner }; +} + +/** Get current claim status for display */ +export function getClaimStatus() { + purgeExpiredClaims(); + return { + owner: claimOwner, + ownerCwd: claimOwnerCwd, + sessionId: SESSION_ID, + isOwner: claimOwner === SESSION_ID, + pending: [...pendingClaims.values()].filter((c) => c.status === "pending"), + }; +} + +/** Handle POST /claim from another pi instance */ +function handleClaimRequest(body: { session_id?: string; cwd?: string; referral_id?: string }): { + status: number; + body: Record; +} { + purgeExpiredClaims(); + + const sessionId = body.session_id ?? "unknown"; + const cwd = body.cwd ?? "unknown"; + const referralId = body.referral_id; + + // Polling with existing referral + if (referralId) { + const claim = pendingClaims.get(referralId); + if (!claim) { + // Unknown referral — expired or never existed + return { status: 408, body: { error: "expired", referral_id: referralId } }; + } + if (claim.status === "approved") { + pendingClaims.delete(referralId); + return { + status: 200, + body: { claimed: true, referral_id: referralId, owner: claimOwner }, + }; + } + if (claim.status === "denied") { + pendingClaims.delete(referralId); + return { status: 401, body: { denied: true, referral_id: referralId } }; + } + if (claim.status === "expired") { + pendingClaims.delete(referralId); + return { status: 408, body: { error: "expired", referral_id: referralId } }; + } + // Still pending + const position = [...pendingClaims.values()] + .filter((c) => c.status === "pending" && c.createdAt <= claim.createdAt) + .length; + return { + status: 202, + body: { + referral_id: referralId, + status: "pending", + position, + owner: claimOwner, + owner_cwd: claimOwnerCwd, + }, + }; + } + + // New claim request (no referral_id) + if (pendingClaims.size >= MAX_PENDING_CLAIMS) { + return { status: 503, body: { error: "too many pending claims" } }; + } + + const newReferralId = generateReferralId(); + const claim: ClaimRequest = { + referralId: newReferralId, + sessionId, + cwd, + status: "pending", + createdAt: Date.now(), + }; + pendingClaims.set(newReferralId, claim); + + console.log(`[gitea-claim] New claim request from ${sessionId} (${cwd}), referral: ${newReferralId}`); + if (onClaimRequested) onClaimRequested(claim); + + return { + status: 202, + body: { + referral_id: newReferralId, + status: "pending", + position: pendingClaims.size, + owner: claimOwner, + owner_cwd: claimOwnerCwd, + }, + }; +} + /** Repos where we successfully installed a webhook */ let webhookRepos: Map = new Map(); @@ -302,7 +486,7 @@ function validateToken(req: IncomingMessage): boolean { } export function startWebhookServer(_pi: ExtensionAPI) { - return new Promise(async (resolve, reject) => { + return new Promise(async (resolve) => { try { const http = await import("node:http"); @@ -326,6 +510,9 @@ export function startWebhookServer(_pi: ExtensionAPI) { status: "ok", uptime: process.uptime(), bot_user: BOT_USER, + session_id: SESSION_ID, + claim_owner: claimOwner, + claim_owner_cwd: claimOwnerCwd, webhook_repos: webhookRepos.size, collab_repos: collabRepos.size, queue_depth: processingQueue.length, @@ -334,6 +521,46 @@ export function startWebhookServer(_pi: ExtensionAPI) { return; } + // GET /claim — current owner info + if (url === "/claim" && req.method === "GET") { + purgeExpiredClaims(); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + owner: claimOwner, + owner_cwd: claimOwnerCwd, + server_session: SESSION_ID, + pending: [...pendingClaims.values()] + .filter((c) => c.status === "pending") + .map((c) => ({ + referral_id: c.referralId, + session_id: c.sessionId, + cwd: c.cwd, + age_ms: Date.now() - c.createdAt, + })), + })); + return; + } + + // POST /claim — request or poll claim + if (url === "/claim" && req.method === "POST") { + let rawBody = ""; + for await (const chunk of req) rawBody += chunk.toString(); + + let reqBody: any; + try { + reqBody = JSON.parse(rawBody); + } catch { + res.writeHead(400, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ error: "Invalid JSON" })); + return; + } + + const result = handleClaimRequest(reqBody); + res.writeHead(result.status, { "Content-Type": "application/json" }); + res.end(JSON.stringify(result.body)); + return; + } + // POST /hooks/gitea if (url === "/hooks/gitea" && req.method === "POST") { if (!validateToken(req)) { @@ -366,13 +593,28 @@ export function startWebhookServer(_pi: ExtensionAPI) { }); server.listen(WEBHOOK_PORT, WEBHOOK_HOST, () => { - console.log(`[gitea-webhook] Server listening on ${WEBHOOK_HOST}:${WEBHOOK_PORT}`); + serverOwned = true; + claimOwner = SESSION_ID; + claimOwnerCwd = process.cwd(); + console.log(`[gitea-webhook] Server listening on ${WEBHOOK_HOST}:${WEBHOOK_PORT} (session: ${SESSION_ID})`); resolve(); }); - server.on("error", reject); + server.on("error", (err: NodeJS.ErrnoException) => { + if (err.code === "EADDRINUSE") { + server = null; + serverOwned = false; + console.log(`[gitea-webhook] Port ${WEBHOOK_PORT} already in use — another pi instance owns the webhook server`); + console.log(`[gitea-webhook] Use /webhook:claim to request ownership, or /webhook:status to see current owner`); + resolve(); // not fatal — tools still work, just no local server + } else { + console.error(`[gitea-webhook] Server error: ${err.message}`); + resolve(); // don't crash the extension + } + }); } catch (err) { - reject(err); + console.error(`[gitea-webhook] Failed to create server: ${err}`); + resolve(); } }); } @@ -695,4 +937,4 @@ export function setRepoConfig(repo: string, config: Partial): RepoCo return updated; } -export { parseMentions, type MentionInfo, type RepoConfig }; +export { parseMentions, type MentionInfo, type RepoConfig, type ClaimRequest, SESSION_ID, WEBHOOK_PORT, WEBHOOK_HOST };