feat(blocks): Add webhook block status indicator (#8838)

- Resolves #8743
- Follow-up to #8358

### Demo


https://github.com/user-attachments/assets/f983dfa2-2dc2-4ab0-8373-e768ba17e6f7

### Changes 🏗️

- feat(frontend): Add webhook status indicator on `CustomNode`
   - Add `webhookId` to frontend node data model

- fix(backend): Fix webhook ping endpoint
   - Remove `provider` path parameter
   - Fix return values and error handling
   - Fix `WebhooksManager.trigger_ping(..)`
      - Add `credentials` parameter
      - Fix usage of credentials
   - Fix `.data.integrations.wait_for_webhook_event(..)`
      - Add `AsyncRedisEventBus.wait_for_event(..)`

- feat(frontend): Add `BackendAPIProvider` + `useBackendAPI`

- feat(frontend): Improve layout of node header

    Before:

![image](https://github.com/user-attachments/assets/17a33b94-65f0-4e34-a47d-2dd321edecae)
    After:

![image](https://github.com/user-attachments/assets/64afb1e4-e3f2-4ca9-8961-f1245f25477f)

- refactor(backend): Clean up `.data.integrations`
- refactor(backend): Fix naming in `.data.queue` for understandability

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Add webhook block, save -> gray indicator
  - [x] Add necessary info to webhook block, save -> green indicator
  - [x] Remove necessary info, save -> gray indicator

---------

Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
This commit is contained in:
Reinier van der Leer 2024-12-05 11:35:13 +01:00 committed by GitHub
parent 6b742d1a8c
commit 64f5e60d12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 221 additions and 77 deletions

View File

@ -144,25 +144,28 @@ class WebhookEventBus(AsyncRedisEventBus[WebhookEvent]):
def event_bus_name(self) -> str:
return "webhooks"
async def publish(self, event: WebhookEvent):
await self.publish_event(event, f"{event.webhook_id}/{event.event_type}")
async def listen(
self, webhook_id: str, event_type: Optional[str] = None
) -> AsyncGenerator[WebhookEvent, None]:
async for event in self.listen_events(f"{webhook_id}/{event_type or '*'}"):
yield event
event_bus = WebhookEventBus()
_webhook_event_bus = WebhookEventBus()
async def publish_webhook_event(event: WebhookEvent):
await event_bus.publish(event)
await _webhook_event_bus.publish_event(
event, f"{event.webhook_id}/{event.event_type}"
)
async def listen_for_webhook_event(
async def listen_for_webhook_events(
webhook_id: str, event_type: Optional[str] = None
) -> AsyncGenerator[WebhookEvent, None]:
async for event in _webhook_event_bus.listen_events(
f"{webhook_id}/{event_type or '*'}"
):
yield event
async def wait_for_webhook_event(
webhook_id: str, event_type: Optional[str] = None, timeout: Optional[float] = None
) -> WebhookEvent | None:
async for event in event_bus.listen(webhook_id, event_type):
return event # Only one event is expected
return await _webhook_event_bus.wait_for_event(
f"{webhook_id}/{event_type or '*'}", timeout
)

View File

@ -1,8 +1,9 @@
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, AsyncGenerator, Generator, Generic, TypeVar
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
from pydantic import BaseModel
from redis.asyncio.client import PubSub as AsyncPubSub
@ -48,12 +49,12 @@ class BaseRedisEventBus(Generic[M], ABC):
except Exception as e:
logger.error(f"Failed to parse event result from Redis {msg} {e}")
def _subscribe(
def _get_pubsub_channel(
self, connection: redis.Redis | redis.AsyncRedis, channel_key: str
) -> tuple[PubSub | AsyncPubSub, str]:
channel_name = f"{self.event_bus_name}/{channel_key}"
full_channel_name = f"{self.event_bus_name}/{channel_key}"
pubsub = connection.pubsub()
return pubsub, channel_name
return pubsub, full_channel_name
class RedisEventBus(BaseRedisEventBus[M], ABC):
@ -64,17 +65,19 @@ class RedisEventBus(BaseRedisEventBus[M], ABC):
return redis.get_redis()
def publish_event(self, event: M, channel_key: str):
message, channel_name = self._serialize_message(event, channel_key)
self.connection.publish(channel_name, message)
message, full_channel_name = self._serialize_message(event, channel_key)
self.connection.publish(full_channel_name, message)
def listen_events(self, channel_key: str) -> Generator[M, None, None]:
pubsub, channel_name = self._subscribe(self.connection, channel_key)
pubsub, full_channel_name = self._get_pubsub_channel(
self.connection, channel_key
)
assert isinstance(pubsub, PubSub)
if "*" in channel_key:
pubsub.psubscribe(channel_name)
pubsub.psubscribe(full_channel_name)
else:
pubsub.subscribe(channel_name)
pubsub.subscribe(full_channel_name)
for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
@ -89,19 +92,31 @@ class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
return await redis.get_redis_async()
async def publish_event(self, event: M, channel_key: str):
message, channel_name = self._serialize_message(event, channel_key)
message, full_channel_name = self._serialize_message(event, channel_key)
connection = await self.connection
await connection.publish(channel_name, message)
await connection.publish(full_channel_name, message)
async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
pubsub, channel_name = self._subscribe(await self.connection, channel_key)
pubsub, full_channel_name = self._get_pubsub_channel(
await self.connection, channel_key
)
assert isinstance(pubsub, AsyncPubSub)
if "*" in channel_key:
await pubsub.psubscribe(channel_name)
await pubsub.psubscribe(full_channel_name)
else:
await pubsub.subscribe(channel_name)
await pubsub.subscribe(full_channel_name)
async for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
yield event
async def wait_for_event(
self, channel_key: str, timeout: Optional[float] = None
) -> M | None:
try:
return await asyncio.wait_for(
anext(aiter(self.listen_events(channel_key))), timeout
)
except TimeoutError:
return None

View File

@ -81,7 +81,9 @@ class BaseWebhooksManager(ABC, Generic[WT]):
# --8<-- [end:BaseWebhooksManager3]
# --8<-- [start:BaseWebhooksManager5]
async def trigger_ping(self, webhook: integrations.Webhook) -> None:
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
"""
Triggers a ping to the given webhook.

View File

@ -58,10 +58,15 @@ class GithubWebhooksManager(BaseWebhooksManager):
return payload, event_type
async def trigger_ping(self, webhook: integrations.Webhook) -> None:
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
if not credentials:
raise ValueError("Credentials are required but were not passed")
headers = {
**self.GITHUB_API_DEFAULT_HEADERS,
"Authorization": f"Bearer {webhook.config.get('access_token')}",
"Authorization": credentials.bearer(),
}
repo, github_hook_id = webhook.resource, webhook.provider_webhook_id

View File

@ -9,8 +9,8 @@ from backend.data.integrations import (
WebhookEvent,
get_all_webhooks,
get_webhook,
listen_for_webhook_event,
publish_webhook_event,
wait_for_webhook_event,
)
from backend.data.model import (
APIKeyCredentials,
@ -300,18 +300,28 @@ async def webhook_ingress_generic(
)
@router.post("/{provider}/webhooks/{webhook_id}/ping")
@router.post("/webhooks/{webhook_id}/ping")
async def webhook_ping(
provider: Annotated[str, Path(title="Provider where the webhook was registered")],
webhook_id: Annotated[str, Path(title="Our ID for the webhook")],
user_id: Annotated[str, Depends(get_user_id)], # require auth
):
webhook_manager = WEBHOOK_MANAGERS_BY_NAME[provider]()
webhook = await get_webhook(webhook_id)
webhook_manager = WEBHOOK_MANAGERS_BY_NAME[webhook.provider]()
await webhook_manager.trigger_ping(webhook)
if not await listen_for_webhook_event(webhook_id, event_type="ping"):
raise HTTPException(status_code=500, detail="Webhook ping event not received")
credentials = (
creds_manager.get(user_id, webhook.credentials_id)
if webhook.credentials_id
else None
)
try:
await webhook_manager.trigger_ping(webhook, credentials)
except NotImplementedError:
return False
if not await wait_for_webhook_event(webhook_id, event_type="ping", timeout=10):
raise HTTPException(status_code=504, detail="Webhook ping timed out")
return True
# --------------------------- UTILITIES ---------------------------- #

View File

@ -90,3 +90,6 @@ ignore_patterns = []
[tool.pytest.ini_options]
asyncio_mode = "auto"
[tool.ruff]
target-version = "py310"

View File

@ -3,6 +3,7 @@
import * as React from "react";
import { ThemeProvider as NextThemesProvider } from "next-themes";
import { ThemeProviderProps } from "next-themes/dist/types";
import { BackendAPIProvider } from "@/lib/autogpt-server-api";
import { TooltipProvider } from "@/components/ui/tooltip";
import SupabaseProvider from "@/components/SupabaseProvider";
import CredentialsProvider from "@/components/integrations/credentials-provider";
@ -11,9 +12,11 @@ export function Providers({ children, ...props }: ThemeProviderProps) {
return (
<NextThemesProvider {...props}>
<SupabaseProvider>
<CredentialsProvider>
<TooltipProvider>{children}</TooltipProvider>
</CredentialsProvider>
<BackendAPIProvider>
<CredentialsProvider>
<TooltipProvider>{children}</TooltipProvider>
</CredentialsProvider>
</BackendAPIProvider>
</SupabaseProvider>
</NextThemesProvider>
);

View File

@ -4,6 +4,7 @@ import React, {
useCallback,
useRef,
useContext,
useMemo,
} from "react";
import { NodeProps, useReactFlow, Node, Edge } from "@xyflow/react";
import "@xyflow/react/dist/style.css";
@ -17,7 +18,8 @@ import {
NodeExecutionResult,
BlockUIType,
BlockCost,
} from "@/lib/autogpt-server-api/types";
useBackendAPI,
} from "@/lib/autogpt-server-api";
import {
beautifyString,
cn,
@ -68,6 +70,7 @@ export type CustomNodeData = {
outputSchema: BlockIORootSchema;
hardcodedValues: { [key: string]: any };
connections: ConnectionData;
webhookId?: string;
isOutputOpen: boolean;
status?: NodeExecutionResult["status"];
/** executionResults contains outputs across multiple executions
@ -104,6 +107,7 @@ export function CustomNode({
>();
const isInitialSetup = useRef(true);
const flowContext = useContext(FlowContext);
const api = useBackendAPI();
let nodeFlowId = "";
if (data.uiType === BlockUIType.AGENT) {
@ -513,6 +517,57 @@ export function CustomNode({
isCostFilterMatch(cost.cost_filter, inputValues),
);
const [webhookStatus, setWebhookStatus] = useState<
"works" | "exists" | "broken" | "none" | "pending" | null
>(null);
useEffect(() => {
if (data.uiType != BlockUIType.WEBHOOK) return;
if (!data.webhookId) {
setWebhookStatus("none");
return;
}
setWebhookStatus("pending");
api
.pingWebhook(data.webhookId)
.then((pinged) => setWebhookStatus(pinged ? "works" : "exists"))
.catch((error: Error) =>
error.message.includes("ping timed out")
? setWebhookStatus("broken")
: setWebhookStatus("none"),
);
}, [data.uiType, data.webhookId, api, setWebhookStatus]);
const webhookStatusDot = useMemo(
() =>
webhookStatus && (
<div
className={cn(
"size-4 rounded-full border-2",
{
pending: "animate-pulse border-gray-300 bg-gray-400",
works: "border-green-300 bg-green-400",
exists: "border-green-200 bg-green-300",
broken: "border-red-400 bg-red-500",
none: "border-gray-300 bg-gray-400",
}[webhookStatus],
)}
title={
{
pending: "Checking connection status...",
works: "Connected",
exists:
"Connected (but we could not verify the real-time status)",
broken: "The connected webhook is not working",
none: "Not connected. Fill out all the required block inputs and save the agent to connect.",
}[webhookStatus]
}
/>
),
[webhookStatus],
);
const LineSeparator = () => (
<div className="bg-white pt-6">
<Separator.Root className="h-[1px] w-full bg-gray-300"></Separator.Root>
@ -580,55 +635,61 @@ export function CustomNode({
>
{/* Header */}
<div
className={`flex h-24 border-b border-gray-300 ${data.uiType === BlockUIType.NOTE ? "bg-yellow-100" : "bg-white"} items-center rounded-t-xl`}
className={`flex h-24 border-b border-gray-300 ${data.uiType === BlockUIType.NOTE ? "bg-yellow-100" : "bg-white"} space-x-1 rounded-t-xl`}
>
{/* Color Stripe */}
<div className={`-ml-px h-full w-3 rounded-tl-xl ${stripeColor}`}></div>
<div className="flex w-full flex-col">
<div className="flex flex-row items-center justify-between">
<div className="font-roboto flex items-center px-3 text-lg font-semibold">
<div className="flex w-full flex-col justify-start space-y-2.5 px-4 pt-4">
<div className="flex flex-row items-center space-x-2 font-semibold">
<h3 className="font-roboto text-lg">
<TextRenderer
value={beautifyString(
data.blockType?.replace(/Block$/, "") || data.title,
)}
truncateLengthLimit={80}
/>
</h3>
<span className="text-xs text-gray-500">#{id.split("-")[0]}</span>
<div className="px-2 text-xs text-gray-500">
#{id.split("-")[0]}
</div>
</div>
<div className="w-auto grow" />
{webhookStatusDot}
<button
aria-label="Options"
className="cursor-pointer rounded-full border-none bg-transparent p-1 hover:bg-gray-100"
onClick={onContextButtonTrigger}
>
<DotsVerticalIcon className="h-5 w-5" />
</button>
</div>
<div className="flex items-center space-x-2">
{blockCost && (
<div className="mr-3 text-base font-light">
<span className="ml-auto flex items-center">
<IconCoin />{" "}
<span className="mx-1 font-medium">
{blockCost.cost_amount}
</span>{" "}
credits/{blockCost.cost_type}
</span>
</div>
)}
{data.categories.map((category) => (
<Badge
key={category.category}
variant="outline"
className={`${getPrimaryCategoryColor([category])} h-6 whitespace-nowrap rounded-full border border-gray-300 opacity-50`}
>
{beautifyString(category.category.toLowerCase())}
</Badge>
))}
</div>
{blockCost && (
<div className="px-3 text-base font-light">
<span className="ml-auto flex items-center">
<IconCoin />{" "}
<span className="m-1 font-medium">{blockCost.cost_amount}</span>{" "}
credits/{blockCost.cost_type}
</span>
</div>
)}
</div>
{data.categories.map((category) => (
<Badge
key={category.category}
variant="outline"
className={`mr-5 ${getPrimaryCategoryColor([category])} whitespace-nowrap rounded-xl border border-gray-300 opacity-50`}
>
{beautifyString(category.category.toLowerCase())}
</Badge>
))}
<button
aria-label="Options"
className="mr-2 cursor-pointer rounded-full border-none bg-transparent p-1 hover:bg-gray-100"
onClick={onContextButtonTrigger}
>
<DotsVerticalIcon className="h-5 w-5" />
</button>
<ContextMenuContent />
</div>
{/* Body */}
<div className="ml-5 mt-6 rounded-b-xl">
{/* Input Handles */}

View File

@ -170,6 +170,7 @@ export default function useAgentGraph(
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhookId: node.webhook_id,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
@ -776,6 +777,7 @@ export default function useAgentGraph(
),
status: undefined,
backend_id: backendNode.id,
webhookId: backendNode.webhook_id,
executionResults: [],
},
}

View File

@ -241,6 +241,15 @@ export default class BaseAutoGPTServerAPI {
);
}
/**
* @returns `true` if a ping event was received, `false` if provider doesn't support pinging but the webhook exists.
* @throws `Error` if the webhook does not exist.
* @throws `Error` if the attempt to ping timed out.
*/
async pingWebhook(webhook_id: string): Promise<boolean> {
return this._request("POST", `/integrations/webhooks/${webhook_id}/ping`);
}
logMetric(metric: AnalyticsMetrics) {
return this._request("POST", "/analytics/log_raw_metric", metric);
}

View File

@ -1,7 +1,7 @@
import { createClient } from "../supabase/client";
import BaseAutoGPTServerAPI from "./baseClient";
export default class AutoGPTServerAPI extends BaseAutoGPTServerAPI {
export class AutoGPTServerAPI extends BaseAutoGPTServerAPI {
constructor(
baseUrl: string = process.env.NEXT_PUBLIC_AGPT_SERVER_URL ||
"http://localhost:8006/api",

View File

@ -0,0 +1,28 @@
import { AutoGPTServerAPI } from "./client";
import React, { createContext, useMemo } from "react";
const BackendAPIProviderContext = createContext<AutoGPTServerAPI | null>(null);
export function BackendAPIProvider({
children,
}: {
children?: React.ReactNode;
}): React.ReactNode {
const api = useMemo(() => new AutoGPTServerAPI(), []);
return (
<BackendAPIProviderContext.Provider value={api}>
{children}
</BackendAPIProviderContext.Provider>
);
}
export function useBackendAPI(): AutoGPTServerAPI {
const context = React.useContext(BackendAPIProviderContext);
if (!context) {
throw new Error(
"useBackendAPI must be used within a BackendAPIProviderContext",
);
}
return context;
}

View File

@ -1,5 +1,7 @@
import AutoGPTServerAPI from "./client";
import { AutoGPTServerAPI } from "./client";
export default AutoGPTServerAPI;
export * from "./client";
export * from "./context";
export * from "./types";
export * from "./utils";

View File

@ -168,6 +168,7 @@ export type Node = {
position: { x: number; y: number };
[key: string]: any;
};
webhook_id?: string;
};
/* Mirror of backend/data/graph.py:Link */