feat: @mention routing via Gitea notifications API

- Replace per-repo event polling with notification-based polling
- Collab repos default to mention-only mode (respond only when @mentioned)
- Own repos default to all-events mode (respond to everything)
- Parse @mentions and extract directive text for focused LLM context
- Notification poll interval: 30s (configurable via PI_NOTIF_POLL_INTERVAL)
- Mark notifications as read after processing (no ID tracking needed)
- Add gitea_repo_config tool to switch repos between 'all' and 'mention' modes
- Add gitea_tracked_repos tool to show all repos and their response modes
- Persist per-repo configs to disk across reloads
- Multi-bot coordination: issues can @mention different bots with directives
This commit is contained in:
2026-03-13 17:52:47 -07:00
parent b868ad4df5
commit 83a42de9e2
2 changed files with 514 additions and 255 deletions

View File

@@ -2,22 +2,83 @@
* pi-gitea Extension — entry point * pi-gitea Extension — entry point
* *
* Registers Gitea tools (read + write) and optional webhook server. * Registers Gitea tools (read + write) and optional webhook server.
* Supports @mention routing for multi-bot coordination.
*/ */
import registerReadTools from "./tools/read-tools.js"; import registerReadTools from "./tools/read-tools.js";
import registerWriteTools from "./tools/write-tools.js"; import registerWriteTools from "./tools/write-tools.js";
import { startWebhookServer, stopWebhookServer, startPolling, stopPolling, setSendMessage } from "./webhook/server.js"; import {
startWebhookServer, stopWebhookServer,
startPolling, stopPolling,
setSendMessage, getTrackedRepos, setRepoConfig,
} from "./webhook/server.js";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { Type } from "@sinclair/typebox";
export default function (pi: ExtensionAPI) { export default function (pi: ExtensionAPI) {
registerReadTools(pi); registerReadTools(pi);
registerWriteTools(pi); registerWriteTools(pi);
// ── Repo config tool ─────────────────────────────────────────────────────
pi.registerTool({
name: "gitea_repo_config",
label: "Gitea: Configure Repo Response Mode",
description:
'Set how the bot responds to events on a repo. ' +
'"all" = respond to every event. ' +
'"mention" = respond only when @mentioned or assigned. ' +
'Collab repos default to "mention", own repos default to "all".',
parameters: Type.Object({
repo: Type.String({ description: "Repository (owner/name)" }),
respondTo: Type.String({ description: '"all" or "mention"' }),
}),
async execute(_id, params) {
const mode = params.respondTo as "all" | "mention";
if (mode !== "all" && mode !== "mention") {
return {
content: [{ type: "text", text: `Invalid mode "${params.respondTo}". Use "all" or "mention".` }],
};
}
const config = setRepoConfig(params.repo, { respondTo: mode });
return {
content: [{ type: "text", text: `${params.repo}: respondTo = ${config.respondTo}` }],
details: { repo: params.repo, config },
};
},
});
pi.registerTool({
name: "gitea_tracked_repos",
label: "Gitea: List Tracked Repos",
description: "Show all tracked repos, their type (webhook/collab), and response mode.",
parameters: Type.Object({}),
async execute() {
const { webhook, collab, configs } = getTrackedRepos();
const lines: string[] = [];
for (const [name] of webhook) {
const mode = configs.get(name)?.respondTo ?? "all";
lines.push(`${name} — webhook (respondTo: ${mode})`);
}
for (const name of collab) {
const mode = configs.get(name)?.respondTo ?? "mention";
lines.push(`📋 ${name} — collab/notification (respondTo: ${mode})`);
}
return {
content: [{ type: "text", text: lines.length > 0 ? lines.join("\n") : "No repos tracked." }],
details: { webhook: [...webhook.keys()], collab: [...collab], configs: Object.fromEntries(configs) },
};
},
});
// ── Lifecycle ────────────────────────────────────────────────────────────
pi.on("session_start", async (_event, ctx) => { pi.on("session_start", async (_event, ctx) => {
console.log("[pi-gitea] Session started"); console.log("[pi-gitea] Session started");
const sendMessageFn = (msg: string) => { const sendMessageFn = (msg: string) => {
// Use followUp so events queue when the LLM is already processing
ctx.sendUserMessage(msg, { deliverAs: "followUp" }); ctx.sendUserMessage(msg, { deliverAs: "followUp" });
return Promise.resolve(); return Promise.resolve();
}; };
@@ -25,9 +86,9 @@ export default function (pi: ExtensionAPI) {
try { try {
await startWebhookServer(pi); await startWebhookServer(pi);
startPolling(pi); await startPolling(pi);
} catch (err) { } catch (err) {
console.error("[pi-gitea] Failed to start webhook server:", err); console.error("[pi-gitea] Failed to start:", err);
} }
}); });

View File

@@ -1,116 +1,211 @@
/** /**
* Webhook server — receives Gitea events via HTTP + polls collab repos * Webhook server + notification poller
* *
* Auth: Bearer token validation (PI_WEBHOOK_TOKEN). * Two event sources:
* No HMAC/secret — consistent with token-based auth strategy. * 1. Webhooks (own repos with admin) — real-time, all events
* 2. Notification polling (collab repos) — @mention/assign driven
* *
* Repos where the bot has admin access get webhooks installed automatically. * @mention routing:
* Repos where the bot is a non-admin collaborator are polled for new events. * - Own repos: respond to all events (configurable to mention-only)
* - Collab repos: respond only when @mentioned or assigned
* - Extracts directive text after @botname for focused LLM context
*
* Auth: Bearer token for inbound webhooks, token auth for Gitea API.
*/ */
import type { Server, IncomingMessage, ServerResponse } from "node:http"; import type { Server, IncomingMessage, ServerResponse } from "node:http";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { GiteaClient, GiteaError } from "../../src/client.js"; import { GiteaClient, GiteaError } from "../../src/client.js";
// ── Config ───────────────────────────────────────────────────────────────────
const WEBHOOK_HOST = process.env.PI_WEBHOOK_HOST ?? "0.0.0.0"; const WEBHOOK_HOST = process.env.PI_WEBHOOK_HOST ?? "0.0.0.0";
const WEBHOOK_PORT = parseInt(process.env.PI_WEBHOOK_PORT ?? "3000", 10); const WEBHOOK_PORT = parseInt(process.env.PI_WEBHOOK_PORT ?? "3000", 10);
const WEBHOOK_TOKEN = process.env.PI_WEBHOOK_TOKEN ?? ""; const WEBHOOK_TOKEN = process.env.PI_WEBHOOK_TOKEN ?? "";
const WEBHOOK_URL = process.env.PI_WEBHOOK_URL ?? ""; const WEBHOOK_URL = process.env.PI_WEBHOOK_URL ?? "";
const POLL_INTERVAL = parseInt(process.env.PI_BOT_POLL_INTERVAL ?? "300", 10); const POLL_INTERVAL = parseInt(process.env.PI_BOT_POLL_INTERVAL ?? "300", 10);
const EVENT_POLL_INTERVAL = parseInt(process.env.PI_EVENT_POLL_INTERVAL ?? "60", 10); const NOTIF_POLL_INTERVAL = parseInt(process.env.PI_NOTIF_POLL_INTERVAL ?? "30", 10);
const BOT_USER = process.env.PI_GIT_USER ?? ""; const BOT_USER = process.env.PI_GIT_USER ?? "";
const POLL_STATE_FILE = "/home/pibot/.pi/agent/gitea-poll-state.json"; const STATE_FILE = "/home/pibot/.pi/agent/gitea-poll-state.json";
// ── State ────────────────────────────────────────────────────────────────────
let server: Server | null = null; let server: Server | null = null;
let processingQueue: Array<{ event: any; timestamp: number }> = []; let processingQueue: Array<{ prompt: string; timestamp: number }> = [];
let maxQueueDepth = 50; const maxQueueDepth = 50;
let isProcessing = false; let isProcessing = false;
let repoPollTimer: NodeJS.Timeout | null = null; let repoPollTimer: NodeJS.Timeout | null = null;
let eventPollTimer: NodeJS.Timeout | null = null; let notifPollTimer: NodeJS.Timeout | null = null;
let sendMessage: ((message: string) => Promise<void>) | null = null; let sendMessage: ((message: string) => Promise<void>) | null = null;
/** Repos where we successfully installed a webhook */ /** Repos where we successfully installed a webhook */
let webhookRepos: Map<string, { webhookId: number; addedAt: number }> = new Map(); let webhookRepos: Map<string, { webhookId: number; addedAt: number }> = new Map();
/** Repos where webhook install failed (403) — we poll these for events */ /** Repos where webhook install failed (403) — notification-polled */
let pollOnlyRepos: Map<string, { addedAt: number; lastPollAt: string }> = new Map(); let collabRepos: Set<string> = new Set();
/** Track all known repos so we don't re-attempt webhook install every cycle */ /** All known repos (avoid re-attempting webhook install) */
let knownRepos: Set<string> = new Set(); let knownRepos: Set<string> = new Set();
/** Load poll timestamps from disk (survives reloads) */ /** Per-repo config */
async function loadPollState(): Promise<Record<string, string>> { interface RepoConfig {
/** "all" = respond to everything, "mention" = only when @mentioned/assigned */
respondTo: "all" | "mention";
}
let repoConfigs: Map<string, RepoConfig> = new Map();
/** Last notification ID we've processed — NOT USED, Gitea reuses thread IDs */
// Tracking is done by marking notifications as read after processing.
// ── Persistence ──────────────────────────────────────────────────────────────
interface PollState {
repoConfigs: Record<string, RepoConfig>;
}
async function loadState(): Promise<PollState> {
try { try {
const fs = await import("node:fs/promises"); const fs = await import("node:fs/promises");
const data = await fs.readFile(POLL_STATE_FILE, "utf-8"); const data = await fs.readFile(STATE_FILE, "utf-8");
return JSON.parse(data); const parsed = JSON.parse(data);
return {
repoConfigs: parsed.repoConfigs ?? {},
};
} catch { } catch {
return {}; return { repoConfigs: {} };
} }
} }
/** Save poll timestamps to disk */ async function saveState(): Promise<void> {
async function savePollState(): Promise<void> {
try { try {
const fs = await import("node:fs/promises"); const fs = await import("node:fs/promises");
const state: Record<string, string> = {}; const state: PollState = {
for (const [name, s] of pollOnlyRepos) { repoConfigs: Object.fromEntries(repoConfigs),
state[name] = s.lastPollAt; };
} await fs.writeFile(STATE_FILE, JSON.stringify(state, null, 2), "utf-8");
await fs.writeFile(POLL_STATE_FILE, JSON.stringify(state), "utf-8");
} catch (err) { } catch (err) {
console.error("[gitea-polling] Error saving poll state:", err instanceof Error ? err.message : err); console.error("[gitea-state] Error saving:", err instanceof Error ? err.message : err);
} }
} }
export function setSendMessage(fn: (message: string) => Promise<void>) { // ── @Mention Parsing ─────────────────────────────────────────────────────────
sendMessage = fn;
interface MentionInfo {
/** Whether this bot was @mentioned */
mentioned: boolean;
/** All @usernames found in the text */
allMentions: string[];
/** Text directed at this bot (after @botname, up to next @mention or end) */
directive: string | null;
/** Full original text */
fullText: string;
} }
/** Validate bearer token on incoming webhook request */ /**
function validateToken(req: IncomingMessage): boolean { * Parse @mentions from text.
if (!WEBHOOK_TOKEN) return true; // No token configured = open (localhost only) * Extracts directive = text after @botname up to the next @mention or paragraph.
const auth = req.headers["authorization"]; */
if (!auth) return false; function parseMentions(text: string, botUser: string): MentionInfo {
return auth === `Bearer ${WEBHOOK_TOKEN}`; if (!text) return { mentioned: false, allMentions: [], directive: null, fullText: text };
// Find all @mentions (word boundary, not inside URLs/emails)
const mentionPattern = /(?:^|[\s(])@([a-zA-Z0-9_-]+)/g;
const allMentions: string[] = [];
let match;
while ((match = mentionPattern.exec(text)) !== null) {
allMentions.push(match[1]);
}
const mentioned = allMentions.some(
(m) => m.toLowerCase() === botUser.toLowerCase(),
);
let directive: string | null = null;
if (mentioned) {
// Extract text after @botname
const botPattern = new RegExp(
`@${escapeRegex(botUser)}\\s*([\\s\\S]*?)(?=@[a-zA-Z0-9_-]+|$)`,
"i",
);
const directiveMatch = text.match(botPattern);
if (directiveMatch) {
directive = directiveMatch[1].trim() || null;
}
}
return { mentioned, allMentions, directive, fullText: text };
} }
/** Format a Gitea event as a prompt for the LLM */ function escapeRegex(s: string): string {
function formatEventPrompt(event: any): string { return s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const action = event.action; }
const repo = event.repository?.full_name || "unknown";
let prompt = `New Gitea event on ${repo}:\n\n`; // ── Prompt Formatting ────────────────────────────────────────────────────────
interface EventContext {
repo: string;
type: "issue" | "pull_request" | "issue_comment" | "push";
action: string;
number?: number;
title?: string;
author?: string;
body?: string;
labels?: string[];
/** PR-specific */
baseBranch?: string;
headBranch?: string;
/** Comment-specific: the issue/PR number being commented on */
parentNumber?: number;
/** Parsed mention info */
mention?: MentionInfo;
/** Source: "webhook" or "notification" */
source: "webhook" | "notification";
}
function formatPrompt(ctx: EventContext): string {
const { repo, type, action, mention, source } = ctx;
const via = source === "notification" ? " (via @mention)" : "";
let prompt = `New Gitea event on ${repo}${via}:\n\n`;
prompt += `**Action**: ${action}\n\n`; prompt += `**Action**: ${action}\n\n`;
if (event.issue) { if (type === "issue" || type === "pull_request") {
const issue = event.issue; const kind = type === "issue" ? "Issue" : "PR";
prompt += `**Issue #${issue.number}: ${issue.title}**\n`; prompt += `**${kind} #${ctx.number}: ${ctx.title}**\n`;
prompt += `**Author**: @${issue.user?.login || "unknown"}\n`; prompt += `**Author**: @${ctx.author || "unknown"}\n`;
prompt += `**Labels**: ${issue.labels?.map((l: any) => l.name).join(", ") || "none"}\n`; if (ctx.labels?.length) prompt += `**Labels**: ${ctx.labels.join(", ")}\n`;
prompt += `**Body**:\n${issue.body || "(no body)"}\n\n`; if (ctx.baseBranch) prompt += `**Base**: ${ctx.baseBranch} ← **Head**: ${ctx.headBranch}\n`;
prompt += `**Body**:\n${ctx.body || "(no body)"}\n\n`;
} }
if (event.pull_request) { if (type === "issue_comment") {
const pr = event.pull_request; prompt += `**Comment on #${ctx.parentNumber}**\n`;
prompt += `**PR #${pr.number}: ${pr.title}**\n`; prompt += `**Author**: @${ctx.author || "unknown"}\n`;
prompt += `**Author**: @${pr.user?.login || "unknown"}\n`; prompt += `**Body**:\n${ctx.body || "(no body)"}\n\n`;
prompt += `**Base**: ${pr.base?.label} ← **Head**: ${pr.head?.label}\n`;
prompt += `**Body**:\n${pr.body || "(no body)"}\n\n`;
} }
if (event.comment) { if (type === "push") {
const comment = event.comment; prompt += `**Pusher**: @${ctx.author}\n\n`;
const targetNumber = event.issue?.number || event.pull_request?.number;
prompt += `**Comment on #${targetNumber}**\n`;
prompt += `**Author**: @${comment.user?.login || "unknown"}\n`;
prompt += `**Body**:\n${comment.body || "(no body)"}\n\n`;
} }
if (event.pusher) { // Add mention context
prompt += `**Pusher**: @${event.pusher.name}\n`; if (mention?.mentioned && mention.directive) {
prompt += `**Commits**: ${event.commits?.length || 0}\n\n`; prompt += `---\n\n`;
prompt += `**You were @mentioned.** The request directed at you:\n`;
prompt += `> ${mention.directive}\n\n`;
} else if (mention?.mentioned) {
prompt += `---\n\n`;
prompt += `**You were @mentioned** in this ${type === "issue_comment" ? "comment" : type}.\n\n`;
}
if (mention?.allMentions.length && mention.allMentions.length > 1) {
const others = mention.allMentions.filter(
(m) => m.toLowerCase() !== BOT_USER.toLowerCase(),
);
if (others.length > 0) {
prompt += `Other users mentioned: ${others.map((m) => "@" + m).join(", ")}\n\n`;
}
} }
prompt += `---\n\n`; prompt += `---\n\n`;
@@ -118,86 +213,94 @@ function formatEventPrompt(event: any): string {
prompt += `1. Add helpful comments to issues/PRs\n`; prompt += `1. Add helpful comments to issues/PRs\n`;
prompt += `2. Suggest code fixes or improvements\n`; prompt += `2. Suggest code fixes or improvements\n`;
prompt += `3. Create branches and PRs to fix issues\n`; prompt += `3. Create branches and PRs to fix issues\n`;
prompt += `4. Update files directly (if direct_push is enabled)\n`; prompt += `4. Update files directly\n`;
prompt += `5. Ask for clarification if needed\n\n`; prompt += `5. Close issues when work is done\n`;
prompt += `6. Ask for clarification if needed\n\n`;
prompt += `Use the available Gitea tools to interact with the repository.`; prompt += `Use the available Gitea tools to interact with the repository.`;
return prompt; return prompt;
} }
/** Format a polled issue/PR/comment as a synthetic event prompt */ // ── Event Queue ──────────────────────────────────────────────────────────────
function formatPolledEvent(type: "issue" | "pull_request" | "issue_comment", item: any, repo: string): string {
let prompt = `New Gitea event on ${repo} (via polling):\n\n`;
prompt += `**Action**: opened\n\n`;
if (type === "issue") { export function setSendMessage(fn: (message: string) => Promise<void>) {
prompt += `**Issue #${item.number}: ${item.title}**\n`; sendMessage = fn;
prompt += `**Author**: @${item.user?.login || "unknown"}\n`;
prompt += `**Labels**: ${item.labels?.map((l: any) => l.name).join(", ") || "none"}\n`;
prompt += `**Body**:\n${item.body || "(no body)"}\n\n`;
} else if (type === "pull_request") {
prompt += `**PR #${item.number}: ${item.title}**\n`;
prompt += `**Author**: @${item.user?.login || "unknown"}\n`;
prompt += `**Base**: ${item.base?.label} ← **Head**: ${item.head?.label}\n`;
prompt += `**Body**:\n${item.body || "(no body)"}\n\n`;
} else if (type === "issue_comment") {
prompt += `**Comment on #${item._issueNumber}**\n`;
prompt += `**Author**: @${item.user?.login || "unknown"}\n`;
prompt += `**Body**:\n${item.body || "(no body)"}\n\n`;
}
prompt += `---\n\n`;
prompt += `Please analyze this event and decide how to respond.\n`;
prompt += `Use the available Gitea tools to interact with the repository.`;
return prompt;
} }
/** Enqueue an event for LLM processing */ function enqueuePrompt(prompt: string) {
function enqueueEvent(event: any) { processingQueue.push({ prompt, timestamp: Date.now() });
processingQueue.push({ event, timestamp: Date.now() });
if (processingQueue.length > maxQueueDepth) { if (processingQueue.length > maxQueueDepth) {
processingQueue.shift(); processingQueue.shift();
console.warn(`[gitea-webhook] Queue full, dropped oldest event`); console.warn(`[gitea-events] Queue full, dropped oldest event`);
} }
void processQueue(); void processQueue();
} }
/** Process the event queue */
async function processQueue() { async function processQueue() {
if (isProcessing || processingQueue.length === 0) return; if (isProcessing || processingQueue.length === 0) return;
isProcessing = true; isProcessing = true;
while (processingQueue.length > 0) { while (processingQueue.length > 0) {
const { event } = processingQueue.shift()!; const { prompt } = processingQueue.shift()!;
const repoName = event.repository?.full_name || event._repo || "unknown";
console.log(`[gitea-webhook] Processing event: ${event.action || event._type} on ${repoName}`);
if (sendMessage) { if (sendMessage) {
try { try {
const prompt = event._polled
? formatPolledEvent(event._type, event._item, event._repo)
: formatEventPrompt(event);
await sendMessage(prompt); await sendMessage(prompt);
console.log(`[gitea-webhook] Event sent to LLM: ${event.action || event._type} on ${repoName}`); console.log(`[gitea-events] Event delivered to LLM`);
} catch (err) { } catch (err) {
console.error(`[gitea-webhook] Failed to send event to LLM:`, err); console.error(`[gitea-events] Failed to send to LLM:`, err instanceof Error ? err.message : err);
} }
} else { } else {
console.warn(`[gitea-webhook] No sendMessage function available, skipping event`); console.warn(`[gitea-events] No sendMessage function, skipping`);
} }
} }
isProcessing = false; isProcessing = false;
} }
// ── HTTP Server ────────────────────────────────────────────────────────────── // ── Mention Filter ───────────────────────────────────────────────────────────
/**
* Should we process this event?
* - Own repos (webhook): always, unless config says "mention"
* - Collab repos: only if @mentioned
* - Always skip events from the bot itself
*/
function shouldProcess(repo: string, author: string, body: string, source: "webhook" | "notification"): { process: boolean; mention: MentionInfo } {
const mention = parseMentions(body, BOT_USER);
// Never process our own events
if (author.toLowerCase() === BOT_USER.toLowerCase()) {
return { process: false, mention };
}
// Notification source = already @mention filtered by Gitea
if (source === "notification") {
return { process: true, mention };
}
// Webhook source — check repo config
const config = repoConfigs.get(repo) ?? { respondTo: "all" };
if (config.respondTo === "mention") {
return { process: mention.mentioned, mention };
}
// Default for webhook repos: respond to all
return { process: true, mention };
}
// ── HTTP Webhook Server ──────────────────────────────────────────────────────
function validateToken(req: IncomingMessage): boolean {
if (!WEBHOOK_TOKEN) return true;
const auth = req.headers["authorization"];
if (!auth) return false;
return auth === `Bearer ${WEBHOOK_TOKEN}`;
}
/** Start webhook server */
export function startWebhookServer(_pi: ExtensionAPI) { export function startWebhookServer(_pi: ExtensionAPI) {
return new Promise<void>(async (resolve, reject) => { return new Promise<void>(async (resolve, reject) => {
try { try {
@@ -219,16 +322,15 @@ export function startWebhookServer(_pi: ExtensionAPI) {
// GET /health // GET /health
if (url === "/health" && req.method === "GET") { if (url === "/health" && req.method === "GET") {
res.writeHead(200, { "Content-Type": "application/json" }); res.writeHead(200, { "Content-Type": "application/json" });
res.end( res.end(JSON.stringify({
JSON.stringify({ status: "ok",
status: "ok", uptime: process.uptime(),
uptime: process.uptime(), bot_user: BOT_USER,
webhook_repos: webhookRepos.size, webhook_repos: webhookRepos.size,
poll_only_repos: pollOnlyRepos.size, collab_repos: collabRepos.size,
queue_depth: processingQueue.length, queue_depth: processingQueue.length,
is_processing: isProcessing, is_processing: isProcessing,
}), }));
);
return; return;
} }
@@ -237,28 +339,25 @@ export function startWebhookServer(_pi: ExtensionAPI) {
if (!validateToken(req)) { if (!validateToken(req)) {
res.writeHead(401, { "Content-Type": "application/json" }); res.writeHead(401, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: "Unauthorized" })); res.end(JSON.stringify({ error: "Unauthorized" }));
console.error("[gitea-webhook] Token validation failed");
return; return;
} }
let body = ""; let rawBody = "";
for await (const chunk of req) { for await (const chunk of req) rawBody += chunk.toString();
body += chunk.toString();
}
let event; let event: any;
try { try {
event = JSON.parse(body); event = JSON.parse(rawBody);
} catch { } catch {
res.writeHead(400, { "Content-Type": "application/json" }); res.writeHead(400, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: "Invalid JSON" })); res.end(JSON.stringify({ error: "Invalid JSON" }));
return; return;
} }
enqueueEvent(event); handleWebhookEvent(event);
res.writeHead(200, { "Content-Type": "application/json" }); res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ received: true, event: event.action })); res.end(JSON.stringify({ received: true }));
return; return;
} }
@@ -278,7 +377,72 @@ export function startWebhookServer(_pi: ExtensionAPI) {
}); });
} }
/** Stop webhook server */ /** Process an inbound webhook event (from owned repos) */
function handleWebhookEvent(event: any) {
const repo = event.repository?.full_name || "unknown";
// Determine event type and extract body/author
let body = "";
let author = "";
let eventCtx: EventContext | null = null;
if (event.issue && !event.comment) {
// Issue opened/edited
body = event.issue.body || "";
author = event.issue.user?.login || "";
eventCtx = {
repo, type: "issue", action: event.action,
number: event.issue.number, title: event.issue.title,
author, body, labels: event.issue.labels?.map((l: any) => l.name),
source: "webhook",
};
} else if (event.comment) {
// Issue/PR comment
body = event.comment.body || "";
author = event.comment.user?.login || "";
const parentNumber = event.issue?.number || event.pull_request?.number;
eventCtx = {
repo, type: "issue_comment", action: event.action,
author, body, parentNumber,
source: "webhook",
};
} else if (event.pull_request && !event.comment) {
// PR opened/edited
body = event.pull_request.body || "";
author = event.pull_request.user?.login || "";
eventCtx = {
repo, type: "pull_request", action: event.action,
number: event.pull_request.number, title: event.pull_request.title,
author, body,
baseBranch: event.pull_request.base?.label,
headBranch: event.pull_request.head?.label,
source: "webhook",
};
} else if (event.pusher) {
// Push event — no @mention filtering
author = event.pusher?.name || "";
if (author.toLowerCase() === BOT_USER.toLowerCase()) return;
eventCtx = {
repo, type: "push", action: "push",
author, body: "",
source: "webhook",
};
}
if (!eventCtx) return;
const { process: shouldDo, mention } = shouldProcess(repo, author, body, "webhook");
if (!shouldDo) {
console.log(`[gitea-webhook] Skipped event on ${repo} (not mentioned, respondTo=mention)`);
return;
}
eventCtx.mention = mention;
const prompt = formatPrompt(eventCtx);
console.log(`[gitea-webhook] Event: ${eventCtx.type}/${eventCtx.action} on ${repo} by @${author}${mention.mentioned ? " (mentioned)" : ""}`);
enqueuePrompt(prompt);
}
export async function stopWebhookServer() { export async function stopWebhookServer() {
return new Promise<void>((resolve) => { return new Promise<void>((resolve) => {
if (server) { if (server) {
@@ -293,24 +457,25 @@ export async function stopWebhookServer() {
}); });
} }
// ── Repo Discovery & Webhook Registration ──────────────────────────────────── // ── Repo Discovery ───────────────────────────────────────────────────────────
/** Discover repos and install webhooks (or mark as poll-only on 403) */
async function discoverRepos() { async function discoverRepos() {
try { try {
const client = new GiteaClient(); const client = new GiteaClient();
const repos = await client.get<any[]>("/user/repos?limit=100"); const repos = await client.get<any[]>("/user/repos?limit=100");
const savedState = await loadPollState(); const authenticatedUser = BOT_USER.toLowerCase();
let newWebhooks = 0; let newWebhooks = 0;
let newPollOnly = 0; let newCollab = 0;
for (const repo of repos) { for (const repo of repos) {
const name = repo.full_name; const name = repo.full_name;
if (knownRepos.has(name)) continue; if (knownRepos.has(name)) continue;
// Try to install webhook const isOwner = repo.owner?.login?.toLowerCase() === authenticatedUser;
if (WEBHOOK_URL) {
if (WEBHOOK_URL && isOwner) {
// Own repo — try to install webhook
try { try {
const webhook = await client.post<any>(`/repos/${name}/hooks`, { const webhook = await client.post<any>(`/repos/${name}/hooks`, {
type: "gitea", type: "gitea",
@@ -322,159 +487,181 @@ async function discoverRepos() {
events: ["issues", "issue_comment", "pull_request", "push"], events: ["issues", "issue_comment", "pull_request", "push"],
active: true, active: true,
}); });
webhookRepos.set(name, { webhookId: webhook.id, addedAt: Date.now() }); webhookRepos.set(name, { webhookId: webhook.id, addedAt: Date.now() });
knownRepos.add(name); knownRepos.add(name);
// Own repos default to "all"
if (!repoConfigs.has(name)) repoConfigs.set(name, { respondTo: "all" });
newWebhooks++; newWebhooks++;
console.log(`[gitea-polling] ✅ Webhook installed: ${name} (ID: ${webhook.id})`); console.log(`[gitea-repos] ✅ Webhook: ${name} (ID: ${webhook.id})`);
continue; continue;
} catch (err) { } catch (err) {
if (err instanceof GiteaError && err.status === 403) { if (!(err instanceof GiteaError && err.status === 403)) {
// No admin access — fall back to polling console.error(`[gitea-repos] ❌ Webhook error: ${name}: ${err instanceof Error ? err.message : err}`);
// Use persisted timestamp if available, otherwise 5 min ago
const lastPoll = savedState[name] ?? new Date(Date.now() - 5 * 60 * 1000).toISOString();
pollOnlyRepos.set(name, {
addedAt: Date.now(),
lastPollAt: lastPoll,
});
knownRepos.add(name);
newPollOnly++;
console.log(`[gitea-polling] 📋 Poll-only (no admin): ${name}`);
continue;
} }
// Other error — log but don't add to known (retry next cycle) // Fall through to collab handling
console.error(`[gitea-polling] ❌ Webhook error for ${name}: ${err instanceof Error ? err.message : err}`);
} }
} else {
// No webhook URL configured — all repos are poll-only
const lastPoll = savedState[name] ?? new Date(Date.now() - 5 * 60 * 1000).toISOString();
pollOnlyRepos.set(name, {
addedAt: Date.now(),
lastPollAt: lastPoll,
});
knownRepos.add(name);
newPollOnly++;
} }
// Collab repo (or own repo without webhook URL) — notification-polled
collabRepos.add(name);
knownRepos.add(name);
// Collab repos default to "mention" only
if (!repoConfigs.has(name)) repoConfigs.set(name, { respondTo: "mention" });
newCollab++;
console.log(`[gitea-repos] 📋 Collab (mention-only): ${name}`);
} }
if (newWebhooks > 0 || newPollOnly > 0) { if (newWebhooks > 0 || newCollab > 0) {
console.log(`[gitea-polling] Discovered: ${newWebhooks} webhook, ${newPollOnly} poll-only`); console.log(`[gitea-repos] Discovered: ${newWebhooks} webhook, ${newCollab} collab`);
await saveState();
} }
console.log(`[gitea-polling] Total: ${webhookRepos.size} webhook + ${pollOnlyRepos.size} poll-only repos`); console.log(`[gitea-repos] Total: ${webhookRepos.size} webhook + ${collabRepos.size} collab repos`);
} catch (err) { } catch (err) {
console.error("[gitea-polling] Error discovering repos:", err instanceof Error ? err.message : err); console.error("[gitea-repos] Error discovering:", err instanceof Error ? err.message : err);
} }
} }
// ── Event Polling for Non-Admin Repos ──────────────────────────────────────── // ── Notification Polling ─────────────────────────────────────────────────────
/** Poll all poll-only repos for new issues, PRs, and comments */ /**
async function pollForEvents() { * Poll Gitea notifications API for @mentions and assignments.
if (pollOnlyRepos.size === 0) { * Much more efficient than scanning every repo — single API call.
console.log(`[gitea-poll-events] No poll-only repos to check`); * Gitea handles the @mention detection for us.
return; */
} async function pollNotifications() {
try {
const client = new GiteaClient();
console.log(`[gitea-poll-events] Checking ${pollOnlyRepos.size} repos for new events...`); // Fetch unread notifications
const client = new GiteaClient(); const notifications = await client.get<any[]>(
"/notifications?status-types=unread&limit=20",
);
for (const [repoName, state] of pollOnlyRepos) { if (notifications.length === 0) return;
try {
const since = state.lastPollAt;
const sinceDate = new Date(since);
const now = new Date().toISOString();
const [owner, repo] = repoName.split("/");
// Fetch recent issues created/updated since last poll console.log(`[gitea-notif] ${notifications.length} unread notifications`);
const issues = await client.get<any[]>(
`/repos/${owner}/${repo}/issues?state=open&sort=created&type=issues&since=${since}&limit=20`
);
for (const issue of issues) { for (const notif of notifications) {
// Skip issues created by the bot itself const notifId = notif.id;
if (issue.user?.login === BOT_USER) continue; const repo = notif.repository?.full_name;
const subjectType = notif.subject?.type; // "Issue", "Pull", "Commit"
const subjectUrl = notif.subject?.url; // API URL for the issue/PR
const latestCommentUrl = notif.subject?.latest_comment_url;
// Only process issues created after our last poll (not just updated) if (!repo || !subjectUrl) {
if (new Date(issue.created_at) > sinceDate) { await markNotifRead(client, notifId);
console.log(`[gitea-poll-events] New issue: ${repoName}#${issue.number} "${issue.title}"`); continue;
enqueueEvent({ }
_polled: true,
_type: "issue",
_item: issue,
_repo: repoName,
action: "opened",
repository: { full_name: repoName },
});
}
// Check for new comments on this issue try {
try { // Determine what triggered the notification:
const comments = await client.get<any[]>( // - If latest_comment_url exists and differs from subject_url, it's a comment
`/repos/${owner}/${repo}/issues/${issue.number}/comments?since=${since}` // - Otherwise it's the issue/PR itself
); let eventCtx: EventContext | null = null;
for (const comment of comments) {
if (comment.user?.login === BOT_USER) continue; if (latestCommentUrl && latestCommentUrl !== subjectUrl) {
if (new Date(comment.created_at) > sinceDate) { // Notification triggered by a comment
console.log(`[gitea-poll-events] New comment on ${repoName}#${issue.number} by @${comment.user?.login}`); const commentPath = latestCommentUrl.replace(/.*\/api\/v1/, "");
enqueueEvent({ console.log(`[gitea-notif] Fetching comment: ${commentPath}`);
_polled: true, const comment = await client.get<any>(commentPath);
_type: "issue_comment", const body = comment.body || "";
_item: { ...comment, _issueNumber: issue.number }, const author = comment.user?.login || "";
_repo: repoName,
action: "created", // Skip our own comments
repository: { full_name: repoName }, if (author.toLowerCase() === BOT_USER.toLowerCase()) {
}); await markNotifRead(client, notifId);
} continue;
}
const mention = parseMentions(body, BOT_USER);
// Extract issue/PR number from subject URL
const numberMatch = subjectUrl.match(/\/(\d+)$/);
const parentNumber = numberMatch ? parseInt(numberMatch[1], 10) : undefined;
eventCtx = {
repo, type: "issue_comment", action: "created",
author, body, parentNumber, mention,
source: "notification",
};
} else {
// Notification triggered by the issue/PR itself
const subjectPath = subjectUrl.replace(/.*\/api\/v1/, "");
console.log(`[gitea-notif] Fetching subject: ${subjectPath} (type: ${subjectType})`);
const subject = await client.get<any>(subjectPath);
const body = subject.body || "";
const author = subject.user?.login || "";
if (author.toLowerCase() === BOT_USER.toLowerCase()) {
await markNotifRead(client, notifId);
continue;
}
const mention = parseMentions(body, BOT_USER);
if (subjectType === "Issue") {
eventCtx = {
repo, type: "issue", action: "opened",
number: subject.number, title: subject.title,
author, body, labels: subject.labels?.map((l: any) => l.name),
mention, source: "notification",
};
} else if (subjectType === "Pull") {
eventCtx = {
repo, type: "pull_request", action: "opened",
number: subject.number, title: subject.title,
author, body,
baseBranch: subject.base?.label,
headBranch: subject.head?.label,
mention, source: "notification",
};
} }
} catch {
// Comment fetch failed, skip
} }
if (eventCtx) {
const mentionTag = eventCtx.mention?.mentioned ? " 🔔" : "";
console.log(`[gitea-notif] ${eventCtx.type} on ${repo}#${eventCtx.number ?? eventCtx.parentNumber} by @${eventCtx.author}${mentionTag}`);
const prompt = formatPrompt(eventCtx);
enqueuePrompt(prompt);
}
} catch (err) {
console.error(`[gitea-notif] Error processing notification ${notifId}:`, err instanceof Error ? err.message : err);
} }
// Fetch recent PRs // Mark as read
const prs = await client.get<any[]>( await markNotifRead(client, notifId);
`/repos/${owner}/${repo}/pulls?state=open&sort=created&limit=20`
);
for (const pr of prs) {
if (pr.user?.login === BOT_USER) continue;
if (new Date(pr.created_at) > sinceDate) {
console.log(`[gitea-poll-events] New PR: ${repoName}#${pr.number} "${pr.title}"`);
enqueueEvent({
_polled: true,
_type: "pull_request",
_item: pr,
_repo: repoName,
action: "opened",
repository: { full_name: repoName },
});
}
}
// Update last poll timestamp
state.lastPollAt = now;
} catch (err) {
console.error(`[gitea-poll-events] Error polling ${repoName}: ${err instanceof Error ? err.message : err}`);
} }
} catch (err) {
console.error("[gitea-notif] Error polling:", err instanceof Error ? err.message : err);
} }
}
// Persist poll timestamps async function markNotifRead(client: GiteaClient, notifId: number) {
await savePollState(); try {
await client.patch(`/notifications/threads/${notifId}`, {});
} catch {
// Best effort — don't fail the whole poll
}
} }
// ── Public API ─────────────────────────────────────────────────────────────── // ── Public API ───────────────────────────────────────────────────────────────
/** Start repo discovery polling + event polling */ export async function startPolling(_pi: ExtensionAPI) {
export function startPolling(_pi: ExtensionAPI) { // Load persisted state
const saved = await loadState();
for (const [repo, config] of Object.entries(saved.repoConfigs)) {
repoConfigs.set(repo, config);
}
// Discover repos immediately, then on interval // Discover repos immediately, then on interval
void discoverRepos(); void discoverRepos();
repoPollTimer = setInterval(() => void discoverRepos(), POLL_INTERVAL * 1000); repoPollTimer = setInterval(() => void discoverRepos(), POLL_INTERVAL * 1000);
console.log(`[gitea-polling] Repo discovery started (interval: ${POLL_INTERVAL}s)`); console.log(`[gitea-polling] Repo discovery started (interval: ${POLL_INTERVAL}s)`);
// Event polling for poll-only repos on a faster interval // Notification polling on a fast interval
eventPollTimer = setInterval(() => void pollForEvents(), EVENT_POLL_INTERVAL * 1000); notifPollTimer = setInterval(() => void pollNotifications(), NOTIF_POLL_INTERVAL * 1000);
console.log(`[gitea-polling] Event polling started (interval: ${EVENT_POLL_INTERVAL}s)`); console.log(`[gitea-polling] Notification polling started (interval: ${NOTIF_POLL_INTERVAL}s)`);
} }
export function stopPolling() { export function stopPolling() {
@@ -482,19 +669,30 @@ export function stopPolling() {
clearInterval(repoPollTimer); clearInterval(repoPollTimer);
repoPollTimer = null; repoPollTimer = null;
} }
if (eventPollTimer) { if (notifPollTimer) {
clearInterval(eventPollTimer); clearInterval(notifPollTimer);
eventPollTimer = null; notifPollTimer = null;
} }
// Clear state for clean reload
webhookRepos.clear(); webhookRepos.clear();
pollOnlyRepos.clear(); collabRepos.clear();
knownRepos.clear(); knownRepos.clear();
} }
export function getTrackedRepos() { export function getTrackedRepos() {
return { return {
webhook: new Map(webhookRepos), webhook: new Map(webhookRepos),
pollOnly: new Map(pollOnlyRepos), collab: new Set(collabRepos),
configs: new Map(repoConfigs),
}; };
} }
/** Set respondTo mode for a repo. Returns the new config. */
export function setRepoConfig(repo: string, config: Partial<RepoConfig>): RepoConfig {
const current = repoConfigs.get(repo) ?? { respondTo: "mention" };
const updated = { ...current, ...config };
repoConfigs.set(repo, updated);
void saveState();
return updated;
}
export { parseMentions, type MentionInfo, type RepoConfig };