/**
 * Webhook server — receives Gitea events via HTTP
 *
 * Auth: Bearer token validation (PI_WEBHOOK_TOKEN).
 * No HMAC/secret — consistent with token-based auth strategy.
 *
 * Environment:
 * - PI_WEBHOOK_HOST       bind address (default "0.0.0.0")
 * - PI_WEBHOOK_PORT       listen port (default 3000)
 * - PI_WEBHOOK_TOKEN      expected bearer token; empty disables auth
 * - PI_WEBHOOK_URL        public base URL registered with Gitea
 * - PI_BOT_POLL_INTERVAL  repo polling interval in seconds (default 300)
 */

import { createHash, timingSafeEqual } from "node:crypto";
import { createServer } from "node:http";
import type { Server, IncomingMessage, ServerResponse } from "node:http";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { GiteaClient } from "../../src/client.js";

/** Poll interval used when PI_BOT_POLL_INTERVAL is unset or invalid. */
const DEFAULT_POLL_INTERVAL_SECONDS = 300;
/** Largest interval (in seconds) whose millisecond value still fits setInterval's 32-bit delay. */
const MAX_POLL_INTERVAL_SECONDS = Math.floor(2_147_483_647 / 1000);
/** Upper bound on an accepted webhook body; larger requests get a 413. */
const MAX_BODY_BYTES = 10 * 1024 * 1024;
/** Maximum number of queued events; the oldest is dropped on overflow. */
const MAX_QUEUE_DEPTH = 50;
/** Page size requested from /user/repos (the server may cap it lower). */
const REPO_PAGE_SIZE = 100;
/** Safety cap on the number of /user/repos pages fetched per poll. */
const MAX_REPO_PAGES = 100;

const WEBHOOK_HOST = process.env.PI_WEBHOOK_HOST ?? "0.0.0.0";
const WEBHOOK_PORT = parseInt(process.env.PI_WEBHOOK_PORT ?? "3000", 10);
const WEBHOOK_TOKEN = process.env.PI_WEBHOOK_TOKEN ?? "";
// Trailing slashes are stripped so the registered URL never becomes ".../​/hooks/gitea",
// which the exact path match in the request handler would answer with 404.
const WEBHOOK_URL = (process.env.PI_WEBHOOK_URL ?? "").replace(/\/+$/, "");
const POLL_INTERVAL = parsePollIntervalSeconds(process.env.PI_BOT_POLL_INTERVAL);

/** Subset of a Gitea user object read by this module. */
interface GiteaUser {
  login?: string;
  username?: string;
  name?: string;
}

/** Subset of an issue / pull request payload read by this module. */
interface GiteaIssueLike {
  number?: number;
  title?: string;
  body?: string;
  user?: GiteaUser;
  labels?: Array<{ name?: string }>;
  base?: { label?: string };
  head?: { label?: string };
}

/** Loosely typed webhook payload; only the top-level shape is validated at runtime. */
interface GiteaEvent {
  action?: string;
  repository?: { full_name?: string };
  issue?: GiteaIssueLike;
  pull_request?: GiteaIssueLike;
  comment?: { body?: string; user?: GiteaUser };
  pusher?: GiteaUser;
  commits?: unknown[];
}

interface QueuedEvent {
  event: GiteaEvent;
  timestamp: number;
}

interface TrackedRepo {
  webhookId: number;
  addedAt: number;
}

/** Callback that forwards a prompt to the LLM; its resolved value is ignored. */
type SendMessage = (message: string) => Promise<unknown>;

let server: Server | null = null;
let trackedRepos: Map<string, TrackedRepo> = new Map();
let processingQueue: QueuedEvent[] = [];
let isProcessing = false;
let isPolling = false;
let pollTimer: NodeJS.Timeout | null = null;
let sendMessage: SendMessage | null = null;

/**
 * Parse the poll interval, falling back to the default for missing,
 * non-numeric, non-positive or too-large values. Without this guard a NaN or
 * zero delay reaches setInterval, which then fires roughly every millisecond.
 */
function parsePollIntervalSeconds(raw: string | undefined): number {
  if (raw === undefined) return DEFAULT_POLL_INTERVAL_SECONDS;
  const parsed = Number.parseInt(raw, 10);
  if (Number.isFinite(parsed) && parsed >= 1 && parsed <= MAX_POLL_INTERVAL_SECONDS) {
    return parsed;
  }
  console.warn(
    `[gitea-polling] Invalid PI_BOT_POLL_INTERVAL "${raw}", using ${DEFAULT_POLL_INTERVAL_SECONDS}s`,
  );
  return DEFAULT_POLL_INTERVAL_SECONDS;
}

/** Register the callback used to hand formatted event prompts to the LLM. */
export function setSendMessage(fn: SendMessage): void {
  sendMessage = fn;
}

/** Compare two strings without leaking, via timing, where they first differ. */
function constantTimeEquals(a: string, b: string): boolean {
  // Hashing first gives timingSafeEqual the equal-length buffers it requires.
  const digestA = createHash("sha256").update(a).digest();
  const digestB = createHash("sha256").update(b).digest();
  return timingSafeEqual(digestA, digestB);
}

/**
 * Validate bearer token on incoming webhook request.
 *
 * @returns true when the Authorization header is exactly `Bearer <token>`,
 *   or when no token is configured.
 */
function validateToken(req: IncomingMessage): boolean {
  // No token configured = every request is accepted. startWebhookServer warns
  // when that is combined with a non-loopback bind address.
  if (!WEBHOOK_TOKEN) return true;
  const auth = req.headers["authorization"];
  if (!auth) return false;
  return constantTimeEquals(auth, `Bearer ${WEBHOOK_TOKEN}`);
}

/** Format a Gitea event as a prompt for the LLM */
function formatEventPrompt(event: GiteaEvent): string {
  const action = event.action;
  const repo = event.repository?.full_name || "unknown";

  let prompt = `New Gitea event on ${repo}:\n\n`;
  prompt += `**Action**: ${action}\n\n`;

  if (event.issue) {
    const issue = event.issue;
    prompt += `**Issue #${issue.number}: ${issue.title}**\n`;
    prompt += `**Author**: @${issue.user?.login || "unknown"}\n`;
    prompt += `**Labels**: ${issue.labels?.map((l) => l.name).join(", ") || "none"}\n`;
    prompt += `**Body**:\n${issue.body || "(no body)"}\n\n`;
  }

  if (event.pull_request) {
    const pr = event.pull_request;
    prompt += `**PR #${pr.number}: ${pr.title}**\n`;
    prompt += `**Author**: @${pr.user?.login || "unknown"}\n`;
    prompt += `**Base**: ${pr.base?.label} ← **Head**: ${pr.head?.label}\n`;
    prompt += `**Body**:\n${pr.body || "(no body)"}\n\n`;
  }

  if (event.comment) {
    const comment = event.comment;
    const targetNumber = event.issue?.number || event.pull_request?.number;
    prompt += `**Comment on #${targetNumber}**\n`;
    prompt += `**Author**: @${comment.user?.login || "unknown"}\n`;
    prompt += `**Body**:\n${comment.body || "(no body)"}\n\n`;
  }

  if (event.pusher) {
    // `name` keeps the previous output whenever it is present; the other keys
    // avoid printing "@undefined" for payloads that carry login/username instead.
    const pusher =
      event.pusher.name || event.pusher.login || event.pusher.username || "unknown";
    prompt += `**Pusher**: @${pusher}\n`;
    prompt += `**Commits**: ${event.commits?.length || 0}\n\n`;
  }

  prompt += `---\n\n`;
  prompt += `Please analyze this event and decide how to respond. You can:\n`;
  prompt += `1. Add helpful comments to issues/PRs\n`;
  prompt += `2. Suggest code fixes or improvements\n`;
  prompt += `3. Create branches and PRs to fix issues\n`;
  prompt += `4. Update files directly (if direct_push is enabled)\n`;
  prompt += `5. Ask for clarification if needed\n\n`;
  prompt += `Use the available Gitea tools to interact with the repository.`;
  return prompt;
}

/** Forward one event to the LLM; failures are logged, never thrown. */
async function deliverEvent(event: GiteaEvent): Promise<void> {
  try {
    const repoName = event.repository?.full_name || "unknown";
    console.log(`[gitea-webhook] Processing event: ${event.action} on ${repoName}`);

    if (!sendMessage) {
      console.warn(`[gitea-webhook] No sendMessage function available, skipping event`);
      return;
    }

    const prompt = formatEventPrompt(event);
    await sendMessage(prompt);
    console.log(`[gitea-webhook] Event sent to LLM: ${event.action} on ${repoName}`);
  } catch (err) {
    console.error(`[gitea-webhook] Failed to send event to LLM:`, err);
  }
}

/**
 * Process the event queue.
 *
 * Events are delivered one at a time in arrival order. Re-entrant calls
 * return immediately while a drain is in progress.
 */
async function processQueue(): Promise<void> {
  if (isProcessing || processingQueue.length === 0) return;
  isProcessing = true;
  try {
    let next = processingQueue.shift();
    while (next !== undefined) {
      await deliverEvent(next.event);
      next = processingQueue.shift();
    }
  } finally {
    // Always release the flag; otherwise one unexpected throw would stall the
    // queue for the lifetime of the process.
    isProcessing = false;
  }
}

/** Write a JSON response with the given status code. */
function sendJson(res: ServerResponse, status: number, payload: unknown): void {
  res.writeHead(status, { "Content-Type": "application/json" });
  res.end(JSON.stringify(payload));
}

/**
 * Read the request body as UTF-8 text.
 *
 * @returns the decoded body, or null when it exceeds `limit` bytes. The rest
 *   of an oversized body is drained and discarded so a response can still be
 *   sent on the same connection.
 */
async function readBody(req: IncomingMessage, limit: number): Promise<string | null> {
  const chunks: Buffer[] = [];
  let total = 0;
  let overflow = false;

  for await (const chunk of req) {
    if (overflow) continue;
    const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk));
    total += buf.length;
    if (total > limit) {
      overflow = true;
      chunks.length = 0; // release what was buffered so far
      continue;
    }
    chunks.push(buf);
  }

  // Decode once at the end so multi-byte characters split across chunks survive.
  return overflow ? null : Buffer.concat(chunks).toString("utf8");
}

/** True for a plain JSON object. Nested fields are NOT validated. */
function isEventObject(value: unknown): value is GiteaEvent {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/** Handle POST /hooks/gitea: authenticate, parse, enqueue, acknowledge. */
async function handleWebhookPost(req: IncomingMessage, res: ServerResponse): Promise<void> {
  if (!validateToken(req)) {
    sendJson(res, 401, { error: "Unauthorized" });
    console.error("[gitea-webhook] Token validation failed");
    return;
  }

  const body = await readBody(req, MAX_BODY_BYTES);
  if (body === null) {
    sendJson(res, 413, { error: "Payload too large" });
    return;
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(body);
  } catch {
    sendJson(res, 400, { error: "Invalid JSON" });
    return;
  }

  // Valid JSON such as `null`, `42` or `[]` is not an event; rejecting it here
  // keeps property access on the event safe everywhere downstream.
  if (!isEventObject(parsed)) {
    sendJson(res, 400, { error: "Invalid event payload" });
    return;
  }

  processingQueue.push({ event: parsed, timestamp: Date.now() });
  if (processingQueue.length > MAX_QUEUE_DEPTH) {
    const dropped = processingQueue.shift();
    if (dropped) {
      console.warn(`[gitea-webhook] Queue full, dropping oldest event`);
    }
  }

  // Fire-and-forget: the webhook is acknowledged before the LLM call completes.
  void processQueue();

  sendJson(res, 200, { received: true, event: parsed.action });
}

/** Route a single HTTP request (CORS preflight, /health, /hooks/gitea, 404). */
async function handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
  // Match on the path only so a query string does not turn a known route into a 404.
  const path = (req.url ?? "").split("?", 1)[0] ?? "";

  res.setHeader("Access-Control-Allow-Origin", "*");
  res.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
  res.setHeader("Access-Control-Allow-Headers", "Content-Type, Authorization");

  if (req.method === "OPTIONS") {
    res.writeHead(200);
    res.end();
    return;
  }

  // GET /health
  if (path === "/health" && req.method === "GET") {
    sendJson(res, 200, {
      status: "ok",
      uptime: process.uptime(),
      tracked_repos: trackedRepos.size,
      queue_depth: processingQueue.length,
      is_processing: isProcessing,
    });
    return;
  }

  // POST /hooks/gitea
  if (path === "/hooks/gitea" && req.method === "POST") {
    await handleWebhookPost(req, res);
    return;
  }

  res.writeHead(404, { "Content-Type": "text/plain" });
  res.end("Not found");
}

/** True when `host` is one of the usual loopback names/addresses. */
function isLoopbackHost(host: string): boolean {
  return host === "127.0.0.1" || host === "::1" || host === "localhost";
}

/**
 * Start webhook server.
 *
 * @param _pi Extension API handle (currently unused).
 * @returns a promise that resolves once the server is listening and rejects
 *   when binding fails. Resolves immediately if a server is already held.
 */
export function startWebhookServer(_pi: ExtensionAPI): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (server) {
      // Replacing the handle would make the running server impossible to stop.
      console.warn("[gitea-webhook] Server already started, ignoring duplicate start");
      resolve();
      return;
    }

    if (!WEBHOOK_TOKEN && !isLoopbackHost(WEBHOOK_HOST)) {
      console.warn(
        `[gitea-webhook] PI_WEBHOOK_TOKEN is not set: ${WEBHOOK_HOST}:${WEBHOOK_PORT} accepts unauthenticated webhooks`,
      );
    }

    const srv = createServer((req, res) => {
      handleRequest(req, res).catch((err: unknown) => {
        // Without this, a failure in the async handler becomes an unhandled
        // rejection and the client never receives a response.
        console.error("[gitea-webhook] Request handling failed:", err);
        if (!res.headersSent) {
          sendJson(res, 500, { error: "Internal server error" });
        } else if (!res.writableEnded) {
          res.end();
        }
      });
    });
    server = srv;

    const onStartupError = (err: Error): void => {
      if (server === srv) server = null;
      reject(err);
    };
    srv.once("error", onStartupError);

    try {
      srv.listen(WEBHOOK_PORT, WEBHOOK_HOST, () => {
        srv.off("error", onStartupError);
        // Keep an 'error' listener attached so a later server error is logged
        // instead of being thrown as an unhandled 'error' event.
        srv.on("error", (err) => {
          console.error("[gitea-webhook] Server error:", err);
        });
        console.log(`[gitea-webhook] Server listening on ${WEBHOOK_HOST}:${WEBHOOK_PORT}`);
        resolve();
      });
    } catch (err) {
      // listen() throws synchronously for an invalid port (e.g. NaN).
      if (server === srv) server = null;
      reject(err);
    }
  });
}

/**
 * Stop webhook server.
 *
 * @returns a promise that resolves once the server has closed, or
 *   immediately when no server is running.
 */
export function stopWebhookServer(): Promise<void> {
  return new Promise<void>((resolve) => {
    const srv = server;
    if (!srv) {
      resolve();
      return;
    }
    srv.close(() => {
      console.log("[gitea-webhook] Server stopped");
      if (server === srv) server = null;
      resolve();
    });
  });
}

/**
 * Poll for new repos and register webhooks.
 *
 * Runs one poll immediately and then every POLL_INTERVAL seconds. Calling it
 * again replaces the previous timer instead of stacking a second one.
 *
 * @param _pi Extension API handle (currently unused).
 */
export function startPolling(_pi: ExtensionAPI): void {
  stopPolling();
  void fetchUserRepos();
  pollTimer = setInterval(() => void fetchUserRepos(), POLL_INTERVAL * 1000);
  console.log(`[gitea-polling] Polling started (interval: ${POLL_INTERVAL}s)`);
}

/** Cancel the polling timer, if any. A poll already in flight is not interrupted. */
export function stopPolling(): void {
  if (pollTimer) {
    clearInterval(pollTimer);
    pollTimer = null;
  }
}

/** Extract `full_name` from a repo listing entry, or undefined if absent. */
function repoFullName(item: unknown): string | undefined {
  const name = (item as { full_name?: unknown } | null)?.full_name;
  return typeof name === "string" && name.length > 0 ? name : undefined;
}

/**
 * List the full names of all repos returned by /user/repos, following pages
 * until one is empty or adds nothing new (which also stops the loop when the
 * server ignores `page`).
 */
async function listAllRepoNames(client: GiteaClient): Promise<string[]> {
  const names = new Set<string>();

  for (let page = 1; page <= MAX_REPO_PAGES; page++) {
    const batch: unknown = await client.get(`/user/repos?limit=${REPO_PAGE_SIZE}&page=${page}`);
    if (!Array.isArray(batch)) {
      throw new Error("Unexpected response from /user/repos (expected an array)");
    }

    const before = names.size;
    for (const item of batch) {
      const name = repoFullName(item);
      if (name !== undefined) names.add(name);
    }
    if (batch.length === 0 || names.size === before) break;
  }

  return [...names];
}

/**
 * Look for a hook on `fullName` that already targets `targetUrl`.
 *
 * NOTE(review): only the first page of the hook listing is inspected, and the
 * match relies on each entry exposing `id` and `config.url` — confirm against
 * the Gitea version in use.
 */
async function findExistingHookId(
  client: GiteaClient,
  fullName: string,
  targetUrl: string,
): Promise<number | undefined> {
  const hooks: unknown = await client.get(`/repos/${fullName}/hooks`);
  if (!Array.isArray(hooks)) return undefined;

  for (const hook of hooks) {
    const candidate = hook as { id?: unknown; config?: { url?: unknown } | null } | null;
    if (typeof candidate?.id === "number" && candidate.config?.url === targetUrl) {
      return candidate.id;
    }
  }
  return undefined;
}

/**
 * Make sure `fullName` has a webhook pointing at this server and record it in
 * `trackedRepos`. Reuses a matching hook when one exists, so restarting the
 * process (which empties the in-memory map) does not register duplicates.
 *
 * @throws when the Gitea API call fails or the create response has no numeric id.
 */
async function ensureWebhook(client: GiteaClient, fullName: string): Promise<void> {
  const targetUrl = `${WEBHOOK_URL}/hooks/gitea`;

  const existingId = await findExistingHookId(client, fullName, targetUrl);
  if (existingId !== undefined) {
    trackedRepos.set(fullName, { webhookId: existingId, addedAt: Date.now() });
    console.log(`[gitea-polling] Webhook already registered for ${fullName} (ID: ${existingId})`);
    return;
  }

  const bearer = WEBHOOK_TOKEN ? `Bearer ${WEBHOOK_TOKEN}` : "";
  const created: unknown = await client.post(`/repos/${fullName}/hooks`, {
    type: "gitea",
    config: {
      url: targetUrl,
      content_type: "json",
      // Kept for backward compatibility with whatever consumed this key before.
      ...(bearer ? { authorization: bearer } : {}),
    },
    // Gitea's CreateHookOption takes the delivery Authorization header as a
    // top-level field; without it validateToken() rejects every delivery.
    ...(bearer ? { authorization_header: bearer } : {}),
    events: ["issues", "issue_comment", "pull_request", "push"],
    active: true,
  });

  const id = (created as { id?: unknown } | null)?.id;
  if (typeof id !== "number") {
    throw new Error("Webhook creation response did not include a numeric id");
  }

  trackedRepos.set(fullName, { webhookId: id, addedAt: Date.now() });
  console.log(`[gitea-polling] Webhook created for ${fullName} (ID: ${id})`);
}

/**
 * One polling pass: list the user's repos and register a webhook on every
 * repo not yet tracked. Errors are logged; a repo whose registration fails
 * stays untracked and is retried on the next pass.
 */
async function fetchUserRepos(): Promise<void> {
  // A slow pass must not overlap with the next timer tick, or the same repo
  // could be registered twice.
  if (isPolling) return;
  isPolling = true;

  try {
    if (!WEBHOOK_URL) {
      console.warn("[gitea-polling] PI_WEBHOOK_URL not set, skipping webhook registration");
      return;
    }

    const client = new GiteaClient();
    const repoNames = await listAllRepoNames(client);
    const newRepos = repoNames.filter((name) => !trackedRepos.has(name));

    if (newRepos.length > 0) {
      console.log(`[gitea-polling] Found ${newRepos.length} new repos, registering webhooks...`);

      for (const fullName of newRepos) {
        try {
          await ensureWebhook(client, fullName);
        } catch (err) {
          console.error(`[gitea-polling] Error creating webhook for ${fullName}:`, err);
        }
      }
    }

    console.log(`[gitea-polling] Current repos: ${trackedRepos.size}`);
  } catch (err) {
    console.error("[gitea-polling] Error fetching user repos:", err);
  } finally {
    isPolling = false;
  }
}

/** Return a copy of the tracked-repo map, so callers cannot mutate module state. */
export function getTrackedRepos(): Map<string, TrackedRepo> {
  return new Map(trackedRepos);
}