diff --git a/pi-extension/webhook/server.ts b/pi-extension/webhook/server.ts index bea2e8a..04674b0 100644 --- a/pi-extension/webhook/server.ts +++ b/pi-extension/webhook/server.ts @@ -1,28 +1,42 @@ /** - * Webhook server — receives Gitea events via HTTP + * Webhook server — receives Gitea events via HTTP + polls collab repos * * Auth: Bearer token validation (PI_WEBHOOK_TOKEN). * No HMAC/secret — consistent with token-based auth strategy. + * + * Repos where the bot has admin access get webhooks installed automatically. + * Repos where the bot is a non-admin collaborator are polled for new events. */ import type { Server, IncomingMessage, ServerResponse } from "node:http"; import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; -import { GiteaClient } from "../../src/client.js"; +import { GiteaClient, GiteaError } from "../../src/client.js"; const WEBHOOK_HOST = process.env.PI_WEBHOOK_HOST ?? "0.0.0.0"; const WEBHOOK_PORT = parseInt(process.env.PI_WEBHOOK_PORT ?? "3000", 10); const WEBHOOK_TOKEN = process.env.PI_WEBHOOK_TOKEN ?? ""; const WEBHOOK_URL = process.env.PI_WEBHOOK_URL ?? ""; const POLL_INTERVAL = parseInt(process.env.PI_BOT_POLL_INTERVAL ?? "300", 10); +const EVENT_POLL_INTERVAL = parseInt(process.env.PI_EVENT_POLL_INTERVAL ?? "60", 10); +const BOT_USER = process.env.PI_GIT_USER ?? ""; let server: Server | null = null; -let trackedRepos: Map = new Map(); let processingQueue: Array<{ event: any; timestamp: number }> = []; let maxQueueDepth = 50; let isProcessing = false; -let pollTimer: NodeJS.Timeout | null = null; +let repoPollTimer: NodeJS.Timeout | null = null; +let eventPollTimer: NodeJS.Timeout | null = null; let sendMessage: ((message: string) => Promise) | null = null; +/** Repos where we successfully installed a webhook */ +let webhookRepos: Map = new Map(); + +/** Repos where webhook install failed (403) — we poll these for events */ +let pollOnlyRepos: Map = new Map(); + +/** Track all known repos so we don't re-attempt webhook install every cycle */ +let knownRepos: Set = new Set(); + export function setSendMessage(fn: (message: string) => Promise) { sendMessage = fn; } @@ -84,6 +98,46 @@ function formatEventPrompt(event: any): string { return prompt; } +/** Format a polled issue/PR/comment as a synthetic event prompt */ +function formatPolledEvent(type: "issue" | "pull_request" | "issue_comment", item: any, repo: string): string { + let prompt = `New Gitea event on ${repo} (via polling):\n\n`; + prompt += `**Action**: opened\n\n`; + + if (type === "issue") { + prompt += `**Issue #${item.number}: ${item.title}**\n`; + prompt += `**Author**: @${item.user?.login || "unknown"}\n`; + prompt += `**Labels**: ${item.labels?.map((l: any) => l.name).join(", ") || "none"}\n`; + prompt += `**Body**:\n${item.body || "(no body)"}\n\n`; + } else if (type === "pull_request") { + prompt += `**PR #${item.number}: ${item.title}**\n`; + prompt += `**Author**: @${item.user?.login || "unknown"}\n`; + prompt += `**Base**: ${item.base?.label} ← **Head**: ${item.head?.label}\n`; + prompt += `**Body**:\n${item.body || "(no body)"}\n\n`; + } else if (type === "issue_comment") { + prompt += `**Comment on #${item._issueNumber}**\n`; + prompt += `**Author**: @${item.user?.login || "unknown"}\n`; + prompt += `**Body**:\n${item.body || "(no body)"}\n\n`; + } + + prompt += `---\n\n`; + prompt += `Please analyze this event and decide how to respond.\n`; + prompt += `Use the available Gitea tools to interact with the repository.`; + + return prompt; +} + +/** Enqueue an event for LLM processing */ +function enqueueEvent(event: any) { + processingQueue.push({ event, timestamp: Date.now() }); + + if (processingQueue.length > maxQueueDepth) { + processingQueue.shift(); + console.warn(`[gitea-webhook] Queue full, dropped oldest event`); + } + + void processQueue(); +} + /** Process the event queue */ async function processQueue() { if (isProcessing || processingQueue.length === 0) return; @@ -92,15 +146,17 @@ async function processQueue() { while (processingQueue.length > 0) { const { event } = processingQueue.shift()!; - const repoName = event.repository?.full_name || "unknown"; + const repoName = event.repository?.full_name || event._repo || "unknown"; - console.log(`[gitea-webhook] Processing event: ${event.action} on ${repoName}`); + console.log(`[gitea-webhook] Processing event: ${event.action || event._type} on ${repoName}`); if (sendMessage) { try { - const prompt = formatEventPrompt(event); + const prompt = event._polled + ? formatPolledEvent(event._type, event._item, event._repo) + : formatEventPrompt(event); await sendMessage(prompt); - console.log(`[gitea-webhook] Event sent to LLM: ${event.action} on ${repoName}`); + console.log(`[gitea-webhook] Event sent to LLM: ${event.action || event._type} on ${repoName}`); } catch (err) { console.error(`[gitea-webhook] Failed to send event to LLM:`, err); } @@ -112,6 +168,8 @@ async function processQueue() { isProcessing = false; } +// ── HTTP Server ────────────────────────────────────────────────────────────── + /** Start webhook server */ export function startWebhookServer(_pi: ExtensionAPI) { return new Promise(async (resolve, reject) => { @@ -138,7 +196,8 @@ export function startWebhookServer(_pi: ExtensionAPI) { JSON.stringify({ status: "ok", uptime: process.uptime(), - tracked_repos: trackedRepos.size, + webhook_repos: webhookRepos.size, + poll_only_repos: pollOnlyRepos.size, queue_depth: processingQueue.length, is_processing: isProcessing, }), @@ -148,7 +207,6 @@ export function startWebhookServer(_pi: ExtensionAPI) { // POST /hooks/gitea if (url === "/hooks/gitea" && req.method === "POST") { - // Validate token if (!validateToken(req)) { res.writeHead(401, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Unauthorized" })); @@ -156,13 +214,11 @@ export function startWebhookServer(_pi: ExtensionAPI) { return; } - // Read body let body = ""; for await (const chunk of req) { body += chunk.toString(); } - // Parse event let event; try { event = JSON.parse(body); @@ -172,17 +228,7 @@ export function startWebhookServer(_pi: ExtensionAPI) { return; } - // Queue event - processingQueue.push({ event, timestamp: Date.now() }); - - if (processingQueue.length > maxQueueDepth) { - const dropped = processingQueue.shift(); - if (dropped) { - console.warn(`[gitea-webhook] Queue full, dropping oldest event`); - } - } - - void processQueue(); + enqueueEvent(event); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ received: true, event: event.action })); @@ -220,43 +266,25 @@ export async function stopWebhookServer() { }); } -/** Poll for new repos and register webhooks */ -export function startPolling(_pi: ExtensionAPI) { - void fetchUserRepos(); - pollTimer = setInterval(() => void fetchUserRepos(), POLL_INTERVAL * 1000); - console.log(`[gitea-polling] Polling started (interval: ${POLL_INTERVAL}s)`); -} +// ── Repo Discovery & Webhook Registration ──────────────────────────────────── -export function stopPolling() { - if (pollTimer) { - clearInterval(pollTimer); - pollTimer = null; - } -} - -async function fetchUserRepos() { +/** Discover repos and install webhooks (or mark as poll-only on 403) */ +async function discoverRepos() { try { const client = new GiteaClient(); - if (!WEBHOOK_URL) { - console.warn("[gitea-polling] PI_WEBHOOK_URL not set, skipping webhook registration"); - return; - } - const repos = await client.get("/user/repos?limit=100"); - const newRepos: any[] = []; + let newWebhooks = 0; + let newPollOnly = 0; + for (const repo of repos) { - if (!trackedRepos.has(repo.full_name)) { - newRepos.push(repo); - } - } + const name = repo.full_name; + if (knownRepos.has(name)) continue; - if (newRepos.length > 0) { - console.log(`[gitea-polling] Found ${newRepos.length} new repos, registering webhooks...`); - - for (const repo of newRepos) { + // Try to install webhook + if (WEBHOOK_URL) { try { - const webhook = await client.post(`/repos/${repo.full_name}/hooks`, { + const webhook = await client.post(`/repos/${name}/hooks`, { type: "gitea", config: { url: `${WEBHOOK_URL}/hooks/gitea`, @@ -267,20 +295,175 @@ async function fetchUserRepos() { active: true, }); - trackedRepos.set(repo.full_name, { webhookId: webhook.id, addedAt: Date.now() }); - console.log(`[gitea-polling] Webhook created for ${repo.full_name} (ID: ${webhook.id})`); + webhookRepos.set(name, { webhookId: webhook.id, addedAt: Date.now() }); + knownRepos.add(name); + newWebhooks++; + console.log(`[gitea-polling] ✅ Webhook installed: ${name} (ID: ${webhook.id})`); + continue; } catch (err) { - console.error(`[gitea-polling] Error creating webhook for ${repo.full_name}:`, err); + if (err instanceof GiteaError && err.status === 403) { + // No admin access — fall back to polling + // Set lastPollAt to 5 minutes ago so we catch recent events + const fiveMinAgo = new Date(Date.now() - 5 * 60 * 1000).toISOString(); + pollOnlyRepos.set(name, { + addedAt: Date.now(), + lastPollAt: fiveMinAgo, + }); + knownRepos.add(name); + newPollOnly++; + console.log(`[gitea-polling] 📋 Poll-only (no admin): ${name}`); + continue; + } + // Other error — log but don't add to known (retry next cycle) + console.error(`[gitea-polling] ❌ Webhook error for ${name}: ${err instanceof Error ? err.message : err}`); } + } else { + // No webhook URL configured — all repos are poll-only + const fiveMinAgo = new Date(Date.now() - 5 * 60 * 1000).toISOString(); + pollOnlyRepos.set(name, { + addedAt: Date.now(), + lastPollAt: fiveMinAgo, + }); + knownRepos.add(name); + newPollOnly++; } } - console.log(`[gitea-polling] Current repos: ${trackedRepos.size}`); + if (newWebhooks > 0 || newPollOnly > 0) { + console.log(`[gitea-polling] Discovered: ${newWebhooks} webhook, ${newPollOnly} poll-only`); + } + console.log(`[gitea-polling] Total: ${webhookRepos.size} webhook + ${pollOnlyRepos.size} poll-only repos`); } catch (err) { - console.error("[gitea-polling] Error fetching user repos:", err); + console.error("[gitea-polling] Error discovering repos:", err instanceof Error ? err.message : err); } } -export function getTrackedRepos() { - return new Map(trackedRepos); +// ── Event Polling for Non-Admin Repos ──────────────────────────────────────── + +/** Poll all poll-only repos for new issues, PRs, and comments */ +async function pollForEvents() { + if (pollOnlyRepos.size === 0) { + console.log(`[gitea-poll-events] No poll-only repos to check`); + return; + } + + console.log(`[gitea-poll-events] Checking ${pollOnlyRepos.size} repos for new events...`); + const client = new GiteaClient(); + + for (const [repoName, state] of pollOnlyRepos) { + try { + const since = state.lastPollAt; + const sinceDate = new Date(since); + const now = new Date().toISOString(); + const [owner, repo] = repoName.split("/"); + + // Fetch recent issues created/updated since last poll + const issues = await client.get( + `/repos/${owner}/${repo}/issues?state=open&sort=created&type=issues&since=${since}&limit=20` + ); + + for (const issue of issues) { + // Skip issues created by the bot itself + if (issue.user?.login === BOT_USER) continue; + + // Only process issues created after our last poll (not just updated) + if (new Date(issue.created_at) > sinceDate) { + console.log(`[gitea-poll-events] New issue: ${repoName}#${issue.number} "${issue.title}"`); + enqueueEvent({ + _polled: true, + _type: "issue", + _item: issue, + _repo: repoName, + action: "opened", + repository: { full_name: repoName }, + }); + } + + // Check for new comments on this issue + try { + const comments = await client.get( + `/repos/${owner}/${repo}/issues/${issue.number}/comments?since=${since}` + ); + for (const comment of comments) { + if (comment.user?.login === BOT_USER) continue; + if (new Date(comment.created_at) > sinceDate) { + console.log(`[gitea-poll-events] New comment on ${repoName}#${issue.number} by @${comment.user?.login}`); + enqueueEvent({ + _polled: true, + _type: "issue_comment", + _item: { ...comment, _issueNumber: issue.number }, + _repo: repoName, + action: "created", + repository: { full_name: repoName }, + }); + } + } + } catch { + // Comment fetch failed, skip + } + } + + // Fetch recent PRs + const prs = await client.get( + `/repos/${owner}/${repo}/pulls?state=open&sort=created&limit=20` + ); + + for (const pr of prs) { + if (pr.user?.login === BOT_USER) continue; + + if (new Date(pr.created_at) > sinceDate) { + console.log(`[gitea-poll-events] New PR: ${repoName}#${pr.number} "${pr.title}"`); + enqueueEvent({ + _polled: true, + _type: "pull_request", + _item: pr, + _repo: repoName, + action: "opened", + repository: { full_name: repoName }, + }); + } + } + + // Update last poll timestamp + state.lastPollAt = now; + } catch (err) { + console.error(`[gitea-poll-events] Error polling ${repoName}: ${err instanceof Error ? err.message : err}`); + } + } +} + +// ── Public API ─────────────────────────────────────────────────────────────── + +/** Start repo discovery polling + event polling */ +export function startPolling(_pi: ExtensionAPI) { + // Discover repos immediately, then on interval + void discoverRepos(); + repoPollTimer = setInterval(() => void discoverRepos(), POLL_INTERVAL * 1000); + console.log(`[gitea-polling] Repo discovery started (interval: ${POLL_INTERVAL}s)`); + + // Event polling for poll-only repos on a faster interval + eventPollTimer = setInterval(() => void pollForEvents(), EVENT_POLL_INTERVAL * 1000); + console.log(`[gitea-polling] Event polling started (interval: ${EVENT_POLL_INTERVAL}s)`); +} + +export function stopPolling() { + if (repoPollTimer) { + clearInterval(repoPollTimer); + repoPollTimer = null; + } + if (eventPollTimer) { + clearInterval(eventPollTimer); + eventPollTimer = null; + } + // Clear state for clean reload + webhookRepos.clear(); + pollOnlyRepos.clear(); + knownRepos.clear(); +} + +export function getTrackedRepos() { + return { + webhook: new Map(webhookRepos), + pollOnly: new Map(pollOnlyRepos), + }; }