feat: hybrid webhook + polling for collab repos

- Repos with admin access get webhooks auto-installed
- Collab repos (403 on webhook install) fall back to event polling
- Poll every 60s for new issues, PRs, and comments
- Use Date objects for timestamp comparison (fixes TZ offset issues)
- Skip events from the bot's own user
- Clean startup/shutdown with state reset on reload
- Health endpoint reports webhook_repos + poll_only_repos counts
This commit is contained in:
2026-03-13 16:47:07 -07:00
parent 25e49db155
commit 578e2f91cb

View File

@@ -1,28 +1,42 @@
/**
* Webhook server — receives Gitea events via HTTP
* Webhook server — receives Gitea events via HTTP + polls collab repos
*
* Auth: Bearer token validation (PI_WEBHOOK_TOKEN).
* No HMAC/secret — consistent with token-based auth strategy.
*
* Repos where the bot has admin access get webhooks installed automatically.
* Repos where the bot is a non-admin collaborator are polled for new events.
*/
import type { Server, IncomingMessage, ServerResponse } from "node:http";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { GiteaClient } from "../../src/client.js";
import { GiteaClient, GiteaError } from "../../src/client.js";
const WEBHOOK_HOST = process.env.PI_WEBHOOK_HOST ?? "0.0.0.0";
const WEBHOOK_PORT = parseInt(process.env.PI_WEBHOOK_PORT ?? "3000", 10);
const WEBHOOK_TOKEN = process.env.PI_WEBHOOK_TOKEN ?? "";
const WEBHOOK_URL = process.env.PI_WEBHOOK_URL ?? "";
const POLL_INTERVAL = parseInt(process.env.PI_BOT_POLL_INTERVAL ?? "300", 10);
const EVENT_POLL_INTERVAL = parseInt(process.env.PI_EVENT_POLL_INTERVAL ?? "60", 10);
const BOT_USER = process.env.PI_GIT_USER ?? "";
let server: Server | null = null;
let trackedRepos: Map<string, { webhookId: number; addedAt: number }> = new Map();
let processingQueue: Array<{ event: any; timestamp: number }> = [];
let maxQueueDepth = 50;
let isProcessing = false;
let pollTimer: NodeJS.Timeout | null = null;
let repoPollTimer: NodeJS.Timeout | null = null;
let eventPollTimer: NodeJS.Timeout | null = null;
let sendMessage: ((message: string) => Promise<void>) | null = null;
/** Repos where we successfully installed a webhook */
let webhookRepos: Map<string, { webhookId: number; addedAt: number }> = new Map();
/** Repos where webhook install failed (403) — we poll these for events */
let pollOnlyRepos: Map<string, { addedAt: number; lastPollAt: string }> = new Map();
/** Track all known repos so we don't re-attempt webhook install every cycle */
let knownRepos: Set<string> = new Set();
export function setSendMessage(fn: (message: string) => Promise<void>) {
sendMessage = fn;
}
@@ -84,6 +98,46 @@ function formatEventPrompt(event: any): string {
return prompt;
}
/** Format a polled issue/PR/comment as a synthetic event prompt */
function formatPolledEvent(type: "issue" | "pull_request" | "issue_comment", item: any, repo: string): string {
let prompt = `New Gitea event on ${repo} (via polling):\n\n`;
prompt += `**Action**: opened\n\n`;
if (type === "issue") {
prompt += `**Issue #${item.number}: ${item.title}**\n`;
prompt += `**Author**: @${item.user?.login || "unknown"}\n`;
prompt += `**Labels**: ${item.labels?.map((l: any) => l.name).join(", ") || "none"}\n`;
prompt += `**Body**:\n${item.body || "(no body)"}\n\n`;
} else if (type === "pull_request") {
prompt += `**PR #${item.number}: ${item.title}**\n`;
prompt += `**Author**: @${item.user?.login || "unknown"}\n`;
prompt += `**Base**: ${item.base?.label} ← **Head**: ${item.head?.label}\n`;
prompt += `**Body**:\n${item.body || "(no body)"}\n\n`;
} else if (type === "issue_comment") {
prompt += `**Comment on #${item._issueNumber}**\n`;
prompt += `**Author**: @${item.user?.login || "unknown"}\n`;
prompt += `**Body**:\n${item.body || "(no body)"}\n\n`;
}
prompt += `---\n\n`;
prompt += `Please analyze this event and decide how to respond.\n`;
prompt += `Use the available Gitea tools to interact with the repository.`;
return prompt;
}
/** Enqueue an event for LLM processing */
function enqueueEvent(event: any) {
processingQueue.push({ event, timestamp: Date.now() });
if (processingQueue.length > maxQueueDepth) {
processingQueue.shift();
console.warn(`[gitea-webhook] Queue full, dropped oldest event`);
}
void processQueue();
}
/** Process the event queue */
async function processQueue() {
if (isProcessing || processingQueue.length === 0) return;
@@ -92,15 +146,17 @@ async function processQueue() {
while (processingQueue.length > 0) {
const { event } = processingQueue.shift()!;
const repoName = event.repository?.full_name || "unknown";
const repoName = event.repository?.full_name || event._repo || "unknown";
console.log(`[gitea-webhook] Processing event: ${event.action} on ${repoName}`);
console.log(`[gitea-webhook] Processing event: ${event.action || event._type} on ${repoName}`);
if (sendMessage) {
try {
const prompt = formatEventPrompt(event);
const prompt = event._polled
? formatPolledEvent(event._type, event._item, event._repo)
: formatEventPrompt(event);
await sendMessage(prompt);
console.log(`[gitea-webhook] Event sent to LLM: ${event.action} on ${repoName}`);
console.log(`[gitea-webhook] Event sent to LLM: ${event.action || event._type} on ${repoName}`);
} catch (err) {
console.error(`[gitea-webhook] Failed to send event to LLM:`, err);
}
@@ -112,6 +168,8 @@ async function processQueue() {
isProcessing = false;
}
// ── HTTP Server ──────────────────────────────────────────────────────────────
/** Start webhook server */
export function startWebhookServer(_pi: ExtensionAPI) {
return new Promise<void>(async (resolve, reject) => {
@@ -138,7 +196,8 @@ export function startWebhookServer(_pi: ExtensionAPI) {
JSON.stringify({
status: "ok",
uptime: process.uptime(),
tracked_repos: trackedRepos.size,
webhook_repos: webhookRepos.size,
poll_only_repos: pollOnlyRepos.size,
queue_depth: processingQueue.length,
is_processing: isProcessing,
}),
@@ -148,7 +207,6 @@ export function startWebhookServer(_pi: ExtensionAPI) {
// POST /hooks/gitea
if (url === "/hooks/gitea" && req.method === "POST") {
// Validate token
if (!validateToken(req)) {
res.writeHead(401, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: "Unauthorized" }));
@@ -156,13 +214,11 @@ export function startWebhookServer(_pi: ExtensionAPI) {
return;
}
// Read body
let body = "";
for await (const chunk of req) {
body += chunk.toString();
}
// Parse event
let event;
try {
event = JSON.parse(body);
@@ -172,17 +228,7 @@ export function startWebhookServer(_pi: ExtensionAPI) {
return;
}
// Queue event
processingQueue.push({ event, timestamp: Date.now() });
if (processingQueue.length > maxQueueDepth) {
const dropped = processingQueue.shift();
if (dropped) {
console.warn(`[gitea-webhook] Queue full, dropping oldest event`);
}
}
void processQueue();
enqueueEvent(event);
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ received: true, event: event.action }));
@@ -220,43 +266,25 @@ export async function stopWebhookServer() {
});
}
/** Poll for new repos and register webhooks */
export function startPolling(_pi: ExtensionAPI) {
void fetchUserRepos();
pollTimer = setInterval(() => void fetchUserRepos(), POLL_INTERVAL * 1000);
console.log(`[gitea-polling] Polling started (interval: ${POLL_INTERVAL}s)`);
}
// ── Repo Discovery & Webhook Registration ────────────────────────────────────
export function stopPolling() {
if (pollTimer) {
clearInterval(pollTimer);
pollTimer = null;
}
}
async function fetchUserRepos() {
/** Discover repos and install webhooks (or mark as poll-only on 403) */
async function discoverRepos() {
try {
const client = new GiteaClient();
if (!WEBHOOK_URL) {
console.warn("[gitea-polling] PI_WEBHOOK_URL not set, skipping webhook registration");
return;
}
const repos = await client.get<any[]>("/user/repos?limit=100");
const newRepos: any[] = [];
let newWebhooks = 0;
let newPollOnly = 0;
for (const repo of repos) {
if (!trackedRepos.has(repo.full_name)) {
newRepos.push(repo);
}
}
const name = repo.full_name;
if (knownRepos.has(name)) continue;
if (newRepos.length > 0) {
console.log(`[gitea-polling] Found ${newRepos.length} new repos, registering webhooks...`);
for (const repo of newRepos) {
// Try to install webhook
if (WEBHOOK_URL) {
try {
const webhook = await client.post<any>(`/repos/${repo.full_name}/hooks`, {
const webhook = await client.post<any>(`/repos/${name}/hooks`, {
type: "gitea",
config: {
url: `${WEBHOOK_URL}/hooks/gitea`,
@@ -267,20 +295,175 @@ async function fetchUserRepos() {
active: true,
});
trackedRepos.set(repo.full_name, { webhookId: webhook.id, addedAt: Date.now() });
console.log(`[gitea-polling] Webhook created for ${repo.full_name} (ID: ${webhook.id})`);
webhookRepos.set(name, { webhookId: webhook.id, addedAt: Date.now() });
knownRepos.add(name);
newWebhooks++;
console.log(`[gitea-polling] ✅ Webhook installed: ${name} (ID: ${webhook.id})`);
continue;
} catch (err) {
console.error(`[gitea-polling] Error creating webhook for ${repo.full_name}:`, err);
if (err instanceof GiteaError && err.status === 403) {
// No admin access — fall back to polling
// Set lastPollAt to 5 minutes ago so we catch recent events
const fiveMinAgo = new Date(Date.now() - 5 * 60 * 1000).toISOString();
pollOnlyRepos.set(name, {
addedAt: Date.now(),
lastPollAt: fiveMinAgo,
});
knownRepos.add(name);
newPollOnly++;
console.log(`[gitea-polling] 📋 Poll-only (no admin): ${name}`);
continue;
}
// Other error — log but don't add to known (retry next cycle)
console.error(`[gitea-polling] ❌ Webhook error for ${name}: ${err instanceof Error ? err.message : err}`);
}
} else {
// No webhook URL configured — all repos are poll-only
const fiveMinAgo = new Date(Date.now() - 5 * 60 * 1000).toISOString();
pollOnlyRepos.set(name, {
addedAt: Date.now(),
lastPollAt: fiveMinAgo,
});
knownRepos.add(name);
newPollOnly++;
}
}
console.log(`[gitea-polling] Current repos: ${trackedRepos.size}`);
if (newWebhooks > 0 || newPollOnly > 0) {
console.log(`[gitea-polling] Discovered: ${newWebhooks} webhook, ${newPollOnly} poll-only`);
}
console.log(`[gitea-polling] Total: ${webhookRepos.size} webhook + ${pollOnlyRepos.size} poll-only repos`);
} catch (err) {
console.error("[gitea-polling] Error fetching user repos:", err);
console.error("[gitea-polling] Error discovering repos:", err instanceof Error ? err.message : err);
}
}
export function getTrackedRepos() {
return new Map(trackedRepos);
// ── Event Polling for Non-Admin Repos ────────────────────────────────────────
/** Poll all poll-only repos for new issues, PRs, and comments */
async function pollForEvents() {
if (pollOnlyRepos.size === 0) {
console.log(`[gitea-poll-events] No poll-only repos to check`);
return;
}
console.log(`[gitea-poll-events] Checking ${pollOnlyRepos.size} repos for new events...`);
const client = new GiteaClient();
for (const [repoName, state] of pollOnlyRepos) {
try {
const since = state.lastPollAt;
const sinceDate = new Date(since);
const now = new Date().toISOString();
const [owner, repo] = repoName.split("/");
// Fetch recent issues created/updated since last poll
const issues = await client.get<any[]>(
`/repos/${owner}/${repo}/issues?state=open&sort=created&type=issues&since=${since}&limit=20`
);
for (const issue of issues) {
// Skip issues created by the bot itself
if (issue.user?.login === BOT_USER) continue;
// Only process issues created after our last poll (not just updated)
if (new Date(issue.created_at) > sinceDate) {
console.log(`[gitea-poll-events] New issue: ${repoName}#${issue.number} "${issue.title}"`);
enqueueEvent({
_polled: true,
_type: "issue",
_item: issue,
_repo: repoName,
action: "opened",
repository: { full_name: repoName },
});
}
// Check for new comments on this issue
try {
const comments = await client.get<any[]>(
`/repos/${owner}/${repo}/issues/${issue.number}/comments?since=${since}`
);
for (const comment of comments) {
if (comment.user?.login === BOT_USER) continue;
if (new Date(comment.created_at) > sinceDate) {
console.log(`[gitea-poll-events] New comment on ${repoName}#${issue.number} by @${comment.user?.login}`);
enqueueEvent({
_polled: true,
_type: "issue_comment",
_item: { ...comment, _issueNumber: issue.number },
_repo: repoName,
action: "created",
repository: { full_name: repoName },
});
}
}
} catch {
// Comment fetch failed, skip
}
}
// Fetch recent PRs
const prs = await client.get<any[]>(
`/repos/${owner}/${repo}/pulls?state=open&sort=created&limit=20`
);
for (const pr of prs) {
if (pr.user?.login === BOT_USER) continue;
if (new Date(pr.created_at) > sinceDate) {
console.log(`[gitea-poll-events] New PR: ${repoName}#${pr.number} "${pr.title}"`);
enqueueEvent({
_polled: true,
_type: "pull_request",
_item: pr,
_repo: repoName,
action: "opened",
repository: { full_name: repoName },
});
}
}
// Update last poll timestamp
state.lastPollAt = now;
} catch (err) {
console.error(`[gitea-poll-events] Error polling ${repoName}: ${err instanceof Error ? err.message : err}`);
}
}
}
// ── Public API ───────────────────────────────────────────────────────────────
/** Start repo discovery polling + event polling */
export function startPolling(_pi: ExtensionAPI) {
// Discover repos immediately, then on interval
void discoverRepos();
repoPollTimer = setInterval(() => void discoverRepos(), POLL_INTERVAL * 1000);
console.log(`[gitea-polling] Repo discovery started (interval: ${POLL_INTERVAL}s)`);
// Event polling for poll-only repos on a faster interval
eventPollTimer = setInterval(() => void pollForEvents(), EVENT_POLL_INTERVAL * 1000);
console.log(`[gitea-polling] Event polling started (interval: ${EVENT_POLL_INTERVAL}s)`);
}
export function stopPolling() {
if (repoPollTimer) {
clearInterval(repoPollTimer);
repoPollTimer = null;
}
if (eventPollTimer) {
clearInterval(eventPollTimer);
eventPollTimer = null;
}
// Clear state for clean reload
webhookRepos.clear();
pollOnlyRepos.clear();
knownRepos.clear();
}
export function getTrackedRepos() {
return {
webhook: new Map(webhookRepos),
pollOnly: new Map(pollOnlyRepos),
};
}