Files
gitea/pi-extension/webhook/server.ts
pi-bot-01 b868ad4df5 fix: POST for file creation (Gitea 1.25+), add edit_issue tool, persist poll state
- files.ts: Use POST for new files, PUT for updates (Gitea 1.25 requires this)
- issues.ts: Add editIssue() for state/title/body changes
- write-tools.ts: Add gitea_edit_issue tool (open/close/edit issues)
- webhook/server.ts: Persist lastPollAt to disk to prevent duplicate
  events on reload; use followUp delivery to queue events during LLM turns
- index.ts: Use deliverAs:'followUp' for sendUserMessage
2026-03-13 17:14:06 -07:00

501 lines
17 KiB
TypeScript

/**
* Webhook server — receives Gitea events via HTTP + polls collab repos
*
* Auth: Bearer token validation (PI_WEBHOOK_TOKEN).
* No HMAC/secret — consistent with token-based auth strategy.
*
* Repos where the bot has admin access get webhooks installed automatically.
* Repos where the bot is a non-admin collaborator are polled for new events.
*/
import { timingSafeEqual } from "node:crypto";
import type { Server, IncomingMessage, ServerResponse } from "node:http";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { GiteaClient, GiteaError } from "../../src/client.js";
/**
 * Parse an integer setting from an environment value.
 *
 * @param raw      Raw environment string (may be undefined).
 * @param fallback Value used when `raw` is missing, not numeric, or below `min`.
 * @param min      Smallest accepted value (inclusive).
 * @returns The parsed integer, or `fallback` when the input is unusable.
 */
function parseIntEnv(raw: string | undefined, fallback: number, min = 0): number {
  const parsed = Number.parseInt(raw ?? "", 10);
  return Number.isFinite(parsed) && parsed >= min ? parsed : fallback;
}

/** Interface the webhook HTTP server binds to. */
const WEBHOOK_HOST = process.env.PI_WEBHOOK_HOST ?? "0.0.0.0";
/** Port the webhook HTTP server listens on (falls back to 3000 on invalid input). */
const WEBHOOK_PORT = parseIntEnv(process.env.PI_WEBHOOK_PORT, 3000);
/** Bearer token expected on incoming webhook requests; empty disables the check. */
const WEBHOOK_TOKEN = process.env.PI_WEBHOOK_TOKEN ?? "";
/** Base URL registered with Gitea as the webhook target; empty means poll-only mode. */
const WEBHOOK_URL = process.env.PI_WEBHOOK_URL ?? "";
// Intervals are in seconds. A NaN or zero value would make setInterval fire
// roughly every millisecond, so anything below 1 falls back to the default.
const POLL_INTERVAL = parseIntEnv(process.env.PI_BOT_POLL_INTERVAL, 300, 1);
const EVENT_POLL_INTERVAL = parseIntEnv(process.env.PI_EVENT_POLL_INTERVAL, 60, 1);
/** Login of the bot account; events authored by this user are skipped when polling. */
const BOT_USER = process.env.PI_GIT_USER ?? "";
/** Location of the persisted per-repo poll timestamps. */
const POLL_STATE_FILE = "/home/pibot/.pi/agent/gitea-poll-state.json";
/** The running HTTP server, or null when stopped. */
let server: Server | null = null;
/** FIFO of events waiting to be forwarded through sendMessage. */
let processingQueue: Array<{ event: any; timestamp: number }> = [];
/** Maximum queued events; the oldest entry is dropped once this is exceeded. */
let maxQueueDepth = 50;
/** True while processQueue is draining the queue. */
let isProcessing = false;
let repoPollTimer: NodeJS.Timeout | null = null;
let eventPollTimer: NodeJS.Timeout | null = null;
/** Callback that delivers a prompt to the LLM; installed via setSendMessage. */
let sendMessage: ((message: string) => Promise<void>) | null = null;
/** Repos where we successfully installed a webhook */
let webhookRepos: Map<string, { webhookId: number; addedAt: number }> = new Map();
/** Repos polled for events: webhook install failed (403) or no webhook URL is configured */
let pollOnlyRepos: Map<string, { addedAt: number; lastPollAt: string }> = new Map();
/** Track all known repos so we don't re-attempt webhook install every cycle */
let knownRepos: Set<string> = new Set();
/**
 * Load poll timestamps from disk (survives reloads).
 *
 * The file content is validated before use: anything that is not a plain JSON
 * object yields an empty state, and entries whose value is not a parseable
 * date string are dropped. Without this, a file containing e.g. `null` would
 * make later `savedState[name]` lookups throw.
 *
 * @returns Map of repo full name to last-poll timestamp string; empty when
 *          the file is missing, unreadable, or malformed.
 */
async function loadPollState(): Promise<Record<string, string>> {
  try {
    const fs = await import("node:fs/promises");
    const raw = await fs.readFile(POLL_STATE_FILE, "utf-8");
    const parsed: unknown = JSON.parse(raw);
    if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
      return {};
    }
    const state: Record<string, string> = {};
    for (const [repoName, stamp] of Object.entries(parsed)) {
      // Keep only timestamps that Date can parse.
      if (typeof stamp === "string" && !Number.isNaN(Date.parse(stamp))) {
        state[repoName] = stamp;
      }
    }
    return state;
  } catch {
    // Missing or unreadable file, or invalid JSON: start with no saved state.
    return {};
  }
}
/**
 * Write the last-poll timestamp of every poll-only repo to the state file.
 * Failures are logged and never thrown.
 */
async function savePollState(): Promise<void> {
  try {
    const { writeFile } = await import("node:fs/promises");
    const snapshot: Record<string, string> = {};
    pollOnlyRepos.forEach((entry, repoName) => {
      snapshot[repoName] = entry.lastPollAt;
    });
    await writeFile(POLL_STATE_FILE, JSON.stringify(snapshot), "utf-8");
  } catch (err) {
    const detail = err instanceof Error ? err.message : err;
    console.error("[gitea-polling] Error saving poll state:", detail);
  }
}
/**
 * Install the callback used to deliver event prompts.
 *
 * @param fn Async function that receives one formatted prompt per event.
 */
export function setSendMessage(fn: (message: string) => Promise<void>) {
  sendMessage = fn;
}
/**
 * Validate bearer token on incoming webhook request.
 *
 * @param req Incoming HTTP request whose `Authorization` header is checked.
 * @returns true when the header equals `Bearer <PI_WEBHOOK_TOKEN>`, or when no
 *          token is configured.
 */
function validateToken(req: IncomingMessage): boolean {
  // No token configured: every request is accepted.
  // NOTE(review): the default bind host is 0.0.0.0, so running without a token
  // is only safe when the port is unreachable from untrusted networks.
  if (!WEBHOOK_TOKEN) return true;
  const auth = req.headers["authorization"];
  if (typeof auth !== "string") return false;
  const expected = Buffer.from(`Bearer ${WEBHOOK_TOKEN}`, "utf-8");
  const provided = Buffer.from(auth, "utf-8");
  // timingSafeEqual throws on length mismatch, so compare lengths first; the
  // content comparison itself is constant-time to avoid leaking the token.
  return provided.length === expected.length && timingSafeEqual(provided, expected);
}
/**
 * Format a Gitea event as a prompt for the LLM.
 *
 * Sections are emitted for whichever of `issue`, `pull_request`, `comment`
 * and `pusher` are present on the payload, followed by fixed instructions.
 *
 * @param event Parsed webhook payload.
 * @returns The prompt text.
 */
function formatEventPrompt(event: any): string {
  const ev = event ?? {};
  // Payloads without an `action` (e.g. pushes) previously rendered "undefined".
  const action = ev.action ?? (ev.pusher ? "push" : "unknown");
  const repo = ev.repository?.full_name || "unknown";
  const parts: string[] = [
    `New Gitea event on ${repo}:\n\n`,
    `**Action**: ${action}\n\n`,
  ];

  if (ev.issue) {
    const issue = ev.issue;
    parts.push(
      `**Issue #${issue.number}: ${issue.title}**\n`,
      `**Author**: @${issue.user?.login || "unknown"}\n`,
      `**Labels**: ${issue.labels?.map((l: any) => l.name).join(", ") || "none"}\n`,
      `**Body**:\n${issue.body || "(no body)"}\n\n`,
    );
  }

  if (ev.pull_request) {
    const pr = ev.pull_request;
    parts.push(
      `**PR #${pr.number}: ${pr.title}**\n`,
      `**Author**: @${pr.user?.login || "unknown"}\n`,
      `**Base**: ${pr.base?.label} ← **Head**: ${pr.head?.label}\n`,
      `**Body**:\n${pr.body || "(no body)"}\n\n`,
    );
  }

  if (ev.comment) {
    const comment = ev.comment;
    const targetNumber = ev.issue?.number ?? ev.pull_request?.number ?? "unknown";
    parts.push(
      `**Comment on #${targetNumber}**\n`,
      `**Author**: @${comment.user?.login || "unknown"}\n`,
      `**Body**:\n${comment.body || "(no body)"}\n\n`,
    );
  }

  if (ev.pusher) {
    // Gitea user objects expose `login`/`username`; `name` is kept as a
    // fallback for payloads that use it.
    const pusher = ev.pusher.login || ev.pusher.username || ev.pusher.name || "unknown";
    parts.push(
      `**Pusher**: @${pusher}\n`,
      `**Commits**: ${ev.commits?.length || 0}\n\n`,
    );
  }

  parts.push(
    `---\n\n`,
    `Please analyze this event and decide how to respond. You can:\n`,
    `1. Add helpful comments to issues/PRs\n`,
    `2. Suggest code fixes or improvements\n`,
    `3. Create branches and PRs to fix issues\n`,
    `4. Update files directly (if direct_push is enabled)\n`,
    `5. Ask for clarification if needed\n\n`,
    `Use the available Gitea tools to interact with the repository.`,
  );
  return parts.join("");
}
/**
 * Format a polled issue/PR/comment as a synthetic event prompt.
 *
 * @param type Kind of item that was discovered by polling.
 * @param item The issue, pull request, or comment object; comments are
 *             expected to carry `_issueNumber` identifying their parent.
 * @param repo Full repository name shown in the prompt header.
 * @returns The prompt text.
 */
function formatPolledEvent(type: "issue" | "pull_request" | "issue_comment", item: any, repo: string): string {
  // Comments are "created" while issues and PRs are "opened"; previously
  // every polled item was reported as "opened".
  const action = type === "issue_comment" ? "created" : "opened";
  const parts: string[] = [
    `New Gitea event on ${repo} (via polling):\n\n`,
    `**Action**: ${action}\n\n`,
  ];
  const author = `**Author**: @${item.user?.login || "unknown"}\n`;
  const body = `**Body**:\n${item.body || "(no body)"}\n\n`;

  switch (type) {
    case "issue":
      parts.push(
        `**Issue #${item.number}: ${item.title}**\n`,
        author,
        `**Labels**: ${item.labels?.map((l: any) => l.name).join(", ") || "none"}\n`,
        body,
      );
      break;
    case "pull_request":
      parts.push(
        `**PR #${item.number}: ${item.title}**\n`,
        author,
        `**Base**: ${item.base?.label} ← **Head**: ${item.head?.label}\n`,
        body,
      );
      break;
    case "issue_comment":
      parts.push(`**Comment on #${item._issueNumber}**\n`, author, body);
      break;
  }

  parts.push(
    `---\n\n`,
    `Please analyze this event and decide how to respond.\n`,
    `Use the available Gitea tools to interact with the repository.`,
  );
  return parts.join("");
}
/**
 * Append an event to the processing queue and kick off draining.
 * When the queue grows past maxQueueDepth the oldest entry is discarded.
 */
function enqueueEvent(event: any) {
  const entry = { event, timestamp: Date.now() };
  processingQueue.push(entry);

  const overflowed = processingQueue.length > maxQueueDepth;
  if (overflowed) {
    processingQueue.shift();
    console.warn(`[gitea-webhook] Queue full, dropped oldest event`);
  }

  // Fire-and-forget: processQueue returns immediately if a drain is already running.
  void processQueue();
}
/**
 * Process the event queue.
 *
 * Drains `processingQueue` in FIFO order, formatting each event and handing
 * it to `sendMessage`. Only one drain runs at a time. A failure on one event
 * is logged and does not stop the remaining events, and the `isProcessing`
 * flag is always released so a bad payload cannot wedge the queue.
 */
async function processQueue() {
  if (isProcessing || processingQueue.length === 0) return;
  isProcessing = true;
  try {
    while (processingQueue.length > 0) {
      const next = processingQueue.shift();
      if (!next) break;
      const { event } = next;
      try {
        // Optional chaining: a null/primitive payload must not throw here.
        const repoName = event?.repository?.full_name || event?._repo || "unknown";
        const label = event?.action || event?._type;
        console.log(`[gitea-webhook] Processing event: ${label} on ${repoName}`);
        if (!sendMessage) {
          console.warn(`[gitea-webhook] No sendMessage function available, skipping event`);
          continue;
        }
        const prompt = event?._polled
          ? formatPolledEvent(event._type, event._item, event._repo)
          : formatEventPrompt(event);
        await sendMessage(prompt);
        console.log(`[gitea-webhook] Event sent to LLM: ${label} on ${repoName}`);
      } catch (err) {
        console.error(`[gitea-webhook] Failed to send event to LLM:`, err);
      }
    }
  } finally {
    isProcessing = false;
  }
}
// ── HTTP Server ──────────────────────────────────────────────────────────────
/**
 * Start webhook server.
 *
 * Routes:
 *  - `OPTIONS *`          → 200 with CORS headers
 *  - `GET /health`        → JSON status snapshot
 *  - `POST /hooks/gitea`  → token check, JSON body parse, enqueue event
 *  - anything else        → 404
 *
 * @param _pi Extension API handle (unused here).
 * @returns Promise that resolves once the server is listening and rejects if
 *          the server emits an error before that (e.g. the port is taken).
 */
export async function startWebhookServer(_pi: ExtensionAPI): Promise<void> {
  // Upper bound on accepted request bodies, so a client cannot exhaust memory.
  const MAX_BODY_BYTES = 10 * 1024 * 1024;

  const sendJson = (res: ServerResponse, status: number, payload: unknown): void => {
    res.writeHead(status, { "Content-Type": "application/json" });
    res.end(JSON.stringify(payload));
  };

  const handleRequest = async (req: IncomingMessage, res: ServerResponse): Promise<void> => {
    // Match on the path only so a query string does not cause a 404.
    const path = (req.url || "").split("?")[0];
    res.setHeader("Access-Control-Allow-Origin", "*");
    res.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
    res.setHeader("Access-Control-Allow-Headers", "Content-Type, Authorization");

    if (req.method === "OPTIONS") {
      res.writeHead(200);
      res.end();
      return;
    }

    // GET /health
    if (path === "/health" && req.method === "GET") {
      sendJson(res, 200, {
        status: "ok",
        uptime: process.uptime(),
        webhook_repos: webhookRepos.size,
        poll_only_repos: pollOnlyRepos.size,
        queue_depth: processingQueue.length,
        is_processing: isProcessing,
      });
      return;
    }

    // POST /hooks/gitea
    if (path === "/hooks/gitea" && req.method === "POST") {
      if (!validateToken(req)) {
        sendJson(res, 401, { error: "Unauthorized" });
        console.error("[gitea-webhook] Token validation failed");
        return;
      }

      // Collect raw buffers and decode once: decoding chunk-by-chunk corrupts
      // multi-byte UTF-8 characters that straddle a chunk boundary.
      const chunks: Buffer[] = [];
      let received = 0;
      for await (const chunk of req) {
        const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk));
        received += buf.length;
        if (received > MAX_BODY_BYTES) {
          res.setHeader("Connection", "close");
          sendJson(res, 413, { error: "Payload too large" });
          // Returning from the loop stops reading the rest of the body.
          return;
        }
        chunks.push(buf);
      }

      let event: unknown;
      try {
        event = JSON.parse(Buffer.concat(chunks).toString("utf-8"));
      } catch {
        sendJson(res, 400, { error: "Invalid JSON" });
        return;
      }
      // Valid JSON can still be null, a primitive, or an array; only objects
      // are meaningful events.
      if (event === null || typeof event !== "object" || Array.isArray(event)) {
        sendJson(res, 400, { error: "Expected a JSON object" });
        return;
      }

      enqueueEvent(event);
      sendJson(res, 200, { received: true, event: (event as { action?: unknown }).action });
      return;
    }

    res.writeHead(404, { "Content-Type": "text/plain" });
    res.end("Not found");
  };

  const http = await import("node:http");
  const srv = http.createServer((req, res) => {
    // Catch handler failures so they do not become unhandled rejections.
    handleRequest(req, res).catch((err: unknown) => {
      console.error("[gitea-webhook] Request handler error:", err);
      if (!res.headersSent) {
        sendJson(res, 500, { error: "Internal server error" });
      } else {
        res.end();
      }
    });
  });
  server = srv;

  await new Promise<void>((resolve, reject) => {
    // The listener stays attached after startup, as before; once the promise
    // has settled, later calls to reject are no-ops.
    srv.on("error", reject);
    srv.listen(WEBHOOK_PORT, WEBHOOK_HOST, () => {
      console.log(`[gitea-webhook] Server listening on ${WEBHOOK_HOST}:${WEBHOOK_PORT}`);
      resolve();
    });
  });
}
/**
 * Shut the webhook server down.
 * Resolves immediately when no server is running; otherwise resolves from the
 * `close` callback, after which the module-level handle is reset to null.
 */
export async function stopWebhookServer() {
  const active = server;
  if (!active) {
    return;
  }
  await new Promise<void>((done) => {
    active.close(() => {
      console.log("[gitea-webhook] Server stopped");
      server = null;
      done();
    });
  });
}
// ── Repo Discovery & Webhook Registration ────────────────────────────────────
/** Fetch every page of the bot's repositories (stops at the first empty page). */
async function fetchAllUserRepos(client: GiteaClient): Promise<any[]> {
  const MAX_PAGES = 50; // safety cap in case the server ignores `page`
  const all: any[] = [];
  for (let page = 1; page <= MAX_PAGES; page++) {
    const batch = await client.get<any[]>(`/user/repos?limit=100&page=${page}`);
    if (!Array.isArray(batch) || batch.length === 0) break;
    all.push(...batch);
  }
  return all;
}

/** Register a repo as poll-only, resuming from its persisted timestamp when one exists. */
function registerPollOnlyRepo(name: string, savedState: Record<string, string>): void {
  // Use persisted timestamp if available, otherwise 5 min ago
  const lastPollAt = savedState[name] ?? new Date(Date.now() - 5 * 60 * 1000).toISOString();
  pollOnlyRepos.set(name, { addedAt: Date.now(), lastPollAt });
  knownRepos.add(name);
}

/**
 * Discover repos and install webhooks (or mark as poll-only on 403).
 *
 * For each repo not yet in `knownRepos`:
 *  - without PI_WEBHOOK_URL, the repo is registered as poll-only;
 *  - otherwise an existing hook pointing at our URL is reused, or a new one is
 *    created. Reuse matters because `stopPolling` clears `knownRepos`, so a
 *    reload would otherwise install a duplicate hook on every repo;
 *  - a 403 from the hooks API registers the repo as poll-only;
 *  - any other error is logged and the repo is retried on the next cycle.
 *
 * Errors from the repo listing itself are logged and never thrown.
 */
async function discoverRepos() {
  try {
    const client = new GiteaClient();
    const repos = await fetchAllUserRepos(client);
    const savedState = await loadPollState();
    const hookUrl = `${WEBHOOK_URL}/hooks/gitea`;
    const bearer = `Bearer ${WEBHOOK_TOKEN}`;
    let newWebhooks = 0;
    let newPollOnly = 0;

    for (const repo of repos) {
      const name = repo?.full_name;
      if (typeof name !== "string" || knownRepos.has(name)) continue;

      if (!WEBHOOK_URL) {
        // No webhook URL configured — all repos are poll-only
        registerPollOnlyRepo(name, savedState);
        newPollOnly++;
        continue;
      }

      try {
        // Listing hooks needs the same admin rights as creating one, so a
        // non-admin repo lands in the 403 branch below either way.
        const existingHooks = await client.get<any[]>(`/repos/${name}/hooks`);
        const existing = Array.isArray(existingHooks)
          ? existingHooks.find((hook: any) => hook?.config?.url === hookUrl)
          : undefined;

        const webhook =
          existing ??
          (await client.post<any>(`/repos/${name}/hooks`, {
            type: "gitea",
            config: {
              url: hookUrl,
              content_type: "json",
              ...(WEBHOOK_TOKEN ? { authorization: bearer } : {}),
            },
            // NOTE(review): Gitea's CreateHookOption documents the auth header
            // as top-level `authorization_header`; sent alongside the original
            // config key — confirm against the target Gitea version.
            ...(WEBHOOK_TOKEN ? { authorization_header: bearer } : {}),
            events: ["issues", "issue_comment", "pull_request", "push"],
            active: true,
          }));

        webhookRepos.set(name, { webhookId: webhook.id, addedAt: Date.now() });
        knownRepos.add(name);
        newWebhooks++;
        console.log(
          existing
            ? `[gitea-polling] ✅ Webhook already installed: ${name} (ID: ${webhook.id})`
            : `[gitea-polling] ✅ Webhook installed: ${name} (ID: ${webhook.id})`,
        );
      } catch (err) {
        if (err instanceof GiteaError && err.status === 403) {
          // No admin access — fall back to polling
          registerPollOnlyRepo(name, savedState);
          newPollOnly++;
          console.log(`[gitea-polling] 📋 Poll-only (no admin): ${name}`);
          continue;
        }
        // Other error — log but don't add to known (retry next cycle)
        console.error(`[gitea-polling] ❌ Webhook error for ${name}: ${err instanceof Error ? err.message : err}`);
      }
    }

    if (newWebhooks > 0 || newPollOnly > 0) {
      console.log(`[gitea-polling] Discovered: ${newWebhooks} webhook, ${newPollOnly} poll-only`);
    }
    console.log(`[gitea-polling] Total: ${webhookRepos.size} webhook + ${pollOnlyRepos.size} poll-only repos`);
  } catch (err) {
    console.error("[gitea-polling] Error discovering repos:", err instanceof Error ? err.message : err);
  }
}
// ── Event Polling for Non-Admin Repos ────────────────────────────────────────
/** True while a pollForEvents() run is active, so interval ticks cannot overlap. */
let eventPollInFlight = false;

/**
 * Poll all poll-only repos for new issues, PRs, and comments.
 *
 * For each repo, items created after its `lastPollAt` (and not authored by the
 * bot) are collected first and only enqueued once the repo's issue and PR
 * fetches have all succeeded. `lastPollAt` is advanced at the same point, so a
 * failure part-way through neither loses events nor re-emits ones that were
 * already sent. Comment fetches stay best-effort: a failure is logged and that
 * issue's comments are skipped for this cycle.
 *
 * NOTE(review): comments are only fetched for items returned by the
 * `type=issues` listing, so comments on pull requests are not detected here.
 */
async function pollForEvents() {
  if (eventPollInFlight) {
    // A slow previous run would otherwise reuse the same `since` and emit duplicates.
    console.log(`[gitea-poll-events] Previous poll still running, skipping this tick`);
    return;
  }
  if (pollOnlyRepos.size === 0) {
    console.log(`[gitea-poll-events] No poll-only repos to check`);
    return;
  }

  eventPollInFlight = true;
  try {
    console.log(`[gitea-poll-events] Checking ${pollOnlyRepos.size} repos for new events...`);
    const client = new GiteaClient();

    const synthetic = (type: string, item: any, repoName: string, action: string) => ({
      _polled: true,
      _type: type,
      _item: item,
      _repo: repoName,
      action,
      repository: { full_name: repoName },
    });

    for (const [repoName, state] of pollOnlyRepos) {
      try {
        // An unparseable persisted timestamp would make every comparison
        // false; fall back to the same 5-minute window used for new repos.
        let sinceDate = new Date(state.lastPollAt);
        if (Number.isNaN(sinceDate.getTime())) {
          sinceDate = new Date(Date.now() - 5 * 60 * 1000);
        }
        const since = encodeURIComponent(sinceDate.toISOString());
        // Captured before fetching so items created during the poll are
        // picked up by the next cycle.
        const now = new Date().toISOString();
        const [owner, repo] = repoName.split("/");
        const pending: any[] = [];

        // Fetch recent issues created/updated since last poll
        const issues = await client.get<any[]>(
          `/repos/${owner}/${repo}/issues?state=open&sort=created&type=issues&since=${since}&limit=20`
        );
        for (const issue of issues) {
          // Skip issues created by the bot itself
          if (issue.user?.login === BOT_USER) continue;

          // Only process issues created after our last poll (not just updated)
          if (new Date(issue.created_at) > sinceDate) {
            console.log(`[gitea-poll-events] New issue: ${repoName}#${issue.number} "${issue.title}"`);
            pending.push(synthetic("issue", issue, repoName, "opened"));
          }

          // Check for new comments on this issue
          try {
            const comments = await client.get<any[]>(
              `/repos/${owner}/${repo}/issues/${issue.number}/comments?since=${since}`
            );
            for (const comment of comments) {
              if (comment.user?.login === BOT_USER) continue;
              if (new Date(comment.created_at) > sinceDate) {
                console.log(`[gitea-poll-events] New comment on ${repoName}#${issue.number} by @${comment.user?.login}`);
                pending.push(
                  synthetic("issue_comment", { ...comment, _issueNumber: issue.number }, repoName, "created"),
                );
              }
            }
          } catch (err) {
            // Best-effort: keep polling the rest of the repo, but leave a trace.
            console.warn(
              `[gitea-poll-events] Comment fetch failed for ${repoName}#${issue.number}: ${err instanceof Error ? err.message : err}`,
            );
          }
        }

        // Fetch recent PRs
        const prs = await client.get<any[]>(
          `/repos/${owner}/${repo}/pulls?state=open&sort=created&limit=20`
        );
        for (const pr of prs) {
          if (pr.user?.login === BOT_USER) continue;
          if (new Date(pr.created_at) > sinceDate) {
            console.log(`[gitea-poll-events] New PR: ${repoName}#${pr.number} "${pr.title}"`);
            pending.push(synthetic("pull_request", pr, repoName, "opened"));
          }
        }

        // Everything fetched: deliver the events and advance the timestamp together.
        for (const event of pending) {
          enqueueEvent(event);
        }
        state.lastPollAt = now;
      } catch (err) {
        console.error(`[gitea-poll-events] Error polling ${repoName}: ${err instanceof Error ? err.message : err}`);
      }
    }

    // Persist poll timestamps. Skipped when the map was cleared mid-run (e.g.
    // by stopPolling), which would otherwise overwrite the file with `{}`.
    if (pollOnlyRepos.size > 0) {
      await savePollState();
    }
  } finally {
    eventPollInFlight = false;
  }
}
// ── Public API ───────────────────────────────────────────────────────────────
/**
 * Kick off both background loops: repo discovery (run once right away, then
 * every POLL_INTERVAL seconds) and event polling for poll-only repos (every
 * EVENT_POLL_INTERVAL seconds).
 */
export function startPolling(_pi: ExtensionAPI) {
  const discoveryDelayMs = POLL_INTERVAL * 1000;
  const eventDelayMs = EVENT_POLL_INTERVAL * 1000;

  // Initial discovery happens immediately; failures are handled inside discoverRepos.
  void discoverRepos();
  repoPollTimer = setInterval(() => {
    void discoverRepos();
  }, discoveryDelayMs);
  console.log(`[gitea-polling] Repo discovery started (interval: ${POLL_INTERVAL}s)`);

  // Event polling runs on its own, typically shorter, interval.
  eventPollTimer = setInterval(() => {
    void pollForEvents();
  }, eventDelayMs);
  console.log(`[gitea-polling] Event polling started (interval: ${EVENT_POLL_INTERVAL}s)`);
}
/**
 * Cancel both polling timers and forget every tracked repo, so a later
 * startPolling begins from a clean slate.
 */
export function stopPolling() {
  if (repoPollTimer !== null) {
    clearInterval(repoPollTimer);
    repoPollTimer = null;
  }
  if (eventPollTimer !== null) {
    clearInterval(eventPollTimer);
    eventPollTimer = null;
  }

  // Clear state for clean reload
  for (const registry of [webhookRepos, pollOnlyRepos, knownRepos]) {
    registry.clear();
  }
}
/**
 * Snapshot the tracked repos.
 *
 * @returns Shallow copies of the webhook and poll-only maps, so callers
 *          cannot add or remove entries in the live registries.
 */
export function getTrackedRepos() {
  const webhook = new Map(webhookRepos);
  const pollOnly = new Map(pollOnlyRepos);
  return { webhook, pollOnly };
}