/**
 * pi-gitea Extension — entry point
 *
 * Registers Gitea tools (read + write) and optional webhook server.
 * Supports @mention routing for multi-bot coordination.
 * Supports multi-session claim system for webhook ownership.
 */

import registerReadTools from "./tools/read-tools.js";
import registerWriteTools from "./tools/write-tools.js";
import {
  startWebhookServer,
  stopWebhookServer,
  startPolling,
  stopPolling,
  setSendMessage,
  getTrackedRepos,
  setRepoConfig,
  setOnClaimRequested,
  approvePendingClaim,
  getClaimStatus,
  SESSION_ID,
  WEBHOOK_PORT,
  WEBHOOK_HOST,
} from "./webhook/server.js";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { Type } from "@sinclair/typebox";

/** Delay between two claim-status polls issued by /webhook:claim. */
const CLAIM_POLL_INTERVAL_MS = 3000;

/** Client-side upper bound on how long /webhook:claim keeps polling. */
const CLAIM_POLL_DEADLINE_MS = 5 * 60 * 1000;

/**
 * Reports whether `err` (or anything on its `cause` chain) is a refused connection.
 *
 * Node's built-in `fetch` rejects with a generic `TypeError: fetch failed` and
 * attaches the underlying system error as `cause`, so looking at the top-level
 * message alone is not enough to recognise ECONNREFUSED.
 *
 * @param err - The value caught from a failed `fetch` call.
 * @returns `true` when an error in the chain carries ECONNREFUSED in its `code` or message.
 */
function isConnectionRefused(err: unknown): boolean {
  let current: unknown = err;
  // Bounded walk so a self-referencing `cause` can never loop forever.
  for (let depth = 0; depth < 8 && current instanceof Error; depth++) {
    if (current.message.includes("ECONNREFUSED")) return true;
    const details = current as { code?: unknown; cause?: unknown };
    if (details.code === "ECONNREFUSED") return true;
    current = details.cause;
  }
  return false;
}

/**
 * Extension entry point: registers the Gitea read/write tools, the repo-config
 * and tracked-repos tools, the `webhook:*` commands, and the session lifecycle
 * handlers on the given extension API.
 *
 * @param pi - Extension API handed in by the host.
 */
export default function (pi: ExtensionAPI) {
  registerReadTools(pi);
  registerWriteTools(pi);

  // ── Repo config tool ─────────────────────────────────────────────────────

  pi.registerTool({
    name: "gitea_repo_config",
    label: "Gitea: Configure Repo Response Mode",
    description:
      'Set how the bot responds to events on a repo. ' +
      '"all" = respond to every event. ' +
      '"mention" = respond only when @mentioned or assigned. ' +
      'Collab repos default to "mention", own repos default to "all".',
    parameters: Type.Object({
      repo: Type.String({ description: "Repository (owner/name)" }),
      respondTo: Type.String({ description: '"all" or "mention"' }),
    }),
    async execute(_id, params) {
      // The schema only says "string", so validate the value before using it as a mode.
      const mode = params.respondTo;
      if (mode !== "all" && mode !== "mention") {
        return {
          content: [{ type: "text", text: `Invalid mode "${params.respondTo}". Use "all" or "mention".` }],
        };
      }
      const config = setRepoConfig(params.repo, { respondTo: mode });
      return {
        content: [{ type: "text", text: `${params.repo}: respondTo = ${config.respondTo}` }],
        details: { repo: params.repo, config },
      };
    },
  });

  pi.registerTool({
    name: "gitea_tracked_repos",
    label: "Gitea: List Tracked Repos",
    description: "Show all tracked repos, their type (webhook/collab), and response mode.",
    parameters: Type.Object({}),
    async execute() {
      const { webhook, collab, configs } = getTrackedRepos();
      const lines: string[] = [];
      // Repos without an explicit config fall back to "all" (webhook) or "mention" (collab).
      for (const [name] of webhook) {
        const mode = configs.get(name)?.respondTo ?? "all";
        lines.push(`${name} — webhook (respondTo: ${mode})`);
      }
      for (const name of collab) {
        const mode = configs.get(name)?.respondTo ?? "mention";
        lines.push(`${name} — collab/notification (respondTo: ${mode})`);
      }
      return {
        content: [{ type: "text", text: lines.length > 0 ? lines.join("\n") : "No repos tracked." }],
        details: { webhook: [...webhook.keys()], collab: [...collab], configs: Object.fromEntries(configs) },
      };
    },
  });

  // ── Webhook claim commands ───────────────────────────────────────────────

  pi.registerCommand("webhook:status", {
    description: "Show webhook server status and claim ownership",
    handler: async (_args, _ctx) => {
      const status = getClaimStatus();

      if (status.isOwner) {
        console.log(`Webhook: you own the server (session: ${status.sessionId})`);
        console.log(` port: ${WEBHOOK_PORT}, cwd: ${status.ownerCwd}`);
        if (status.pending.length > 0) {
          console.log(` ${status.pending.length} pending claim(s):`);
          for (const c of status.pending) {
            const age = Math.round((Date.now() - c.createdAt) / 1000);
            console.log(` ${c.referralId} from ${c.sessionId} (${c.cwd}) — ${age}s ago`);
          }
          console.log(` Use /webhook:release to hand off to next in queue`);
        }
        return;
      }

      // Not the server owner — query the server
      try {
        const res = await fetch(`http://127.0.0.1:${WEBHOOK_PORT}/claim`);
        if (!res.ok) {
          console.log(`Webhook: no server running on port ${WEBHOOK_PORT}`);
          return;
        }
        const data = (await res.json()) as Record<string, unknown>;
        console.log(`Webhook: owned by another session`);
        console.log(` owner: ${data.owner} (cwd: ${data.owner_cwd})`);
        console.log(` server session: ${data.server_session}`);
        // The payload is untyped JSON, so check the shape instead of asserting it.
        if (Array.isArray(data.pending) && data.pending.length > 0) {
          console.log(` ${data.pending.length} pending claim(s)`);
        }
        console.log(` Use /webhook:claim to request ownership`);
      } catch {
        console.log(`Webhook: no server reachable on port ${WEBHOOK_PORT}`);
      }
    },
  });

  pi.registerCommand("webhook:claim", {
    description: "Request webhook ownership from the current server owner",
    handler: async (_args, _ctx) => {
      const status = getClaimStatus();
      if (status.isOwner) {
        console.log("You already own the webhook server.");
        return;
      }

      // POST to the running server's /claim endpoint
      const url = `http://127.0.0.1:${WEBHOOK_PORT}/claim`;
      let referralId: string | undefined;

      try {
        // Initial request
        const initRes = await fetch(url, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ session_id: SESSION_ID, cwd: process.cwd() }),
        });

        if (initRes.status === 503) {
          console.log("Server has too many pending claims. Try again later.");
          return;
        }

        const initData = (await initRes.json()) as Record<string, unknown>;

        if (initRes.status === 200) {
          console.log("Claimed immediately (no contention).");
          return;
        }

        if (initRes.status !== 202) {
          console.log(`Unexpected response: ${initRes.status} ${JSON.stringify(initData)}`);
          return;
        }

        referralId = initData.referral_id as string;
        console.log(`Claim submitted (referral: ${referralId})`);
        console.log(` Current owner: ${initData.owner} (${initData.owner_cwd})`);
        console.log(` Queue position: ${initData.position}`);
        console.log(` Waiting for owner to /webhook:release ...`);

        // Poll every 3s for up to 5 minutes
        const deadline = Date.now() + CLAIM_POLL_DEADLINE_MS;
        while (Date.now() < deadline) {
          await new Promise((r) => setTimeout(r, CLAIM_POLL_INTERVAL_MS));

          const pollRes = await fetch(url, {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({ session_id: SESSION_ID, referral_id: referralId }),
          });

          if (pollRes.status === 200) {
            console.log("Claim approved — you now own the webhook server.");
            return;
          }
          if (pollRes.status === 401) {
            console.log("Claim denied by the current owner.");
            return;
          }
          if (pollRes.status === 408) {
            console.log("Claim expired (timed out).");
            return;
          }
          // 202 = still pending, continue polling
        }

        console.log("Claim timed out (client-side deadline).");
      } catch (err) {
        // fetch() hides the system error behind "fetch failed"; inspect the cause chain too.
        if (isConnectionRefused(err)) {
          console.log(`No webhook server running on port ${WEBHOOK_PORT}.`);
        } else {
          const msg = err instanceof Error ? err.message : String(err);
          console.log(`Claim failed: ${msg}`);
        }
      }
    },
  });

  pi.registerCommand("webhook:release", {
    description: "Release webhook ownership to the next pending claimer",
    handler: async (_args, _ctx) => {
      const status = getClaimStatus();
      if (!status.isOwner) {
        console.log("You don't own the webhook server — nothing to release.");
        return;
      }

      if (status.pending.length === 0) {
        console.log("No pending claims to hand off to.");
        return;
      }

      const { approved, newOwner } = approvePendingClaim();
      if (approved) {
        console.log(`Ownership transferred to ${approved.sessionId} (${approved.cwd})`);
        console.log(`New owner session: ${newOwner}`);
      } else {
        console.log("No pending claims (they may have expired).");
      }
    },
  });

  // ── Lifecycle ────────────────────────────────────────────────────────────

  // GITEA_ENABLE_POLLING=1 opts in to running the webhook server + notification poller.
  // pi-bot (persistent session): enabled by default
  // openclaw (ephemeral sessions): disabled by default — events come via openclaw hooks
  const enablePolling =
    process.env.GITEA_ENABLE_POLLING === "1" ||
    (!process.env.GITEA_HOOKS_URL && !process.env.OPENCLAW_HOOKS_URL);

  pi.on("session_start", async (_event, ctx) => {
    console.log(`[pi-gitea] Session started (session: ${SESSION_ID})`);

    // Set up claim notification — alert the user when another session wants ownership
    setOnClaimRequested((claim) => {
      console.log(`[pi-gitea] Claim request from ${claim.sessionId} (${claim.cwd})`);
      console.log(`[pi-gitea] Use /webhook:release to hand off, or ignore to let it expire`);
    });

    // Auto-detect runtime: pi-bot (persistent session) vs openclaw (hooks endpoint)
    if (ctx.sendUserMessage) {
      // pi-bot: inject directly into the running session
      console.log("[pi-gitea] Delivery: sendUserMessage (pi-bot mode)");
      setSendMessage((msg: string) => {
        ctx.sendUserMessage(msg, { deliverAs: "followUp" });
        return Promise.resolve();
      });
    } else {
      // openclaw: POST to the hooks endpoint
      const hooksUrl = process.env.GITEA_HOOKS_URL ?? "http://localhost:3001";
      const hooksPath = process.env.GITEA_HOOKS_PATH ?? "/hooks/agent";
      const hooksToken = process.env.GITEA_HOOKS_TOKEN ?? "";
      console.log(`[pi-gitea] Delivery: openclaw hooks (${hooksUrl}${hooksPath})`);
      setSendMessage(async (msg: string) => {
        const res = await fetch(`${hooksUrl}${hooksPath}`, {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
            // Only send an Authorization header when a token is configured.
            ...(hooksToken ? { Authorization: `Bearer ${hooksToken}` } : {}),
          },
          body: JSON.stringify({ message: msg }),
        });
        if (!res.ok) {
          const body = await res.text().catch(() => "");
          throw new Error(`Hooks POST failed: ${res.status} ${body}`);
        }
      });
    }

    if (enablePolling) {
      await startWebhookServer(pi);
      await startPolling(pi);
    } else {
      console.log("[pi-gitea] Webhook server + polling disabled (openclaw mode — tools only)");
    }
  });

  pi.on("session_shutdown", async () => {
    console.log("[pi-gitea] Session shutting down");
    if (enablePolling) {
      try {
        await stopWebhookServer();
      } finally {
        // Stop the poller even if the webhook server fails to shut down cleanly.
        stopPolling();
      }
    }
  });
}