feat(backend/executor): Add `TERMINATED` execution status (#9185)
- Resolves #9182

Formerly known as `FAILED` with error message `TERMINATED`.

### Changes 🏗️

- Add `TERMINATED` to `AgentExecutionStatus` enum in DB schema (and its mirror in the front end)
- Update executor to give terminated node and graph executions status `TERMINATED` instead of `FAILED`/`COMPLETED`
- Add `TERMINATED` case to status checks referencing `AgentExecutionStatus`

### Checklist 📋

#### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Start and forcefully stop a graph execution

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
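For reviewers skimming the diff below, here is a minimal, self-contained sketch of the idea (the `ExecutionStatus` enum and `is_finished` helper are illustrative stand-ins, not the project's actual modules): `TERMINATED` becomes a first-class terminal state, so terminal-status checks test membership in a set of terminal values rather than equating a force-stopped run with `FAILED`.

```python
from enum import Enum


class ExecutionStatus(str, Enum):
    # Illustrative mirror of the AgentExecutionStatus enum; the real definition
    # lives in the Prisma schema and the generated backend/frontend bindings.
    INCOMPLETE = "INCOMPLETE"
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    TERMINATED = "TERMINATED"  # new terminal state for force-stopped executions
    FAILED = "FAILED"


# Terminal states now include TERMINATED, so callers polling for completion
# stop waiting on a stopped execution instead of mislabeling it as FAILED.
TERMINAL_STATES = {
    ExecutionStatus.COMPLETED,
    ExecutionStatus.TERMINATED,
    ExecutionStatus.FAILED,
}


def is_finished(status: ExecutionStatus) -> bool:
    """Hypothetical helper: True once an execution has reached a terminal state."""
    return status in TERMINAL_STATES


if __name__ == "__main__":
    # A stopped run is finished, but it is neither failed nor completed.
    assert is_finished(ExecutionStatus.TERMINATED)
    assert ExecutionStatus.TERMINATED is not ExecutionStatus.FAILED
    assert ExecutionStatus.TERMINATED is not ExecutionStatus.COMPLETED
```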
This commit is contained in:
parent 081c4a6df2
commit c3caa111e4
@@ -76,7 +76,11 @@ class AgentExecutorBlock(Block):
             )
 
             if not event.node_id:
-                if event.status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED]:
+                if event.status in [
+                    ExecutionStatus.COMPLETED,
+                    ExecutionStatus.TERMINATED,
+                    ExecutionStatus.FAILED,
+                ]:
                     logger.info(f"Execution {log_id} ended with status {event.status}")
                     break
                 else:
@@ -270,9 +270,9 @@ async def update_graph_execution_start_time(graph_exec_id: str):
 
 async def update_graph_execution_stats(
     graph_exec_id: str,
+    status: ExecutionStatus,
     stats: dict[str, Any],
 ) -> ExecutionResult:
-    status = ExecutionStatus.FAILED if stats.get("error") else ExecutionStatus.COMPLETED
     res = await AgentGraphExecution.prisma().update(
         where={"id": graph_exec_id},
         data={
@@ -597,7 +597,7 @@ class Executor:
             node_eid="*",
             block_name="-",
         )
-        timing_info, (exec_stats, error) = cls._on_graph_execution(
+        timing_info, (exec_stats, status, error) = cls._on_graph_execution(
             graph_exec, cancel, log_metadata
         )
         exec_stats["walltime"] = timing_info.wall_time
@@ -605,6 +605,7 @@ class Executor:
         exec_stats["error"] = str(error) if error else None
         result = cls.db_client.update_graph_execution_stats(
             graph_exec_id=graph_exec.graph_exec_id,
+            status=status,
             stats=exec_stats,
         )
         cls.db_client.send_execution_update(result)
@@ -616,11 +617,12 @@ class Executor:
         graph_exec: GraphExecutionEntry,
         cancel: threading.Event,
         log_metadata: LogMetadata,
-    ) -> tuple[dict[str, Any], Exception | None]:
+    ) -> tuple[dict[str, Any], ExecutionStatus, Exception | None]:
         """
         Returns:
-            The execution statistics of the graph execution.
-            The error that occurred during the execution.
+            dict: The execution statistics of the graph execution.
+            ExecutionStatus: The final status of the graph execution.
+            Exception | None: The error that occurred during the execution, if any.
         """
         log_metadata.info(f"Start graph execution {graph_exec.graph_exec_id}")
         exec_stats = {
@@ -665,8 +667,7 @@ class Executor:
 
             while not queue.empty():
                 if cancel.is_set():
-                    error = RuntimeError("Execution is cancelled")
-                    return exec_stats, error
+                    return exec_stats, ExecutionStatus.TERMINATED, error
 
                 exec_data = queue.get()
 
@@ -696,8 +697,7 @@ class Executor:
                 )
                 for node_id, execution in list(running_executions.items()):
                     if cancel.is_set():
-                        error = RuntimeError("Execution is cancelled")
-                        return exec_stats, error
+                        return exec_stats, ExecutionStatus.TERMINATED, error
 
                     if not queue.empty():
                         break  # yield to parent loop to execute new queue items
@@ -716,7 +716,12 @@ class Executor:
             finished = True
             cancel.set()
             cancel_thread.join()
-            return exec_stats, error
+            return (
+                exec_stats,
+                ExecutionStatus.FAILED if error else ExecutionStatus.COMPLETED,
+                error,
+            )
 
 
 class ExecutionManager(AppService):
@@ -882,11 +887,8 @@ class ExecutionManager(AppService):
                 ExecutionStatus.COMPLETED,
                 ExecutionStatus.FAILED,
             ):
-                self.db_client.upsert_execution_output(
-                    node_exec.node_exec_id, "error", "TERMINATED"
-                )
                 exec_update = self.db_client.update_execution_status(
-                    node_exec.node_exec_id, ExecutionStatus.FAILED
+                    node_exec.node_exec_id, ExecutionStatus.TERMINATED
                 )
                 self.db_client.send_execution_update(exec_update)
 
@@ -65,6 +65,9 @@ async def wait_execution(
         if status == ExecutionStatus.FAILED:
             log.info("Execution failed")
             raise Exception("Execution failed")
+        if status == ExecutionStatus.TERMINATED:
+            log.info("Execution terminated")
+            raise Exception("Execution terminated")
         return status == ExecutionStatus.COMPLETED
 
     # Wait for the executions to complete
@@ -0,0 +1,2 @@
+-- Add "TERMINATED" to execution status enum type
+ALTER TYPE "AgentExecutionStatus" ADD VALUE 'TERMINATED';
@@ -216,6 +216,7 @@ enum AgentExecutionStatus {
   QUEUED
   RUNNING
   COMPLETED
+  TERMINATED
   FAILED
 }
 
@@ -638,4 +639,4 @@ enum APIKeyStatus {
   ACTIVE
   REVOKED
   SUSPENDED
 }
@@ -848,8 +848,10 @@ export function CustomNode({
               data.status === "COMPLETED",
             "border-yellow-600 bg-yellow-600 text-white":
               data.status === "RUNNING",
-            "border-red-600 bg-red-600 text-white":
-              data.status === "FAILED",
+            "border-red-600 bg-red-600 text-white": [
+              "FAILED",
+              "TERMINATED",
+            ].includes(data.status || ""),
             "border-blue-600 bg-blue-600 text-white":
               data.status === "QUEUED",
             "border-gray-600 bg-gray-600 font-black":
@@ -558,8 +558,9 @@ export default function useAgentGraph(
           return;
         }
         if (
-          nodeResult.status != "COMPLETED" &&
-          nodeResult.status != "FAILED"
+          !["COMPLETED", "TERMINATED", "FAILED"].includes(
+            nodeResult.status,
+          )
         ) {
           pendingNodeExecutions.add(nodeResult.node_exec_id);
         } else {
@@ -196,7 +196,7 @@ export type GraphExecution = {
   ended_at: number;
   duration: number;
   total_run_time: number;
-  status: "INCOMPLETE" | "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED";
+  status: "QUEUED" | "RUNNING" | "COMPLETED" | "TERMINATED" | "FAILED";
   graph_id: string;
   graph_version: number;
 };
@@ -246,7 +246,13 @@ export type NodeExecutionResult = {
   node_exec_id: string;
   node_id: string;
   block_id: string;
-  status: "INCOMPLETE" | "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED";
+  status:
+    | "INCOMPLETE"
+    | "QUEUED"
+    | "RUNNING"
+    | "COMPLETED"
+    | "TERMINATED"
+    | "FAILED";
   input_data: { [key: string]: any };
   output_data: { [key: string]: Array<any> };
   add_time: Date;