Mentions légales du service

Skip to content
Snippets Groups Projects
Commit bd50dd26 authored by Damien Saucez's avatar Damien Saucez
Browse files

full endpoint / engine separation

parent c5cdb6c2
No related branches found
No related tags found
No related merge requests found
Showing
with 164 additions and 106 deletions
......@@ -29,3 +29,4 @@ typing_extensions==4.12.2
uvicorn==0.34.0
watchdog==4.0.2
zipp==3.21.0
python-dotenv==1.0.1
\ No newline at end of file
src/.env 0 → 100644
title="backend"
description="SLICES backend"
api_url="/apis/backend/v1alpha1"
database_url="sqlite+aiosqlite:///example_async.db"
redis_db="slices_db"
redis_server="host.docker.internal"
redis_server_port=6379
\ No newline at end of file
from backend.core.database import async_session
from backend.schemas.task import TaskStatus
from backend.api.api_v1 import deps
import backend.models as models
import backend.engines.cluster as engine
import backend.schemas as schemas
import backend.models as models
import backend.utils as utils
import backend.tasks as tasks
from backend.schemas.task import TaskStatus
from fastapi import APIRouter, Depends, Request
from starlette import status
from typing import Annotated
from uuid import UUID
cluster_router = APIRouter()
import logging
logger = logging.getLogger(__name__)
import backend.engines.cluster as engine
cluster_router = APIRouter()
# = Endpoints
@cluster_router.post(
......@@ -19,18 +24,17 @@ import backend.engines.cluster as engine
status_code=status.HTTP_202_ACCEPTED,
operation_id="create_cluster",
)
async def _post_cluster_id(cluster_info: schemas.ClusterSchema):
for node in cluster_info.nodes:
vm = node.vm
flavor = await engine.get_flavor(vm.flavor)
print (f"\tShould create VM {node.model_dump_json()} with flavor {flavor.model_dump_json()}")
create_cluster_task = await tasks.examples.add.kiq(1,2)
taskid = create_cluster_task.task_id
cluster = await engine.create_cluster(name=cluster_info.name, taskid=taskid, status="submitted")
print (f"{cluster.id} --> {cluster.task_id}")
async def _post_cluster_id(cluster_info: schemas.ClusterSchema, cluster_id: Annotated[UUID, Depends(utils.gen_id)],):
    """Accept a cluster-creation request (202 Accepted semantics).

    Persists a ``Cluster`` row in ``submitted`` state, hands the actual
    provisioning off to a background task, and returns the current task
    status immediately — the real work continues asynchronously.

    :param cluster_info: declarative description of the cluster to build.
    :param cluster_id: fresh UUID injected by the ``gen_id`` dependency.
    :return: ``TaskStatus`` echoing the new row's id and status.
    """
    # Create entry in DB
    cluster = models.Cluster(id=str(cluster_id), name=cluster_info.name, status="submitted")
    async with async_session() as db:
        db.add(cluster)
        await db.commit()
    # Actually create the cluster in the background.
    # BUG FIX: pass the id as str — the task declares ``id: str`` and the
    # broker serializes task arguments, which a raw UUID object would break.
    _ = await tasks.cluster.create_cluster.kiq(str(cluster_id), cluster_info)
    # Return current status
    return TaskStatus(id=cluster.id, status=cluster.status)
@cluster_router.get(
......
......@@ -2,6 +2,8 @@
from fastapi import FastAPI
from contextlib import asynccontextmanager
import logging.config
from pathlib import Path
import backend.models
from backend.core.database import create_tables, async_session
......@@ -11,12 +13,16 @@ from backend.exception import register_exception_handlers
from backend.core.config import settings
logging.config.fileConfig(Path(__file__).with_name("logging.conf"), disable_existing_loggers=False)
logger = logging.getLogger(__name__)
async def shutdown():
    """Application shutdown hook: just log that we are going down."""
    # NOTE(review): the extracted diff showed both print("bye") and
    # logger.debug("bye"); the logger call is the added (current) line.
    logger.debug("bye")
@asynccontextmanager
async def database_lifespan(app: FastAPI):
    """FastAPI lifespan context: ensure DB tables exist before serving.

    Runs ``create_tables()`` on startup; the post-``yield`` teardown is
    still a stub (``close_database`` is intentionally left commented out).
    """
    logger.debug("create_tables")
    await create_tables()
    yield
    # await close_database()
......
"""Core configuration."""
from pydantic_settings import BaseSettings
from pydantic import (
AnyUrl,
Field,
field_validator,
RedisDsn,
ValidationInfo
Field
)
from typing import Any
from dotenv import load_dotenv
class Settings(BaseSettings):
title: str = "backend"
......@@ -22,4 +18,5 @@ class Settings(BaseSettings):
redis_server: str = "host.docker.internal"
redis_server_port: int = Field(6379, description="Redis server port number (0-65535)", ge=0, le=65535)
# Load variables from the .env file into the process environment before
# Settings reads them.
load_dotenv()

# Module-level singleton shared by the whole application.
settings = Settings()
\ No newline at end of file
"""Slices BI backend database."""
from backend.core.config import settings
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncSession
......
from backend.core.config import settings
from backend.models import *
from backend.core.database import async_session
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from taskiq import TaskiqScheduler, TaskiqEvents, TaskiqState
from taskiq import TaskiqResult, TaskiqMiddleware
from taskiq import TaskiqResult, TaskiqMiddleware, TaskiqMessage
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.scheduler.scheduled_task import ScheduledTask
from redis import ConnectionPool
from sqlalchemy import update
from typing import List
from typing import List, Any
TASK_BACKEND_URL= f"redis://{settings.redis_server}:{settings.redis_server_port}"
......@@ -30,14 +29,10 @@ def make_redis_pool() -> ConnectionPool:
class TaskManagementMiddleware(TaskiqMiddleware):
async def post_execute(
self,
message: "TaskiqMessage",
result: "TaskiqResult[Any]",
message: TaskiqMessage,
result: TaskiqResult[Any],
) -> None:
db = async_session()
await db.execute(
update(Cluster).where(Cluster.task_id == message.task_id).values(status="completed")
)
await db.commit()
pass
redis_async_result = RedisAsyncResultBackend(
str(TASK_BACKEND_URL),
......
from backend.core.database import async_session
import backend.models as models
import backend.schemas as schemas
import backend.exceptions as exceptions
import backend.schemas as schemas
import backend.models as models
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
import logging
logger = logging.getLogger(__name__)
# = cluster
async def get_cluster(cluster_id: str) -> models.Cluster:
......@@ -14,28 +15,37 @@ async def get_cluster(cluster_id: str) -> models.Cluster:
result = await db.execute(
sa.select(models.Cluster).where(
models.Cluster.id == cluster_id
),
)
)
try:
return result.scalar_one()
except sa.exc.NoResultFound:
raise exceptions.BackendNotFoundException(f"Cluster: {cluster_id}")
raise exceptions.ClusterNotFoundException(f"Cluster: {cluster_id}")
async def create_cluster(name: str, taskid: str, status: str) -> models.Cluster:
new_cluster = models.Cluster(name=name, task_id=taskid)
async with async_session() as db:
db.add(new_cluster)
await db.commit()
return new_cluster
async def create_cluster(id: str, cluster_info: schemas.ClusterSchema):
    """Provision the cluster described by ``cluster_info``.

    Currently a simulation: each node's flavor is resolved and logged, and a
    fixed sleep stands in for the real provisioning work.

    :param id: identifier of the cluster record (not used by the body yet).
    :param cluster_info: declarative description of the cluster to build.
    :raises FlavorNotFoundException: via ``get_flavor`` for unknown flavors.
    """
    logger.info(f"should create cluster {cluster_info}")
    for node in cluster_info.nodes:
        vm = node.vm
        # Resolve the flavor name to its full definition.
        flavor = await get_flavor(vm.flavor)
        logger.info(f"\tShould create VM {node.model_dump_json()} with flavor {flavor.model_dump_json()}")
    # NOTE(review): indentation was lost in extraction — the sleep is assumed
    # to run once per cluster, after the per-node loop; confirm upstream.
    await asyncio.sleep(15)
    logger.info("cluster created")
# == flavor
async def get_flavor(name: str) -> schemas.FlavorSchema:
async with async_session() as db:
flavor = await db.get(models.Flavor, name)
if flavor is None:
result = await db.execute(
sa.select(models.Flavor).where(
models.Flavor.name == name
)
)
try:
flavor = result.scalar_one()
return schemas.FlavorSchema(name=flavor.name, cpu=flavor.cpu, memory=flavor.memory)
except sa.exc.NoResultFound:
raise exceptions.FlavorNotFoundException(f"Flavor: {name}")
return schemas.FlavorSchema(name=flavor.name, cpu=flavor.cpu, memory=flavor.memory)
async def create_flavor(flavor: schemas.FlavorSchema) -> models.Flavor:
new_flavor = models.Flavor(name=flavor.name, cpu=flavor.cpu, memory=flavor.memory)
......
from .cluster import ClusterException
from .cluster import ClusterException, ClusterNotFoundException
from .vm import FlavorNotFoundException
from .deps import BackendException, BackendNotFoundException
__all__ = ['ClusterException', 'FlavorNotFoundException', 'BackendException', 'BackendNotFoundException']
\ No newline at end of file
__all__ = ['ClusterException', 'ClusterNotFoundException', 'FlavorNotFoundException', 'BackendException', 'BackendNotFoundException']
\ No newline at end of file
......@@ -2,3 +2,6 @@ from .deps import *
class ClusterException(BackendException):
    """Base exception for cluster-related errors."""
class ClusterNotFoundException(BackendNotFoundException):
    """Raised when the requested cluster does not exist."""
[loggers]
keys=root
[handlers]
keys=consoleHandler,detailedConsoleHandler
[formatters]
keys=normalFormatter,detailedFormatter
[logger_root]
level=INFO
handlers=consoleHandler
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=normalFormatter
args=(sys.stdout,)
[handler_detailedConsoleHandler]
class=StreamHandler
level=DEBUG
formatter=detailedFormatter
args=(sys.stdout,)
[formatter_normalFormatter]
format=%(asctime)s loglevel=%(levelname)-6s logger=%(name)s AAA %(message)s
[formatter_detailedFormatter]
format=%(asctime)s loglevel=%(levelname)-6s logger=%(name)s XXX %(message)s
from .deps import *
import uuid
from sqlalchemy.orm import Mapped, mapped_column
import datetime
from sqlalchemy.orm import Mapped, mapped_column, relationship
import uuid
class Cluster(Base):
    """ORM model for a provisioned (or in-flight) cluster."""
    __tablename__ = 'clusters'  # This will be the name of the table
    # Primary key: stringified UUID4 generated per row.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    name: Mapped[str] = mapped_column(String, nullable=False)
    task_id: Mapped[str] = mapped_column(String, nullable=False)
    status: Mapped[str] = mapped_column(String, default="submitted")
    # BUG FIX: the default must be a callable. The original passed
    # datetime.datetime.now(...) — evaluated ONCE at import time — so every
    # row got the same creation timestamp. The lambda evaluates per insert.
    creation_time = Column(DateTime, nullable=False, default=lambda: datetime.datetime.now(datetime.timezone.utc))
......
from backend.core.database import Base
from sqlalchemy import Column, Integer, String, DateTime
\ No newline at end of file
from sqlalchemy import Column, Integer, String, DateTime
from backend.core.database import Base
\ No newline at end of file
from .deps import *
from sqlalchemy.orm import Mapped, mapped_column
import datetime
from sqlalchemy.orm import Mapped, mapped_column, relationship
import uuid
class Flavor(Base):
    """ORM model describing a VM flavor (CPU count and memory size)."""
    __tablename__ = 'flavors'  # This will be the name of the table
    # Surrogate primary key: stringified UUID4 generated per row.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Human-facing flavor name; looked up by the engine, hence unique.
    name: Mapped[str] = mapped_column(String, nullable=False, unique=True)
    cpu = Column(Integer, nullable=False)
    memory = Column(Integer, nullable=False)
    # BUG FIX: the default must be a callable. The original passed
    # datetime.datetime.now(...) — evaluated ONCE at import time — so every
    # row got the same creation timestamp. The lambda evaluates per insert.
    creation_time = Column(DateTime, nullable=False, default=lambda: datetime.datetime.now(datetime.timezone.utc))

    def __repr__(self):
        return f"<Flavors(name={self.name}, cpu:{self.cpu}, memory:'{self.memory}', creation_time={self.creation_time})>"
\ No newline at end of file
"""Background Tasks."""
from .examples import expire_compute_resources, add
from .cluster import create_cluster
__all__ = ['expire_compute_resources', 'add']
\ No newline at end of file
__all__ = ['expire_compute_resources', 'add', 'create_cluster']
\ No newline at end of file
from backend.core.database import async_session
import backend.engines.cluster as engine
import backend.exceptions as exceptions
import backend.schemas as schemas
import backend.models as models
from backend.core.taskiq import broker
from taskiq import Context, TaskiqDepends
from typing import Annotated
from sqlalchemy import update
import logging
logger = logging.getLogger(__name__)
def get_task_id(context: Annotated[Context, TaskiqDepends()]) -> str:
    """Taskiq dependency: extract the id of the currently executing task."""
    message = context.message
    return message.task_id
@broker.task()
async def create_cluster(
    id: str,
    cluster_info: schemas.ClusterSchema,
    task_id: str = TaskiqDepends(get_task_id)
) -> None:
    """Background task: create the cluster, then record its final status.

    Delegates the actual work to ``engine.create_cluster`` and, whatever
    happens, writes the outcome ("completed"/"failed") back to the
    ``clusters`` row identified by ``id``.

    :param id: primary key of the ``Cluster`` row to update.
    :param cluster_info: declarative description of the cluster to build.
    :param task_id: injected taskiq task id (currently unused in the body).
    """
    # Default to "failed": without this, an exception type other than
    # BackendException would leave ``status`` unbound and the finally
    # block would raise NameError, masking the original error.
    status = "failed"
    # Create the cluster
    try:
        await engine.create_cluster(id=id, cluster_info=cluster_info)
        status = "completed"
    # ... failed to create it
    except exceptions.BackendException:
        status = "failed"
    # ... commit new state
    finally:
        # BUG FIX: use the session as an async context manager so it is
        # always closed — the original created it and leaked it.
        async with async_session() as db:
            await db.execute(
                update(models.Cluster).where(models.Cluster.id == id).values(status=status)
            )
            await db.commit()
from backend.core.taskiq import broker
import asyncio
import logging
logger = logging.getLogger(__name__)
@broker.task(schedule=[{"cron": "*/1 * * * *"}])
async def expire_compute_resources():
    """Scheduled task (every minute) meant to expire compute resources.

    Placeholder implementation: only logs. Typo fixed in the message
    ("Exprire" -> "Expire"); the stray space before the call parenthesis
    is removed as well.
    """
    logger.info("Expire")
@broker.task()
async def add(x: int, y: int) -> int:
......
from uuid import UUID, uuid4
async def gen_id() -> UUID:
    """Dependency helper: return a freshly generated random (v4) UUID."""
    fresh_id = uuid4()
    return fresh_id
\ No newline at end of file
import asyncio
from backend.tasks import add
from backend.core.database import create_tables, async_session
import backend.models
from backend.models import User
async def insert_user():
    """Insert one demo user row and commit the transaction."""
    async with async_session() as db:
        demo_user = User(name="John Doe", age=30)
        db.add(demo_user)
        await db.commit()
from sqlalchemy import text
async def get_users():
    """Fetch every row of the ``users`` table and print it."""
    async with async_session() as db:
        query_result = await db.execute(text('SELECT * FROM users'))
        print("Users in the database:")
        for row in query_result.fetchall():
            print(row)
async def main():
    """Demo driver: set up the DB, insert a user, then run the add task."""
    await create_tables()
    await insert_user()
    task = await add.kiq(1,2)
    # Wait for the result.
    result = await task.wait_result(timeout=60)
    print(f"Task execution took: {result.execution_time} seconds.")
    if result.is_err:
        print("Error found while executing task.")
    else:
        print(f"Returned value: {result.return_value}")
asyncio.run(main())
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment