feat(rnd): Split Execution Manager (#8008)

* split execution manager and removed ns and use direct uri with k8s and docker specific dns

* formating

* split execution manager

* refactor(builder): Fix linting warning and errors (#8021)

* Fix lint errors

* Fix dependency loop

* address feedback

* docker compose

* remove ns entirely

* remove yarn lock changes

* update readme

* remove ref

* dockerfile and log

* update log

* debug

* rename to executor

* remove execution from rest

* exec.py

* linting

* udpate tests to use config

* fix test

---------

Co-authored-by: Krzysztof Czerwinski <34861343+kcze@users.noreply.github.com>
This commit is contained in:
Aarushi 2024-09-10 10:05:31 +01:00 committed by GitHub
parent ef691359b7
commit 0b919522ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 113 additions and 43 deletions

View File

@ -36,4 +36,3 @@ rnd/autogpt_builder/.env.example
rnd/autogpt_builder/.env.local
rnd/autogpt_server/.env
rnd/autogpt_server/.venv/

View File

@ -11,6 +11,7 @@ REDIS_PASSWORD=password
AUTH_ENABLED=false
APP_ENV="local"
PYRO_HOST=localhost
SENTRY_DSN=
## ===== OPTIONAL API KEYS ===== ##

View File

@ -4,7 +4,6 @@ FROM python:3.11-slim-buster as server_base
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
WORKDIR /app
RUN apt-get update \
@ -17,7 +16,6 @@ RUN apt-get update \
&& make prefix=/usr all \
&& make prefix=/usr install
ENV POETRY_VERSION=1.8.3 \
POETRY_HOME="/opt/poetry" \
POETRY_NO_INTERACTION=1 \
@ -44,3 +42,10 @@ ENV PORT=8000
ENV DATABASE_URL=""
CMD ["poetry", "run", "rest"]
FROM server_base as executor
ENV PORT=8002
ENV DATABASE_URL=""
CMD ["poetry", "run", "executor"]

View File

@ -183,6 +183,13 @@ A communication layer (`service.py`) is created to decouple the communication li
Currently, the IPC is done using Pyro5 and abstracted in a way that allows a function decorated with `@expose` to be called from a different process.
By default the daemons run on the following ports:
Execution Manager Daemon: 8002
Execution Scheduler Daemon: 8003
Rest Server Daemon: 8004
## Adding a New Agent Block
To add a new agent block, you need to create a new class that inherits from `Block` and provides the following information:

View File

@ -26,10 +26,8 @@ def main(**kwargs):
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer, WebsocketServer
from autogpt_server.util.service import PyroNameServer
run_processes(
PyroNameServer(),
ExecutionManager(),
ExecutionScheduler(),
WebsocketServer(),

View File

@ -0,0 +1,15 @@
from autogpt_server.app import run_processes
from autogpt_server.executor import ExecutionManager
def main():
"""
Run all the processes required for the AutoGPT-server REST API.
"""
run_processes(
ExecutionManager(),
)
if __name__ == "__main__":
main()

View File

@ -364,7 +364,7 @@ def validate_exec(
def get_agent_server_client() -> "AgentServer":
from autogpt_server.server.rest_api import AgentServer
return get_service_client(AgentServer)
return get_service_client(AgentServer, Config().agent_server_port)
class Executor:
@ -648,6 +648,7 @@ class Executor:
class ExecutionManager(AppService):
def __init__(self):
super().__init__(port=Config().execution_manager_port)
self.use_db = True
self.pool_size = Config().num_graph_workers
self.queue = ExecutionQueue[GraphExecution]()

View File

@ -9,6 +9,7 @@ from autogpt_server.data import schedule as model
from autogpt_server.data.block import BlockInput
from autogpt_server.executor.manager import ExecutionManager
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Config
logger = logging.getLogger(__name__)
@ -19,13 +20,15 @@ def log(msg, **kwargs):
class ExecutionScheduler(AppService):
def __init__(self, refresh_interval=10):
super().__init__(port=Config().execution_scheduler_port)
self.use_db = True
self.last_check = datetime.min
self.refresh_interval = refresh_interval
self.use_redis = False
@property
def execution_manager_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)
return get_service_client(ExecutionManager, Config().execution_manager_port)
def run_service(self):
scheduler = BackgroundScheduler()

View File

@ -1,7 +1,6 @@
from autogpt_server.app import run_processes
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.executor import ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.util.service import PyroNameServer
def main():
@ -9,8 +8,6 @@ def main():
Run all the processes required for the AutoGPT-server REST API.
"""
run_processes(
PyroNameServer(),
ExecutionManager(),
ExecutionScheduler(),
AgentServer(),
)

View File

@ -21,7 +21,7 @@ from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server.model import CreateGraph, SetGraphActiveVersion
from autogpt_server.util.lock import KeyedMutex
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Settings
from autogpt_server.util.settings import Config, Settings
from .utils import get_user_id
@ -34,6 +34,7 @@ class AgentServer(AppService):
_test_dependency_overrides = {}
def __init__(self, event_queue: AsyncEventQueue | None = None):
super().__init__(port=Config().agent_server_port)
self.event_queue = event_queue or AsyncRedisEventQueue()
@asynccontextmanager
@ -239,11 +240,11 @@ class AgentServer(AppService):
@property
def execution_manager_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)
return get_service_client(ExecutionManager, Config().execution_manager_port)
@property
def execution_scheduler_client(self) -> ExecutionScheduler:
return get_service_client(ExecutionScheduler)
return get_service_client(ExecutionScheduler, Config().execution_scheduler_port)
@classmethod
def handle_internal_http_error(cls, request: Request, exc: Exception):

View File

@ -1,12 +1,13 @@
import asyncio
import logging
import os
import threading
import time
from abc import abstractmethod
from typing import Any, Callable, Coroutine, Type, TypeVar, cast
import Pyro5.api
from Pyro5 import api as pyro
from Pyro5 import nameserver
from autogpt_server.data import db
from autogpt_server.data.queue import AsyncEventQueue, AsyncRedisEventQueue
@ -42,25 +43,16 @@ def expose(func: C) -> C:
return pyro.expose(wrapper) # type: ignore
class PyroNameServer(AppProcess):
def run(self):
nameserver.start_ns_loop(host=pyro_host, port=9090)
@conn_retry
def _wait_for_ns(self):
pyro.locate_ns(host="localhost", port=9090)
def health_check(self):
self._wait_for_ns()
logger.info(f"{__class__.__name__} is ready")
class AppService(AppProcess):
shared_event_loop: asyncio.AbstractEventLoop
event_queue: AsyncEventQueue = AsyncRedisEventQueue()
use_db: bool = False
use_redis: bool = False
def __init__(self, port):
self.port = port
self.uri = None
@classmethod
@property
def service_name(cls) -> str:
@ -108,11 +100,10 @@ class AppService(AppProcess):
@conn_retry
def __start_pyro(self):
daemon = pyro.Daemon(host=pyro_host)
ns = pyro.locate_ns(host=pyro_host, port=9090)
uri = daemon.register(self)
ns.register(self.service_name, uri)
logger.info(f"[{self.service_name}] Connected to Pyro; URI = {uri}")
host = Config().pyro_host
daemon = Pyro5.api.Daemon(host=host, port=self.port)
self.uri = daemon.register(self, objectId=self.service_name)
logger.info(f"[{self.service_name}] Connected to Pyro; URI = {self.uri}")
daemon.requestLoop()
def __start_async_loop(self):
@ -122,16 +113,19 @@ class AppService(AppProcess):
AS = TypeVar("AS", bound=AppService)
def get_service_client(service_type: Type[AS]) -> AS:
def get_service_client(service_type: Type[AS], port: int) -> AS:
service_name = service_type.service_name
class DynamicClient:
@conn_retry
def __init__(self):
ns = pyro.locate_ns()
uri = ns.lookup(service_name)
self.proxy = pyro.Proxy(uri)
host = os.environ.get(f"{service_name.upper()}_HOST", "localhost")
uri = f"PYRO:{service_type.service_name}@{host}:{port}"
logger.debug(f"Connecting to service [{service_name}]. URI = {uri}")
self.proxy = Pyro5.api.Proxy(uri)
# Attempt to bind to ensure the connection is established
self.proxy._pyroBind()
logger.debug(f"Successfully connected to service [{service_name}]")
def __getattr__(self, name: str) -> Callable[..., Any]:
return getattr(self.proxy, name)

View File

@ -72,6 +72,21 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
extra="allow",
)
execution_manager_port: int = Field(
default=8002,
description="The port for execution manager daemon to run on",
)
execution_scheduler_port: int = Field(
default=8003,
description="The port for execution scheduler daemon to run on",
)
agent_server_port: int = Field(
default=8004,
description="The port for agent server daemon to run on",
)
@classmethod
def settings_customise_sources(
cls,

View File

@ -8,7 +8,6 @@ from autogpt_server.data.queue import AsyncEventQueue
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.server.rest_api import get_user_id
from autogpt_server.util.service import PyroNameServer
log = print
@ -48,7 +47,6 @@ class InMemoryAsyncEventQueue(AsyncEventQueue):
class SpinTestServer:
def __init__(self):
self.name_server = PyroNameServer()
self.exec_manager = ExecutionManager()
self.in_memory_queue = InMemoryAsyncEventQueue()
self.agent_server = AgentServer(event_queue=self.in_memory_queue)
@ -59,7 +57,6 @@ class SpinTestServer:
return "3e53486c-cf57-477e-ba2a-cb02dc828e1a"
async def __aenter__(self):
self.name_server.__enter__()
self.setup_dependency_overrides()
self.agent_server.__enter__()
self.exec_manager.__enter__()
@ -76,7 +73,6 @@ class SpinTestServer:
self.scheduler.__exit__(exc_type, exc_val, exc_tb)
self.exec_manager.__exit__(exc_type, exc_val, exc_tb)
self.agent_server.__exit__(exc_type, exc_val, exc_tb)
self.name_server.__exit__(exc_type, exc_val, exc_tb)
def setup_dependency_overrides(self):
# Override get_user_id for testing

View File

@ -66,6 +66,7 @@ build-backend = "poetry.core.masonry.api"
app = "autogpt_server.app:main"
rest = "autogpt_server.rest:main"
ws = "autogpt_server.ws:main"
executor = "autogpt_server.exec:main"
cli = "autogpt_server.cli:main"
format = "linter:format"
lint = "linter:lint"

View File

@ -4,6 +4,7 @@ from autogpt_server.data import db, graph
from autogpt_server.executor import ExecutionScheduler
from autogpt_server.usecases.sample import create_test_graph, create_test_user
from autogpt_server.util.service import get_service_client
from autogpt_server.util.settings import Config
from autogpt_server.util.test import SpinTestServer
@ -13,7 +14,9 @@ async def test_agent_schedule(server: SpinTestServer):
test_user = await create_test_user()
test_graph = await graph.create_graph(create_test_graph(), user_id=test_user.id)
scheduler = get_service_client(ExecutionScheduler)
scheduler = get_service_client(
ExecutionScheduler, Config().execution_scheduler_port
)
schedules = scheduler.get_execution_schedules(test_graph.id, test_user.id)
assert len(schedules) == 0

View File

@ -5,6 +5,7 @@ from autogpt_server.util.service import AppService, expose, get_service_client
class TestService(AppService):
def __init__(self):
super().__init__(port=8005)
self.use_redis = False
def run_service(self):
@ -29,7 +30,7 @@ class TestService(AppService):
@pytest.mark.asyncio(scope="session")
async def test_service_creation(server):
with TestService():
client = get_service_client(TestService)
client = get_service_client(TestService, 8005)
assert client.add(5, 3) == 8
assert client.subtract(10, 4) == 6
assert client.fun_with_async(5, 3) == 8

View File

@ -28,6 +28,7 @@ services:
build:
context: ../
dockerfile: rnd/autogpt_server/Dockerfile
target: server
develop:
watch:
- path: ./
@ -44,11 +45,42 @@ services:
- REDIS_PORT=6379
- REDIS_PASSWORD=password
- AUTH_ENABLED=false
- PYRO_HOST=0.0.0.0
- EXECUTIONMANAGER_HOST=executor
- EXECUTIONSCHEDULER_HOST=execution_scheduler
ports:
- "8000:8000"
networks:
- app-network
executor:
build:
context: ../
dockerfile: rnd/autogpt_server/Dockerfile
target: executor
develop:
watch:
- path: ./
target: rnd/autogpt_server/
action: rebuild
depends_on:
redis:
condition: service_started
postgres:
condition: service_healthy
environment:
- DATABASE_URL=postgresql://agpt_user:pass123@postgres:5432/agpt_local
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_PASSWORD=password
- AUTH_ENABLED=false
- PYRO_HOST=0.0.0.0
- AGENTSERVER_HOST=rest_server
ports:
- "8002:8002"
networks:
- app-network
ws_server:
build:
context: ../
@ -67,6 +99,7 @@ services:
- REDIS_PORT=6379
- REDIS_PASSWORD=password
- AUTH_ENABLED=false
- PYRO_HOST=0.0.0.0
ports:
- "8001:8001"
networks: