feat: multi-session webhook claim system with EADDRINUSE handling

- Server gracefully handles EADDRINUSE (logs notice, continues without server)
- POST /claim endpoint with referral-based async handoff:
  202 (pending) -> poll with referral_id -> 200/401/408
- GET /claim shows current owner and pending queue
- DOS protection: 503 when MAX_PENDING_CLAIMS (10) reached
- Claims expire after 5 minutes
- Pi commands: /webhook:status, /webhook:claim, /webhook:release
This commit is contained in:
2026-03-14 13:47:40 -07:00
parent aa39af1c66
commit ba88fa50f9
2 changed files with 413 additions and 15 deletions

View File

@@ -32,6 +32,7 @@ const STATE_FILE = "/home/pibot/.pi/agent/gitea-poll-state.json";
// ── State ────────────────────────────────────────────────────────────────────
let server: Server | null = null;
let serverOwned = false; // true if THIS process bound the port
let processingQueue: Array<{ prompt: string; timestamp: number }> = [];
const maxQueueDepth = 50;
let isProcessing = false;
@@ -39,6 +40,189 @@ let repoPollTimer: NodeJS.Timeout | null = null;
let notifPollTimer: NodeJS.Timeout | null = null;
let sendMessage: ((message: string) => Promise<void>) | null = null;
// ── Claim system ─────────────────────────────────────────────────────────────
// Multiple pi sessions may start concurrently. Only one can bind the webhook
// port. The instance that binds it becomes the initial "claim owner" — webhook
// events are delivered to its sendMessage callback.
//
// Other sessions can request ownership via POST /claim:
//
// 1. Client sends POST /claim { session_id, cwd, referral_id? }
// 2. Server responds 202 { referral_id, position, owner } — queued
// 3. Client polls POST /claim { session_id, referral_id }
// 4. Server responds:
// 202 — still pending (current owner hasn't released)
// 200 — claimed (you are the new owner)
// 401 — denied (owner explicitly rejected — reserved for future use)
// 408 — timed out (claim expired)
// 5. GET /claim — returns current owner info + pending queue
//
// The current owner releases via the /webhook:release pi command, which calls
// approvePendingClaim() in-process (no HTTP needed — it's the server process).
//
// DOS protection: max pending referrals; 503 for new clients when full.
/** Unique id for this pi process */
const SESSION_ID = `pi-${process.pid}-${Date.now().toString(36)}`;
/** Session id of whoever currently receives events */
let claimOwner: string = SESSION_ID;
/** CWD of current owner (for display) */
let claimOwnerCwd: string = process.cwd();
interface ClaimRequest {
referralId: string;
sessionId: string;
cwd: string;
status: "pending" | "approved" | "denied" | "expired";
createdAt: number;
}
const MAX_PENDING_CLAIMS = 10;
const CLAIM_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
let pendingClaims: Map<string, ClaimRequest> = new Map();
/** Notify callback — set by the extension to alert the current owner */
let onClaimRequested: ((claim: ClaimRequest) => void) | null = null;
export function setOnClaimRequested(fn: (claim: ClaimRequest) => void) {
onClaimRequested = fn;
}
function generateReferralId(): string {
return `ref-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
}
function purgeExpiredClaims() {
const now = Date.now();
for (const [id, claim] of pendingClaims) {
if (now - claim.createdAt > CLAIM_TIMEOUT_MS) {
claim.status = "expired";
pendingClaims.delete(id);
}
}
}
/**
* Called by the server owner (via /webhook:release command) to approve the
* next pending claim. Transfers ownership to the oldest pending request.
*/
export function approvePendingClaim(): { approved: ClaimRequest | null; newOwner: string } {
purgeExpiredClaims();
// Find oldest pending
let oldest: ClaimRequest | null = null;
for (const claim of pendingClaims.values()) {
if (claim.status === "pending") {
if (!oldest || claim.createdAt < oldest.createdAt) {
oldest = claim;
}
}
}
if (!oldest) {
return { approved: null, newOwner: claimOwner };
}
oldest.status = "approved";
claimOwner = oldest.sessionId;
claimOwnerCwd = oldest.cwd;
console.log(`[gitea-claim] Ownership transferred to ${oldest.sessionId} (${oldest.cwd})`);
return { approved: oldest, newOwner: claimOwner };
}
/** Get current claim status for display */
export function getClaimStatus() {
purgeExpiredClaims();
return {
owner: claimOwner,
ownerCwd: claimOwnerCwd,
sessionId: SESSION_ID,
isOwner: claimOwner === SESSION_ID,
pending: [...pendingClaims.values()].filter((c) => c.status === "pending"),
};
}
/** Handle POST /claim from another pi instance */
function handleClaimRequest(body: { session_id?: string; cwd?: string; referral_id?: string }): {
status: number;
body: Record<string, unknown>;
} {
purgeExpiredClaims();
const sessionId = body.session_id ?? "unknown";
const cwd = body.cwd ?? "unknown";
const referralId = body.referral_id;
// Polling with existing referral
if (referralId) {
const claim = pendingClaims.get(referralId);
if (!claim) {
// Unknown referral — expired or never existed
return { status: 408, body: { error: "expired", referral_id: referralId } };
}
if (claim.status === "approved") {
pendingClaims.delete(referralId);
return {
status: 200,
body: { claimed: true, referral_id: referralId, owner: claimOwner },
};
}
if (claim.status === "denied") {
pendingClaims.delete(referralId);
return { status: 401, body: { denied: true, referral_id: referralId } };
}
if (claim.status === "expired") {
pendingClaims.delete(referralId);
return { status: 408, body: { error: "expired", referral_id: referralId } };
}
// Still pending
const position = [...pendingClaims.values()]
.filter((c) => c.status === "pending" && c.createdAt <= claim.createdAt)
.length;
return {
status: 202,
body: {
referral_id: referralId,
status: "pending",
position,
owner: claimOwner,
owner_cwd: claimOwnerCwd,
},
};
}
// New claim request (no referral_id)
if (pendingClaims.size >= MAX_PENDING_CLAIMS) {
return { status: 503, body: { error: "too many pending claims" } };
}
const newReferralId = generateReferralId();
const claim: ClaimRequest = {
referralId: newReferralId,
sessionId,
cwd,
status: "pending",
createdAt: Date.now(),
};
pendingClaims.set(newReferralId, claim);
console.log(`[gitea-claim] New claim request from ${sessionId} (${cwd}), referral: ${newReferralId}`);
if (onClaimRequested) onClaimRequested(claim);
return {
status: 202,
body: {
referral_id: newReferralId,
status: "pending",
position: pendingClaims.size,
owner: claimOwner,
owner_cwd: claimOwnerCwd,
},
};
}
/** Repos where we successfully installed a webhook */
let webhookRepos: Map<string, { webhookId: number; addedAt: number }> = new Map();
@@ -302,7 +486,7 @@ function validateToken(req: IncomingMessage): boolean {
}
export function startWebhookServer(_pi: ExtensionAPI) {
return new Promise<void>(async (resolve, reject) => {
return new Promise<void>(async (resolve) => {
try {
const http = await import("node:http");
@@ -326,6 +510,9 @@ export function startWebhookServer(_pi: ExtensionAPI) {
status: "ok",
uptime: process.uptime(),
bot_user: BOT_USER,
session_id: SESSION_ID,
claim_owner: claimOwner,
claim_owner_cwd: claimOwnerCwd,
webhook_repos: webhookRepos.size,
collab_repos: collabRepos.size,
queue_depth: processingQueue.length,
@@ -334,6 +521,46 @@ export function startWebhookServer(_pi: ExtensionAPI) {
return;
}
// GET /claim — current owner info
if (url === "/claim" && req.method === "GET") {
purgeExpiredClaims();
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({
owner: claimOwner,
owner_cwd: claimOwnerCwd,
server_session: SESSION_ID,
pending: [...pendingClaims.values()]
.filter((c) => c.status === "pending")
.map((c) => ({
referral_id: c.referralId,
session_id: c.sessionId,
cwd: c.cwd,
age_ms: Date.now() - c.createdAt,
})),
}));
return;
}
// POST /claim — request or poll claim
if (url === "/claim" && req.method === "POST") {
let rawBody = "";
for await (const chunk of req) rawBody += chunk.toString();
let reqBody: any;
try {
reqBody = JSON.parse(rawBody);
} catch {
res.writeHead(400, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: "Invalid JSON" }));
return;
}
const result = handleClaimRequest(reqBody);
res.writeHead(result.status, { "Content-Type": "application/json" });
res.end(JSON.stringify(result.body));
return;
}
// POST /hooks/gitea
if (url === "/hooks/gitea" && req.method === "POST") {
if (!validateToken(req)) {
@@ -366,13 +593,28 @@ export function startWebhookServer(_pi: ExtensionAPI) {
});
server.listen(WEBHOOK_PORT, WEBHOOK_HOST, () => {
console.log(`[gitea-webhook] Server listening on ${WEBHOOK_HOST}:${WEBHOOK_PORT}`);
serverOwned = true;
claimOwner = SESSION_ID;
claimOwnerCwd = process.cwd();
console.log(`[gitea-webhook] Server listening on ${WEBHOOK_HOST}:${WEBHOOK_PORT} (session: ${SESSION_ID})`);
resolve();
});
server.on("error", reject);
server.on("error", (err: NodeJS.ErrnoException) => {
if (err.code === "EADDRINUSE") {
server = null;
serverOwned = false;
console.log(`[gitea-webhook] Port ${WEBHOOK_PORT} already in use — another pi instance owns the webhook server`);
console.log(`[gitea-webhook] Use /webhook:claim to request ownership, or /webhook:status to see current owner`);
resolve(); // not fatal — tools still work, just no local server
} else {
console.error(`[gitea-webhook] Server error: ${err.message}`);
resolve(); // don't crash the extension
}
});
} catch (err) {
reject(err);
console.error(`[gitea-webhook] Failed to create server: ${err}`);
resolve();
}
});
}
@@ -695,4 +937,4 @@ export function setRepoConfig(repo: string, config: Partial<RepoConfig>): RepoCo
return updated;
}
export { parseMentions, type MentionInfo, type RepoConfig };
export { parseMentions, type MentionInfo, type RepoConfig, type ClaimRequest, SESSION_ID, WEBHOOK_PORT, WEBHOOK_HOST };