Commit 1e87240

feat(platform): Introduced Agent Execution Block (#8533)
1 parent 5ee909f commit 1e87240

16 files changed: +322 −155 lines
autogpt_platform/backend/backend/blocks/agent.py

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
```python
import logging

from autogpt_libs.utils.cache import thread_cached

from backend.data.block import (
    Block,
    BlockCategory,
    BlockInput,
    BlockOutput,
    BlockSchema,
    BlockType,
    get_block,
)
from backend.data.execution import ExecutionStatus
from backend.data.model import SchemaField

logger = logging.getLogger(__name__)


@thread_cached
def get_executor_manager_client():
    from backend.executor import ExecutionManager
    from backend.util.service import get_service_client

    return get_service_client(ExecutionManager)


@thread_cached
def get_event_bus():
    from backend.data.queue import RedisExecutionEventBus

    return RedisExecutionEventBus()


class AgentExecutorBlock(Block):
    class Input(BlockSchema):
        user_id: str = SchemaField(description="User ID")
        graph_id: str = SchemaField(description="Graph ID")
        graph_version: int = SchemaField(description="Graph Version")

        data: BlockInput = SchemaField(description="Input data for the graph")
        input_schema: dict = SchemaField(description="Input schema for the graph")
        output_schema: dict = SchemaField(description="Output schema for the graph")

    class Output(BlockSchema):
        pass

    def __init__(self):
        super().__init__(
            id="e189baac-8c20-45a1-94a7-55177ea42565",
            description="Executes an existing agent inside your agent",
            input_schema=AgentExecutorBlock.Input,
            output_schema=AgentExecutorBlock.Output,
            block_type=BlockType.AGENT,
            categories={BlockCategory.AGENT},
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        executor_manager = get_executor_manager_client()
        event_bus = get_event_bus()

        graph_exec = executor_manager.add_execution(
            graph_id=input_data.graph_id,
            graph_version=input_data.graph_version,
            user_id=input_data.user_id,
            data=input_data.data,
        )
        log_id = f"Graph #{input_data.graph_id}-V{input_data.graph_version}, exec-id: {graph_exec.graph_exec_id}"
        logger.info(f"Starting execution of {log_id}")

        for event in event_bus.listen(
            graph_id=graph_exec.graph_id, graph_exec_id=graph_exec.graph_exec_id
        ):
            logger.info(
                f"Execution {log_id} produced input {event.input_data} output {event.output_data}"
            )

            if not event.node_id:
                if event.status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED]:
                    logger.info(f"Execution {log_id} ended with status {event.status}")
                    break
                else:
                    continue

            if not event.block_id:
                logger.warning(f"{log_id} received event without block_id {event}")
                continue

            block = get_block(event.block_id)
            if not block or block.block_type != BlockType.OUTPUT:
                continue

            output_name = event.input_data.get("name")
            if not output_name:
                logger.warning(f"{log_id} produced an output with no name {event}")
                continue

            for output_data in event.output_data.get("output", []):
                logger.info(f"Execution {log_id} produced {output_name}: {output_data}")
                yield output_name, output_data
```
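
For orientation, here is a minimal sketch of how the new block could be exercised directly. The IDs and input values are hypothetical, and in practice the platform fills `Input` from a node's `input_default`; running it also requires the ExecutionManager service and the Redis event bus to be reachable.

```python
from backend.blocks.agent import AgentExecutorBlock

# Hypothetical IDs and values, for illustration only.
block = AgentExecutorBlock()
input_data = AgentExecutorBlock.Input(
    user_id="user-123",
    graph_id="child-graph-id",
    graph_version=1,
    data={"topic": "hello"},  # forwarded to the child graph's AgentInputBlocks
    input_schema={},          # child graph's input schema (normally set by the builder)
    output_schema={},         # child graph's output schema (normally set by the builder)
)

# run() is a generator: it enqueues the child graph execution, then streams back
# every named output produced by the child graph's AgentOutputBlocks.
for output_name, output_data in block.run(input_data):
    print(output_name, output_data)
```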

autogpt_platform/backend/backend/blocks/basic.py

Lines changed: 4 additions & 2 deletions
```diff
@@ -233,7 +233,9 @@ class Input(BlockSchema):
         )
         name: str = SchemaField(description="The name of the output.")
         title: str | None = SchemaField(
-            description="The title of the input.", default=None, advanced=True
+            description="The title of the output.",
+            default=None,
+            advanced=True,
         )
         description: str | None = SchemaField(
             description="The description of the output.",
@@ -262,7 +264,7 @@ class Output(BlockSchema):
     def __init__(self):
         super().__init__(
             id="363ae599-353e-4804-937e-b2ee3cef3da4",
-            description=("Stores the output of the graph for users to see."),
+            description="Stores the output of the graph for users to see.",
             input_schema=AgentOutputBlock.Input,
             output_schema=AgentOutputBlock.Output,
             test_input=[
```

autogpt_platform/backend/backend/data/block.py

Lines changed: 5 additions & 1 deletion
```diff
@@ -34,6 +34,7 @@ class BlockType(Enum):
     INPUT = "Input"
     OUTPUT = "Output"
     NOTE = "Note"
+    AGENT = "Agent"
 
 
 class BlockCategory(Enum):
@@ -48,6 +49,7 @@ class BlockCategory(Enum):
     COMMUNICATION = "Block that interacts with communication platforms."
     DEVELOPER_TOOLS = "Developer tools such as GitHub blocks."
     DATA = "Block that interacts with structured data."
+    AGENT = "Block that interacts with other agents."
 
     def dict(self) -> dict[str, str]:
         return {"category": self.name, "description": self.value}
@@ -299,7 +301,9 @@ def execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
         ):
             if output_name == "error":
                 raise RuntimeError(output_data)
-            if error := self.output_schema.validate_field(output_name, output_data):
+            if self.block_type == BlockType.STANDARD and (
+                error := self.output_schema.validate_field(output_name, output_data)
+            ):
                 raise ValueError(f"Block produced an invalid output data: {error}")
             yield output_name, output_data
```
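
The guard added to `Block.execute` above means only STANDARD blocks have each yielded output checked against their declared output schema; an AGENT block's outputs are named after the embedded graph's output nodes, so there is no static schema to check. A simplified sketch of that rule, not the actual `Block.execute` body:

```python
from backend.data.block import BlockType

def check_output(block, output_name: str, output_data) -> None:
    # Non-standard blocks (e.g. BlockType.AGENT) skip static output validation,
    # because their output names come from the embedded graph at runtime.
    if block.block_type != BlockType.STANDARD:
        return
    if error := block.output_schema.validate_field(output_name, output_data):
        raise ValueError(f"Block produced an invalid output data: {error}")
```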

autogpt_platform/backend/backend/data/execution.py

Lines changed: 30 additions & 3 deletions
```diff
@@ -64,6 +64,7 @@ class ExecutionResult(BaseModel):
     graph_exec_id: str
     node_exec_id: str
     node_id: str
+    block_id: str
     status: ExecutionStatus
     input_data: BlockInput
     output_data: CompletedBlockOutput
@@ -72,6 +73,26 @@ class ExecutionResult(BaseModel):
     start_time: datetime | None
     end_time: datetime | None
 
+    @staticmethod
+    def from_graph(graph: AgentGraphExecution):
+        return ExecutionResult(
+            graph_id=graph.agentGraphId,
+            graph_version=graph.agentGraphVersion,
+            graph_exec_id=graph.id,
+            node_exec_id="",
+            node_id="",
+            block_id="",
+            status=graph.executionStatus,
+            # TODO: Populate input_data & output_data from AgentNodeExecutions
+            # Input & Output comes AgentInputBlock & AgentOutputBlock.
+            input_data={},
+            output_data={},
+            add_time=graph.createdAt,
+            queue_time=graph.createdAt,
+            start_time=graph.startedAt,
+            end_time=graph.updatedAt,
+        )
+
     @staticmethod
     def from_db(execution: AgentNodeExecution):
         if execution.executionData:
@@ -93,9 +114,10 @@ def from_db(execution: AgentNodeExecution):
             graph_id=graph_execution.agentGraphId if graph_execution else "",
             graph_version=graph_execution.agentGraphVersion if graph_execution else 0,
             graph_exec_id=execution.agentGraphExecutionId,
+            block_id=execution.AgentNode.agentBlockId if execution.AgentNode else "",
             node_exec_id=execution.id,
             node_id=execution.agentNodeId,
-            status=ExecutionStatus(execution.executionStatus),
+            status=execution.executionStatus,
             input_data=input_data,
             output_data=output_data,
             add_time=execution.addedTime,
@@ -248,15 +270,20 @@ async def update_graph_execution_start_time(graph_exec_id: str):
 async def update_graph_execution_stats(
     graph_exec_id: str,
     stats: dict[str, Any],
-):
+) -> ExecutionResult:
+
     status = ExecutionStatus.FAILED if stats.get("error") else ExecutionStatus.COMPLETED
-    await AgentGraphExecution.prisma().update(
+    res = await AgentGraphExecution.prisma().update(
         where={"id": graph_exec_id},
         data={
             "executionStatus": status,
             "stats": json.dumps(stats),
         },
     )
+    if not res:
+        raise ValueError(f"Execution {graph_exec_id} not found.")
+
+    return ExecutionResult.from_graph(res)
 
 
 async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
```
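
`ExecutionResult.from_graph` leaves `node_exec_id`, `node_id`, and `block_id` empty, which is how listeners can tell a graph-level status update apart from a per-node result, and `update_graph_execution_stats` now returns that result so the executor can publish it. A rough sketch of a consumer, modelled on the AgentExecutorBlock loop above (event delivery from the Redis event bus is assumed):

```python
from backend.data.execution import ExecutionResult, ExecutionStatus

def is_terminal_graph_event(event: ExecutionResult) -> bool:
    # Graph-level updates (built via ExecutionResult.from_graph) carry no node_id;
    # node-level updates (built via ExecutionResult.from_db) always do.
    if event.node_id:
        return False
    return event.status in (ExecutionStatus.COMPLETED, ExecutionStatus.FAILED)
```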

autogpt_platform/backend/backend/data/graph.py

Lines changed: 28 additions & 7 deletions
```diff
@@ -9,6 +9,7 @@
 from prisma.types import AgentGraphWhereInput
 from pydantic.fields import computed_field
 
+from backend.blocks.agent import AgentExecutorBlock
 from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
 from backend.data.block import BlockInput, BlockType, get_block, get_blocks
 from backend.data.db import BaseDbModel, transaction
@@ -174,24 +175,35 @@ def starting_nodes(self) -> list[Node]:
             if node.id not in outbound_nodes or node.id in input_nodes
         ]
 
-    def reassign_ids(self, reassign_graph_id: bool = False):
+    def reassign_ids(self, user_id: str, reassign_graph_id: bool = False):
         """
         Reassigns all IDs in the graph to new UUIDs.
         This method can be used before storing a new graph to the database.
         """
-        self.validate_graph()
 
+        # Reassign Graph ID
         id_map = {node.id: str(uuid.uuid4()) for node in self.nodes}
         if reassign_graph_id:
             self.id = str(uuid.uuid4())
 
+        # Reassign Node IDs
         for node in self.nodes:
             node.id = id_map[node.id]
 
+        # Reassign Link IDs
         for link in self.links:
             link.source_id = id_map[link.source_id]
             link.sink_id = id_map[link.sink_id]
 
+        # Reassign User IDs for agent blocks
+        for node in self.nodes:
+            if node.block_id != AgentExecutorBlock().id:
+                continue
+            node.input_default["user_id"] = user_id
+            node.input_default.setdefault("data", {})
+
+        self.validate_graph()
+
     def validate_graph(self, for_run: bool = False):
         def sanitize(name):
             return name.split("_#_")[0].split("_@_")[0].split("_$_")[0]
@@ -215,6 +227,7 @@ def sanitize(name):
                     for_run  # Skip input completion validation, unless when executing.
                     or block.block_type == BlockType.INPUT
                     or block.block_type == BlockType.OUTPUT
+                    or block.block_type == BlockType.AGENT
                 ):
                     raise ValueError(
                         f"Node {block.name} #{node.id} required input missing: `{name}`"
@@ -248,18 +261,26 @@ def is_static_output_block(nid: str) -> bool:
                 )
 
             sanitized_name = sanitize(name)
+            vals = node.input_default
             if i == 0:
-                fields = f"Valid output fields: {block.output_schema.get_fields()}"
+                fields = (
+                    block.output_schema.get_fields()
+                    if block.block_type != BlockType.AGENT
+                    else vals.get("output_schema", {}).get("properties", {}).keys()
+                )
             else:
-                fields = f"Valid input fields: {block.input_schema.get_fields()}"
+                fields = (
+                    block.input_schema.get_fields()
+                    if block.block_type != BlockType.AGENT
+                    else vals.get("input_schema", {}).get("properties", {}).keys()
+                )
             if sanitized_name not in fields:
-                raise ValueError(f"{suffix}, `{name}` invalid, {fields}")
+                fields_msg = f"Allowed fields: {fields}"
+                raise ValueError(f"{suffix}, `{name}` invalid, {fields_msg}")
 
             if is_static_output_block(link.source_id):
                 link.is_static = True  # Each value block output should be static.
 
-            # TODO: Add type compatibility check here.
-
     @staticmethod
     def from_db(graph: AgentGraph, hide_credentials: bool = False):
         executions = [
```
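
Because `reassign_ids` now requires the owning user, callers must pass a `user_id` so that any embedded AgentExecutorBlock nodes get their `user_id` input stamped before validation runs. A minimal usage sketch, assuming a graph model instance loaded elsewhere and a placeholder user ID:

```python
# `graph` is a graph model instance from backend.data.graph, obtained elsewhere;
# "user-123" is a placeholder user ID.
graph.reassign_ids(user_id="user-123", reassign_graph_id=True)
# Afterwards every node and link has a fresh UUID, agent-executor nodes carry
# the caller's user_id (and an empty "data" default) in input_default, and
# validate_graph() has been run on the result.
```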

autogpt_platform/backend/backend/executor/database.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -41,8 +41,8 @@ def get_port(cls) -> int:
         return Config().database_api_port
 
     @expose
-    def send_execution_update(self, execution_result_dict: dict[Any, Any]):
-        self.event_queue.publish(ExecutionResult(**execution_result_dict))
+    def send_execution_update(self, execution_result: ExecutionResult):
+        self.event_queue.publish(execution_result)
 
     @staticmethod
     def exposed_run_and_wait(
```
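
With the new signature, callers hand the `ExecutionResult` model itself to the service rather than a pre-dumped dict; the (de)serialization across the `@expose` service boundary is assumed to be handled by the service layer, which is not shown in this diff. A hedged usage sketch:

```python
# `db_manager` is a DatabaseManager service client (e.g. obtained via
# get_service_client), and `result` is an ExecutionResult such as the value
# returned by update_graph_execution_stats().
db_manager.send_execution_update(result)
```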
