Files
gitea/pi-extension/webhook/server.ts
pi-bot-01 0389ae0b88 refactor: drop old env var fallbacks, GITEA_ prefix only
Removed PI_WEBHOOK_*, PI_BOT_POLL_INTERVAL, PI_NOTIF_POLL_INTERVAL,
PI_GIT_USER, OPENCLAW_* fallbacks. Only GITEA_* vars now.
GITEA_TOKEN still falls back to PI_GIT_TOKEN in client.ts for CLI compat.
2026-03-13 18:28:28 -07:00

699 lines
23 KiB
TypeScript

/**
* Webhook server + notification poller
*
* Two event sources:
* 1. Webhooks (own repos with admin) — real-time, all events
* 2. Notification polling (collab repos) — @mention/assign driven
*
* @mention routing:
* - Own repos: respond to all events (configurable to mention-only)
* - Collab repos: respond only when @mentioned or assigned
* - Extracts directive text after @botname for focused LLM context
*
* Auth: Bearer token for inbound webhooks, token auth for Gitea API.
*/
import { timingSafeEqual } from "node:crypto";
import type { Server, IncomingMessage, ServerResponse } from "node:http";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { GiteaClient, GiteaError } from "../../src/client.js";
// ── Config ───────────────────────────────────────────────────────────────────
/**
 * Read an integer setting from the environment.
 *
 * Parsing is the same `parseInt(…, 10)` as before, but the default is used
 * (with a warning) when the variable is unset, not a number, or outside
 * [min, max]. Without this guard a malformed interval becomes NaN, and
 * `setInterval` treats NaN, 0 and negative delays as 1 ms.
 *
 * @param name     Environment variable name.
 * @param fallback Value used when the variable is unset or invalid.
 * @param min      Smallest accepted value (inclusive).
 * @param max      Largest accepted value (inclusive).
 * @returns The parsed value, or `fallback`.
 */
function readIntEnv(name: string, fallback: number, min: number, max = Number.MAX_SAFE_INTEGER): number {
  const raw = process.env[name];
  if (raw === undefined) return fallback;
  const value = parseInt(raw, 10);
  if (Number.isNaN(value) || value < min || value > max) {
    console.warn(`[gitea-config] Ignoring invalid ${name}=${JSON.stringify(raw)}; using ${fallback}`);
    return fallback;
  }
  return value;
}
/** Largest interval in seconds whose millisecond value still fits a timer delay (2^31 - 1 ms). */
const MAX_INTERVAL_SECONDS = 2147483;
/** Interface the webhook HTTP server binds to. */
const WEBHOOK_HOST = process.env.GITEA_WEBHOOK_HOST ?? "0.0.0.0";
/** TCP port the webhook HTTP server listens on. */
const WEBHOOK_PORT = readIntEnv("GITEA_WEBHOOK_PORT", 3000, 0, 65535);
/** Bearer token expected on inbound webhooks; empty disables the check. */
const WEBHOOK_TOKEN = process.env.GITEA_WEBHOOK_TOKEN ?? "";
/** Base URL registered with Gitea as the webhook target; empty disables webhook installation. */
const WEBHOOK_URL = process.env.GITEA_WEBHOOK_URL ?? "";
/** Seconds between repo discovery runs. */
const POLL_INTERVAL = readIntEnv("GITEA_POLL_INTERVAL", 300, 1, MAX_INTERVAL_SECONDS);
/** Seconds between notification polls. */
const NOTIF_POLL_INTERVAL = readIntEnv("GITEA_NOTIF_INTERVAL", 30, 1, MAX_INTERVAL_SECONDS);
/** Login name of the bot account; used for @mention matching and self-event filtering. */
const BOT_USER = process.env.GITEA_USER ?? "";
/** Location of the persisted per-repo configuration. */
const STATE_FILE = "/home/pibot/.pi/agent/gitea-poll-state.json";
// ── State ────────────────────────────────────────────────────────────────────
/** HTTP server instance while the webhook listener is up, otherwise null. */
let server: Server | null = null;
/** Prompts waiting to be delivered, oldest first. */
let processingQueue: { prompt: string; timestamp: number }[] = [];
/** Queue capacity; enqueuePrompt drops the oldest entry once this is exceeded. */
const maxQueueDepth = 50;
/** True while processQueue is draining the queue. */
let isProcessing = false;
/** Interval handle for repo discovery (null when not polling). */
let repoPollTimer: NodeJS.Timeout | null = null;
/** Interval handle for notification polling (null when not polling). */
let notifPollTimer: NodeJS.Timeout | null = null;
/** Delivery callback registered through setSendMessage. */
let sendMessage: ((message: string) => Promise<void>) | null = null;
/** Repos with an installed webhook, keyed by full name. */
let webhookRepos = new Map<string, { webhookId: number; addedAt: number }>();
/** Repos handled through notification polling instead of a webhook. */
let collabRepos = new Set<string>();
/** Every repo already classified, so discovery does not retry webhook installation. */
let knownRepos = new Set<string>();
/** Per-repo response settings. */
interface RepoConfig {
  /** "all" responds to every event; "mention" only when the bot is @mentioned or assigned. */
  respondTo: "all" | "mention";
}
/** Response settings keyed by repo full name. */
let repoConfigs = new Map<string, RepoConfig>();
// No "last processed notification ID" is stored, because Gitea reuses thread IDs.
// Progress is tracked by marking each notification as read after processing.
// ── Persistence ──────────────────────────────────────────────────────────────
/** Shape of the JSON document persisted at STATE_FILE. */
interface PollState {
  /** Per-repo response settings, keyed by repo full name. */
  repoConfigs: Record<string, RepoConfig>;
}
/**
 * Load persisted state from STATE_FILE.
 *
 * Never throws: a missing file yields empty state silently (expected on the
 * first run); any other read error or a corrupt file is logged and also
 * yields empty state. Entries whose `respondTo` is not "all" or "mention"
 * are dropped so a hand-edited file cannot inject an invalid mode.
 *
 * @returns The validated persisted state, or `{ repoConfigs: {} }`.
 */
async function loadState(): Promise<PollState> {
  let raw: string;
  try {
    const fs = await import("node:fs/promises");
    raw = await fs.readFile(STATE_FILE, "utf-8");
  } catch (err) {
    const code = (err as { code?: unknown } | null)?.code;
    if (code !== "ENOENT") {
      console.error("[gitea-state] Error loading:", err instanceof Error ? err.message : err);
    }
    return { repoConfigs: {} };
  }
  try {
    const parsed: unknown = JSON.parse(raw);
    const stored =
      typeof parsed === "object" && parsed !== null
        ? (parsed as { repoConfigs?: unknown }).repoConfigs
        : undefined;
    const configs: Record<string, RepoConfig> = {};
    if (typeof stored === "object" && stored !== null && !Array.isArray(stored)) {
      for (const [repo, value] of Object.entries(stored)) {
        const respondTo = (value as { respondTo?: unknown } | null)?.respondTo;
        if (respondTo === "all" || respondTo === "mention") {
          configs[repo] = { respondTo };
        } else {
          console.warn(`[gitea-state] Ignoring invalid config for ${repo}`);
        }
      }
    }
    return { repoConfigs: configs };
  } catch (err) {
    console.error("[gitea-state] Ignoring corrupt state file:", err instanceof Error ? err.message : err);
    return { repoConfigs: {} };
  }
}
/**
 * Persist the current per-repo configuration to STATE_FILE as pretty-printed JSON.
 *
 * The parent directory is created first so the first save on a fresh host
 * does not fail. Errors are logged rather than thrown, because callers
 * invoke this fire-and-forget.
 */
async function saveState(): Promise<void> {
  try {
    const fs = await import("node:fs/promises");
    const path = await import("node:path");
    const state: PollState = {
      repoConfigs: Object.fromEntries(repoConfigs),
    };
    await fs.mkdir(path.dirname(STATE_FILE), { recursive: true });
    await fs.writeFile(STATE_FILE, JSON.stringify(state, null, 2), "utf-8");
  } catch (err) {
    console.error("[gitea-state] Error saving:", err instanceof Error ? err.message : err);
  }
}
// ── @Mention Parsing ─────────────────────────────────────────────────────────
/** Result of scanning a piece of text for @mentions. */
interface MentionInfo {
  /** Whether this bot was @mentioned */
  mentioned: boolean;
  /** All @usernames found in the text, in order of appearance (without the "@") */
  allMentions: string[];
  /** Text directed at this bot (after @botname, up to next @mention or end); null if none */
  directive: string | null;
  /** Full original text */
  fullText: string;
}
/**
 * Parse @mentions from text.
 *
 * A mention is `@name` at the start of the text or right after whitespace or
 * "(", which keeps e-mail addresses and URLs from counting. The directive is
 * the text after the first mention of the bot, up to the next mention or the
 * end of the text. Directive extraction uses the same boundary rule as
 * mention detection, so `@pi` is not found inside `@pi-bot` and an e-mail
 * address does not cut the directive short.
 *
 * @param text    Text to scan (issue/PR body or comment).
 * @param botUser Bot login; compared case-insensitively.
 * @returns Mention details; `directive` is null when the bot is not mentioned
 *          or nothing follows the mention.
 */
function parseMentions(text: string, botUser: string): MentionInfo {
  if (!text) return { mentioned: false, allMentions: [], directive: null, fullText: text };
  // Find all @mentions (start of text or after whitespace/"(", so not inside URLs/emails)
  const mentionPattern = /(?:^|[\s(])@([a-zA-Z0-9_-]+)/g;
  const allMentions: string[] = [];
  let match: RegExpExecArray | null;
  while ((match = mentionPattern.exec(text)) !== null) {
    allMentions.push(match[1]);
  }
  const botLower = botUser.toLowerCase();
  const mentioned = allMentions.some((m) => m.toLowerCase() === botLower);
  let directive: string | null = null;
  if (mentioned) {
    // The lookbehinds apply the mention boundary without consuming the
    // separator; the negative lookahead rejects longer names that merely
    // start with the bot's name.
    const botPattern = new RegExp(
      `(?<=^|[\\s(])@${escapeRegex(botUser)}(?![a-zA-Z0-9_-])\\s*([\\s\\S]*?)(?:(?<=[\\s(])(?=@[a-zA-Z0-9_-]+)|$)`,
      "i",
    );
    const directiveMatch = text.match(botPattern);
    if (directiveMatch) {
      directive = directiveMatch[1].trim() || null;
    }
  }
  return { mentioned, allMentions, directive, fullText: text };
}
/** Escape regex metacharacters so `s` can be embedded literally in a RegExp source. */
function escapeRegex(s: string): string {
  return s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
// ── Prompt Formatting ────────────────────────────────────────────────────────
/** Normalized description of a Gitea event, independent of how it was received. */
interface EventContext {
  /** Repo full name */
  repo: string;
  type: "issue" | "pull_request" | "issue_comment" | "push";
  action: string;
  /** Issue/PR number (issue and pull_request events) */
  number?: number;
  title?: string;
  /** Login of the user who triggered the event */
  author?: string;
  body?: string;
  labels?: string[];
  /** PR-specific */
  baseBranch?: string;
  headBranch?: string;
  /** Comment-specific: the issue/PR number being commented on */
  parentNumber?: number;
  /** Parsed mention info */
  mention?: MentionInfo;
  /** Source: "webhook" or "notification" */
  source: "webhook" | "notification";
}
/**
 * Render an event as the Markdown prompt handed to the LLM.
 *
 * The prompt has a header (repo, action), a type-specific section, an
 * optional @mention section, and a fixed list of suggested actions.
 * A missing author renders as "@unknown" for every event type, a multi-line
 * directive is block-quoted on every line, and a comment without a known
 * parent number omits the number instead of printing "#undefined".
 *
 * @param ctx Event to describe.
 * @returns The prompt text.
 */
function formatPrompt(ctx: EventContext): string {
  const { repo, type, action, mention, source } = ctx;
  const author = ctx.author || "unknown";
  const via = source === "notification" ? " (via @mention)" : "";
  let prompt = `New Gitea event on ${repo}${via}:\n\n`;
  prompt += `**Action**: ${action}\n\n`;
  if (type === "issue" || type === "pull_request") {
    const kind = type === "issue" ? "Issue" : "PR";
    prompt += `**${kind} #${ctx.number}: ${ctx.title}**\n`;
    prompt += `**Author**: @${author}\n`;
    if (ctx.labels?.length) prompt += `**Labels**: ${ctx.labels.join(", ")}\n`;
    if (ctx.baseBranch) prompt += `**Base**: ${ctx.baseBranch} ← **Head**: ${ctx.headBranch}\n`;
    prompt += `**Body**:\n${ctx.body || "(no body)"}\n\n`;
  }
  if (type === "issue_comment") {
    const target = ctx.parentNumber !== undefined ? ` on #${ctx.parentNumber}` : "";
    prompt += `**Comment${target}**\n`;
    prompt += `**Author**: @${author}\n`;
    prompt += `**Body**:\n${ctx.body || "(no body)"}\n\n`;
  }
  if (type === "push") {
    prompt += `**Pusher**: @${author}\n\n`;
  }
  // Add mention context
  if (mention?.mentioned && mention.directive) {
    // Prefix every line so the whole directive stays inside the blockquote.
    const quoted = mention.directive
      .split(/\r?\n/)
      .map((line) => (line ? `> ${line}` : ">"))
      .join("\n");
    prompt += `---\n\n`;
    prompt += `**You were @mentioned.** The request directed at you:\n`;
    prompt += `${quoted}\n\n`;
  } else if (mention?.mentioned) {
    prompt += `---\n\n`;
    prompt += `**You were @mentioned** in this ${type === "issue_comment" ? "comment" : type}.\n\n`;
  }
  if (mention && mention.allMentions.length > 1) {
    const botLower = BOT_USER.toLowerCase();
    const others = mention.allMentions.filter((m) => m.toLowerCase() !== botLower);
    if (others.length > 0) {
      prompt += `Other users mentioned: ${others.map((m) => "@" + m).join(", ")}\n\n`;
    }
  }
  prompt += `---\n\n`;
  prompt += `Please analyze this event and decide how to respond. You can:\n`;
  prompt += `1. Add helpful comments to issues/PRs\n`;
  prompt += `2. Suggest code fixes or improvements\n`;
  prompt += `3. Create branches and PRs to fix issues\n`;
  prompt += `4. Update files directly\n`;
  prompt += `5. Close issues when work is done\n`;
  prompt += `6. Ask for clarification if needed\n\n`;
  prompt += `Use the available Gitea tools to interact with the repository.`;
  return prompt;
}
// ── Event Queue ──────────────────────────────────────────────────────────────
/**
 * Register the callback used to deliver queued prompts to the LLM.
 * Replaces any previously registered callback.
 */
export function setSendMessage(fn: (message: string) => Promise<void>): void {
  sendMessage = fn;
}
/**
 * Append a prompt to the delivery queue and kick off processing.
 * When the queue grows past maxQueueDepth the oldest entry is discarded.
 */
function enqueuePrompt(prompt: string) {
  const entry = { prompt, timestamp: Date.now() };
  processingQueue.push(entry);
  const overflowed = processingQueue.length > maxQueueDepth;
  if (overflowed) {
    processingQueue.shift();
    console.warn(`[gitea-events] Queue full, dropped oldest event`);
  }
  // Fire-and-forget: processQueue is a no-op if a drain is already running.
  void processQueue();
}
/**
 * Drain the queue, delivering prompts one at a time in FIFO order.
 * Only one drain runs at a time (guarded by isProcessing). Delivery failures
 * are logged and the prompt is dropped; prompts are also dropped when no
 * sendMessage callback has been registered.
 */
async function processQueue() {
  if (isProcessing) return;
  if (processingQueue.length === 0) return;
  isProcessing = true;
  for (let next = processingQueue.shift(); next !== undefined; next = processingQueue.shift()) {
    if (!sendMessage) {
      console.warn(`[gitea-events] No sendMessage function, skipping`);
      continue;
    }
    try {
      await sendMessage(next.prompt);
      console.log(`[gitea-events] Event delivered to LLM`);
    } catch (err) {
      const detail = err instanceof Error ? err.message : err;
      console.error(`[gitea-events] Failed to send to LLM:`, detail);
    }
  }
  isProcessing = false;
}
// ── Mention Filter ───────────────────────────────────────────────────────────
/**
 * Decide whether an event should be handed to the LLM.
 *
 * Rules, in order:
 * - events authored by the bot itself are never processed;
 * - notification-sourced events are always processed (Gitea already
 *   filtered them by @mention/assignment);
 * - webhook events follow the repo's `respondTo` setting, which defaults to
 *   "all" when the repo has no stored config.
 *
 * @returns The decision together with the parsed mention info for `body`.
 */
function shouldProcess(repo: string, author: string, body: string, source: "webhook" | "notification"): { process: boolean; mention: MentionInfo } {
  const mention = parseMentions(body, BOT_USER);
  const fromSelf = author.toLowerCase() === BOT_USER.toLowerCase();
  if (fromSelf) return { process: false, mention };
  if (source === "notification") return { process: true, mention };
  const mode = repoConfigs.get(repo)?.respondTo ?? "all";
  const accept = mode === "mention" ? mention.mentioned : true;
  return { process: accept, mention };
}
// ── HTTP Webhook Server ──────────────────────────────────────────────────────
/**
 * Check the inbound request's `Authorization` header against WEBHOOK_TOKEN.
 *
 * The comparison is constant-time so response timing does not reveal how
 * much of a guessed token matched.
 *
 * NOTE(review): when WEBHOOK_TOKEN is empty every request is accepted. This
 * is kept for backward compatibility; confirm it is intended for
 * deployments that bind to a public interface.
 *
 * @returns true when no token is configured or the header equals
 *          `Bearer <WEBHOOK_TOKEN>` exactly.
 */
function validateToken(req: IncomingMessage): boolean {
  if (!WEBHOOK_TOKEN) return true;
  const auth = req.headers["authorization"];
  if (typeof auth !== "string" || auth.length === 0) return false;
  const expected = Buffer.from(`Bearer ${WEBHOOK_TOKEN}`, "utf-8");
  const received = Buffer.from(auth, "utf-8");
  // timingSafeEqual throws on unequal lengths, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
/** Send `payload` as a JSON response with the given status code. */
function sendJson(res: ServerResponse, status: number, payload: unknown): void {
  res.writeHead(status, { "Content-Type": "application/json" });
  res.end(JSON.stringify(payload));
}
/**
 * Read the whole request body as UTF-8 text.
 * Chunks are concatenated as bytes before decoding so a multi-byte character
 * split across two chunks is not corrupted.
 * NOTE(review): the body size is not capped; consider a limit if the
 * endpoint is reachable by untrusted clients.
 */
async function readRequestBody(req: IncomingMessage): Promise<string> {
  const chunks: Buffer[] = [];
  for await (const chunk of req) {
    chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk);
  }
  return Buffer.concat(chunks).toString("utf-8");
}
/**
 * Route one HTTP request.
 * - OPTIONS (any path): 200 with CORS headers
 * - GET /health: JSON status snapshot
 * - POST /hooks/gitea: authenticated webhook delivery
 * - anything else: 404
 */
async function handleHttpRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
  const url = req.url || "";
  res.setHeader("Access-Control-Allow-Origin", "*");
  res.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
  res.setHeader("Access-Control-Allow-Headers", "Content-Type, Authorization");
  if (req.method === "OPTIONS") {
    res.writeHead(200);
    res.end();
    return;
  }
  // GET /health
  if (url === "/health" && req.method === "GET") {
    sendJson(res, 200, {
      status: "ok",
      uptime: process.uptime(),
      bot_user: BOT_USER,
      webhook_repos: webhookRepos.size,
      collab_repos: collabRepos.size,
      queue_depth: processingQueue.length,
      is_processing: isProcessing,
    });
    return;
  }
  // POST /hooks/gitea
  if (url === "/hooks/gitea" && req.method === "POST") {
    if (!validateToken(req)) {
      sendJson(res, 401, { error: "Unauthorized" });
      return;
    }
    const rawBody = await readRequestBody(req);
    let event: unknown;
    try {
      event = JSON.parse(rawBody);
    } catch {
      sendJson(res, 400, { error: "Invalid JSON" });
      return;
    }
    // Valid JSON that is not an object (null, number, string) cannot be an
    // event; reject it rather than let the handler dereference it.
    if (typeof event !== "object" || event === null) {
      sendJson(res, 400, { error: "Invalid payload" });
      return;
    }
    handleWebhookEvent(event);
    sendJson(res, 200, { received: true });
    return;
  }
  res.writeHead(404, { "Content-Type": "text/plain" });
  res.end("Not found");
}
/**
 * Start the webhook HTTP server on WEBHOOK_HOST:WEBHOOK_PORT.
 *
 * @param _pi Extension API handle (unused here).
 * @returns A promise that resolves once the server is listening and rejects
 *          if the server emits an error before that (for example, port in use).
 */
export async function startWebhookServer(_pi: ExtensionAPI): Promise<void> {
  const http = await import("node:http");
  const instance = http.createServer((req, res) => {
    // A failure while handling one request must not become an unhandled
    // rejection or leave the client waiting for a response.
    handleHttpRequest(req, res).catch((err: unknown) => {
      console.error("[gitea-webhook] Request failed:", err instanceof Error ? err.message : err);
      if (!res.headersSent) {
        sendJson(res, 500, { error: "Internal server error" });
      } else {
        res.end();
      }
    });
  });
  server = instance;
  await new Promise<void>((resolve, reject) => {
    // Kept as a persistent listener, as before: once the promise has settled,
    // reject() is a no-op but the listener still absorbs later "error" events.
    instance.on("error", reject);
    instance.listen(WEBHOOK_PORT, WEBHOOK_HOST, () => {
      console.log(`[gitea-webhook] Server listening on ${WEBHOOK_HOST}:${WEBHOOK_PORT}`);
      resolve();
    });
  });
}
/**
 * Process an inbound webhook event (from owned repos).
 *
 * Classifies the payload as a comment, issue, pull request, or push event,
 * applies the shouldProcess filter, and enqueues a prompt for accepted
 * events. Payloads matching none of these shapes are ignored.
 *
 * @param event Parsed webhook JSON payload.
 */
function handleWebhookEvent(event: any) {
  const repo = event.repository?.full_name || "unknown";
  // Determine event type and extract body/author
  let body = "";
  let author = "";
  let eventCtx: EventContext | null = null;
  if (event.comment) {
    // Issue/PR comment (checked first: comment payloads also carry the parent issue/PR)
    body = event.comment.body || "";
    author = event.comment.user?.login || "";
    const parentNumber = event.issue?.number || event.pull_request?.number;
    eventCtx = {
      repo, type: "issue_comment", action: event.action,
      author, body, parentNumber,
      source: "webhook",
    };
  } else if (event.issue) {
    // Issue opened/edited
    body = event.issue.body || "";
    author = event.issue.user?.login || "";
    eventCtx = {
      repo, type: "issue", action: event.action,
      number: event.issue.number, title: event.issue.title,
      author, body, labels: event.issue.labels?.map((l: any) => l.name),
      source: "webhook",
    };
  } else if (event.pull_request) {
    // PR opened/edited
    body = event.pull_request.body || "";
    author = event.pull_request.user?.login || "";
    eventCtx = {
      repo, type: "pull_request", action: event.action,
      number: event.pull_request.number, title: event.pull_request.title,
      author, body,
      baseBranch: event.pull_request.base?.label,
      headBranch: event.pull_request.head?.label,
      source: "webhook",
    };
  } else if (event.pusher) {
    // Push event: there is no text to scan, so the body stays empty. In
    // respondTo="mention" mode shouldProcess therefore filters pushes out.
    author = event.pusher?.name || "";
    eventCtx = {
      repo, type: "push", action: "push",
      author, body: "",
      source: "webhook",
    };
  }
  if (!eventCtx) return;
  const { process: shouldDo, mention } = shouldProcess(repo, author, body, "webhook");
  if (!shouldDo) {
    // Report the actual reason: the bot's own events are skipped regardless of respondTo.
    const fromSelf = author.toLowerCase() === BOT_USER.toLowerCase();
    const reason = fromSelf ? "authored by bot" : "not mentioned, respondTo=mention";
    console.log(`[gitea-webhook] Skipped event on ${repo} (${reason})`);
    return;
  }
  eventCtx.mention = mention;
  const prompt = formatPrompt(eventCtx);
  console.log(`[gitea-webhook] Event: ${eventCtx.type}/${eventCtx.action} on ${repo} by @${author}${mention.mentioned ? " (mentioned)" : ""}`);
  enqueuePrompt(prompt);
}
/**
 * Stop the webhook HTTP server if it is running.
 * Resolves after the server's close callback has fired and the module's
 * `server` reference has been cleared; resolves immediately when no server
 * is running.
 */
export async function stopWebhookServer() {
  const active = server;
  if (!active) return;
  await new Promise<void>((done) => {
    active.close(() => {
      console.log("[gitea-webhook] Server stopped");
      server = null;
      done();
    });
  });
}
// ── Repo Discovery ───────────────────────────────────────────────────────────
/**
 * Fetch every repo visible to the authenticated user, page by page.
 * Stops at the first empty page rather than the first short page, because
 * the server may cap the page size below the requested limit. Also stops if
 * a page adds no new repo names, as a guard against a server that ignores
 * `page`.
 */
async function fetchAllRepos(client: GiteaClient): Promise<any[]> {
  const pageSize = 100;
  const maxPages = 100;
  const repos: any[] = [];
  const seen = new Set<string>();
  for (let page = 1; page <= maxPages; page++) {
    const batch = await client.get<any[]>(`/user/repos?limit=${pageSize}&page=${page}`);
    if (!Array.isArray(batch) || batch.length === 0) break;
    let fresh = 0;
    for (const repo of batch) {
      const name = repo?.full_name;
      if (typeof name !== "string" || seen.has(name)) continue;
      seen.add(name);
      repos.push(repo);
      fresh++;
    }
    if (fresh === 0) break;
  }
  return repos;
}
/**
 * Make sure `repoName` has a webhook pointing at this server.
 * An existing hook with the same target URL is reused. Tracked repos live
 * only in memory, so without this check every restart would install another
 * duplicate hook.
 *
 * @returns The hook id and whether it was newly created.
 */
async function ensureWebhook(client: GiteaClient, repoName: string): Promise<{ id: number; created: boolean }> {
  const targetUrl = `${WEBHOOK_URL}/hooks/gitea`;
  const existing = await client.get<any[]>(`/repos/${repoName}/hooks`);
  const match = Array.isArray(existing)
    ? existing.find((hook) => hook?.config?.url === targetUrl)
    : undefined;
  if (match) return { id: match.id, created: false };
  const authHeader = WEBHOOK_TOKEN ? `Bearer ${WEBHOOK_TOKEN}` : "";
  const webhook = await client.post<any>(`/repos/${repoName}/hooks`, {
    type: "gitea",
    config: {
      url: targetUrl,
      content_type: "json",
      ...(authHeader ? { authorization: authHeader } : {}),
    },
    // Gitea's CreateHookOption documents the auth header as a top-level
    // field; the config key above is kept for compatibility.
    // NOTE(review): confirm against the deployed Gitea version.
    ...(authHeader ? { authorization_header: authHeader } : {}),
    events: ["issues", "issue_comment", "pull_request", "push"],
    active: true,
  });
  return { id: webhook.id, created: true };
}
/**
 * Discover repos visible to the bot and classify each new one.
 *
 * - Repos owned by BOT_USER (when WEBHOOK_URL is set) get a webhook and
 *   default to respondTo="all".
 * - Other repos, and own repos where the hooks API returns 403, are tracked
 *   as collab repos and default to respondTo="mention".
 * - If webhook setup fails for any other reason the repo is left
 *   unclassified, so the next discovery run retries it.
 *
 * State is saved when anything new was classified. Errors are logged, never
 * thrown.
 */
async function discoverRepos() {
  try {
    const client = new GiteaClient();
    const repos = await fetchAllRepos(client);
    const authenticatedUser = BOT_USER.toLowerCase();
    let newWebhooks = 0;
    let newCollab = 0;
    for (const repo of repos) {
      const name = repo.full_name;
      if (knownRepos.has(name)) continue;
      const isOwner = repo.owner?.login?.toLowerCase() === authenticatedUser;
      if (WEBHOOK_URL && isOwner) {
        // Own repo — make sure a webhook is installed
        try {
          const hook = await ensureWebhook(client, name);
          webhookRepos.set(name, { webhookId: hook.id, addedAt: Date.now() });
          knownRepos.add(name);
          // Own repos default to "all"
          if (!repoConfigs.has(name)) repoConfigs.set(name, { respondTo: "all" });
          newWebhooks++;
          console.log(`[gitea-repos] ✅ Webhook: ${name} (ID: ${hook.id}${hook.created ? "" : ", existing"})`);
          continue;
        } catch (err) {
          if (!(err instanceof GiteaError && err.status === 403)) {
            console.error(`[gitea-repos] ❌ Webhook error: ${name}: ${err instanceof Error ? err.message : err}`);
            // Possibly transient: leave the repo unclassified and retry on the next run.
            continue;
          }
          // 403: no admin rights — fall through to collab handling
        }
      }
      // Collab repo (or own repo without webhook URL) — notification-polled
      collabRepos.add(name);
      knownRepos.add(name);
      // Collab repos default to "mention" only
      if (!repoConfigs.has(name)) repoConfigs.set(name, { respondTo: "mention" });
      newCollab++;
      console.log(`[gitea-repos] 📋 Collab (mention-only): ${name}`);
    }
    if (newWebhooks > 0 || newCollab > 0) {
      console.log(`[gitea-repos] Discovered: ${newWebhooks} webhook, ${newCollab} collab`);
      await saveState();
    }
    console.log(`[gitea-repos] Total: ${webhookRepos.size} webhook + ${collabRepos.size} collab repos`);
  } catch (err) {
    console.error("[gitea-repos] Error discovering:", err instanceof Error ? err.message : err);
  }
}
// ── Notification Polling ─────────────────────────────────────────────────────
/** Strip everything up to and including "/api/v1" so the URL can be passed to GiteaClient as a path. */
function toApiPath(url: string): string {
  return url.replace(/.*\/api\/v1/, "");
}
/**
 * Build the event context for one notification by fetching what triggered it.
 *
 * If `latest_comment_url` exists and differs from the subject URL, the
 * trigger is a comment; otherwise it is the issue/PR itself. Filtering goes
 * through shouldProcess, the same gate used for webhooks, so the bot's own
 * comments, issues and PRs are skipped.
 *
 * @returns The context, or null when the notification should be ignored
 *          (authored by the bot, or a subject type other than Issue/Pull).
 */
async function buildNotificationContext(client: GiteaClient, notif: any, repo: string, subjectUrl: string): Promise<EventContext | null> {
  const subjectType = notif.subject?.type; // "Issue", "Pull", "Commit"
  const latestCommentUrl = notif.subject?.latest_comment_url;
  if (latestCommentUrl && latestCommentUrl !== subjectUrl) {
    // Notification triggered by a comment
    const commentPath = toApiPath(latestCommentUrl);
    console.log(`[gitea-notif] Fetching comment: ${commentPath}`);
    const comment = await client.get<any>(commentPath);
    const body = comment.body || "";
    const author = comment.user?.login || "";
    const verdict = shouldProcess(repo, author, body, "notification");
    if (!verdict.process) return null;
    // Extract issue/PR number from subject URL
    const numberMatch = subjectUrl.match(/\/(\d+)$/);
    const parentNumber = numberMatch ? parseInt(numberMatch[1], 10) : undefined;
    return {
      repo, type: "issue_comment", action: "created",
      author, body, parentNumber, mention: verdict.mention,
      source: "notification",
    };
  }
  // Notification triggered by the issue/PR itself
  const subjectPath = toApiPath(subjectUrl);
  console.log(`[gitea-notif] Fetching subject: ${subjectPath} (type: ${subjectType})`);
  const subject = await client.get<any>(subjectPath);
  const body = subject.body || "";
  const author = subject.user?.login || "";
  const verdict = shouldProcess(repo, author, body, "notification");
  if (!verdict.process) return null;
  if (subjectType === "Issue") {
    return {
      repo, type: "issue", action: "opened",
      number: subject.number, title: subject.title,
      author, body, labels: subject.labels?.map((l: any) => l.name),
      mention: verdict.mention, source: "notification",
    };
  }
  if (subjectType === "Pull") {
    return {
      repo, type: "pull_request", action: "opened",
      number: subject.number, title: subject.title,
      author, body,
      baseBranch: subject.base?.label,
      headBranch: subject.head?.label,
      mention: verdict.mention, source: "notification",
    };
  }
  return null;
}
/**
 * Poll Gitea notifications API for @mentions and assignments.
 * Much more efficient than scanning every repo — single API call.
 * Gitea handles the @mention detection for us.
 *
 * Every fetched notification is marked read afterwards, including ones that
 * were skipped or failed to process, so a failing notification is not
 * retried on the next poll. Errors are logged, never thrown.
 */
async function pollNotifications() {
  try {
    const client = new GiteaClient();
    // Fetch unread notifications
    const notifications = await client.get<any[]>(
      "/notifications?status-types=unread&limit=20",
    );
    if (!Array.isArray(notifications) || notifications.length === 0) return;
    console.log(`[gitea-notif] ${notifications.length} unread notifications`);
    for (const notif of notifications) {
      const notifId = notif.id;
      const repo = notif.repository?.full_name;
      const subjectUrl = notif.subject?.url; // API URL for the issue/PR
      if (repo && subjectUrl) {
        try {
          const eventCtx = await buildNotificationContext(client, notif, repo, subjectUrl);
          if (eventCtx) {
            const mentionTag = eventCtx.mention?.mentioned ? " 🔔" : "";
            console.log(`[gitea-notif] ${eventCtx.type} on ${repo}#${eventCtx.number ?? eventCtx.parentNumber} by @${eventCtx.author}${mentionTag}`);
            enqueuePrompt(formatPrompt(eventCtx));
          }
        } catch (err) {
          console.error(`[gitea-notif] Error processing notification ${notifId}:`, err instanceof Error ? err.message : err);
        }
      }
      // Mark as read
      await markNotifRead(client, notifId);
    }
  } catch (err) {
    console.error("[gitea-notif] Error polling:", err instanceof Error ? err.message : err);
  }
}
/**
 * Mark a notification thread as read (best effort).
 * A failure never aborts the poll, but it is logged: a notification that
 * stays unread is fetched and processed again on the next poll.
 */
async function markNotifRead(client: GiteaClient, notifId: number) {
  try {
    await client.patch(`/notifications/threads/${notifId}`, {});
  } catch (err) {
    console.warn(
      `[gitea-notif] Failed to mark notification ${notifId} as read:`,
      err instanceof Error ? err.message : err,
    );
  }
}
// ── Public API ───────────────────────────────────────────────────────────────
/**
 * Restore persisted repo configs, then start both background loops:
 * repo discovery (run once immediately, then every POLL_INTERVAL seconds)
 * and notification polling (every NOTIF_POLL_INTERVAL seconds).
 *
 * @param _pi Extension API handle (unused here).
 */
export async function startPolling(_pi: ExtensionAPI) {
  // Load persisted state
  const { repoConfigs: savedConfigs } = await loadState();
  Object.entries(savedConfigs).forEach(([repo, config]) => {
    repoConfigs.set(repo, config);
  });
  const repoIntervalMs = POLL_INTERVAL * 1000;
  const notifIntervalMs = NOTIF_POLL_INTERVAL * 1000;
  // Discovery runs right away and then on its interval
  void discoverRepos();
  repoPollTimer = setInterval(() => void discoverRepos(), repoIntervalMs);
  console.log(`[gitea-polling] Repo discovery started (interval: ${POLL_INTERVAL}s)`);
  // Notifications are polled on the faster interval
  notifPollTimer = setInterval(() => void pollNotifications(), notifIntervalMs);
  console.log(`[gitea-polling] Notification polling started (interval: ${NOTIF_POLL_INTERVAL}s)`);
}
/**
 * Stop both polling loops and forget all tracked repos.
 * Per-repo configs (repoConfigs) are left in place.
 */
export function stopPolling() {
  if (repoPollTimer) {
    clearInterval(repoPollTimer);
    repoPollTimer = null;
  }
  if (notifPollTimer) {
    clearInterval(notifPollTimer);
    notifPollTimer = null;
  }
  for (const tracked of [webhookRepos, collabRepos, knownRepos]) {
    tracked.clear();
  }
}
/**
 * Snapshot of the tracked repos and their configs.
 * The returned collections are shallow copies, so adding or removing entries
 * on them does not affect module state.
 */
export function getTrackedRepos() {
  const webhook = new Map(webhookRepos);
  const collab = new Set(collabRepos);
  const configs = new Map(repoConfigs);
  return { webhook, collab, configs };
}
/**
 * Merge `config` into the stored settings for `repo` and persist the result.
 * A repo without stored settings starts from respondTo="mention".
 * Persistence is fire-and-forget.
 *
 * @returns The merged config now stored for the repo.
 */
export function setRepoConfig(repo: string, config: Partial<RepoConfig>): RepoConfig {
  const fallback: RepoConfig = { respondTo: "mention" };
  const updated = Object.assign({}, repoConfigs.get(repo) ?? fallback, config);
  repoConfigs.set(repo, updated);
  void saveState();
  return updated;
}
// Export the mention parser together with the MentionInfo and RepoConfig types.
export { parseMentions, type MentionInfo, type RepoConfig };