Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ fastpubsub_api_debug='true'
fastpubsub_api_host='127.0.0.1'
fastpubsub_api_port='8000'
fastpubsub_api_num_workers='1'

fastpubsub_auth_enabled='false'
fastpubsub_auth_secret_key='my-super-secret-key'
fastpubsub_auth_algorithm='HS256'
fastpubsub_auth_access_token_expire_minutes='30'
29 changes: 24 additions & 5 deletions fastpubsub/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
from fastpubsub import models
from fastpubsub.api.helpers import _create_error_response
from fastpubsub.api.middlewares import log_requests
from fastpubsub.api.routers import monitoring, subscriptions, topics
from fastpubsub.api.routers import clients, monitoring, subscriptions, topics
from fastpubsub.config import settings
from fastpubsub.exceptions import AlreadyExistsError, NotFoundError, ServiceUnavailable
from fastpubsub.exceptions import (
AlreadyExistsError,
InvalidClient,
InvalidClientToken,
NotFoundError,
ServiceUnavailable,
)

tags_metadata = [
{
Expand All @@ -22,6 +28,10 @@
"name": "monitoring",
"description": "Operations with monitoring.",
},
{
"name": "clients",
"description": "Operations with clients.",
},
]


Expand All @@ -39,20 +49,29 @@ def create_app() -> FastAPI:
# Add exception handlers
@app.exception_handler(AlreadyExistsError)
def already_exists_exception_handler(request: Request, exc: AlreadyExistsError):
return _create_error_response(models.AlreadyExists, status.HTTP_409_CONFLICT, exc)
return _create_error_response(models.GenericError, status.HTTP_409_CONFLICT, exc)

@app.exception_handler(NotFoundError)
def not_found_exception_handler(request: Request, exc: NotFoundError):
return _create_error_response(models.NotFound, status.HTTP_404_NOT_FOUND, exc)
return _create_error_response(models.GenericError, status.HTTP_404_NOT_FOUND, exc)

@app.exception_handler(ServiceUnavailable)
def service_unavailable_exception_handler(request: Request, exc: ServiceUnavailable):
return _create_error_response(models.ServiceUnavailable, status.HTTP_503_SERVICE_UNAVAILABLE, exc)
return _create_error_response(models.GenericError, status.HTTP_503_SERVICE_UNAVAILABLE, exc)

@app.exception_handler(InvalidClient)
def invalid_client_exception_handler(request: Request, exc: InvalidClient):
return _create_error_response(models.GenericError, status.HTTP_401_UNAUTHORIZED, exc)

@app.exception_handler(InvalidClientToken)
def invalid_client_token_exception_handler(request: Request, exc: InvalidClientToken):
return _create_error_response(models.GenericError, status.HTTP_403_FORBIDDEN, exc)

# Add routers
app.include_router(topics.router)
app.include_router(subscriptions.router)
app.include_router(monitoring.router)
app.include_router(clients.router)

# Add Prometheus instrumentation
Instrumentator().instrument(app).expose(app)
Expand Down
88 changes: 88 additions & 0 deletions fastpubsub/api/routers/clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import uuid
from typing import Annotated

from fastapi import APIRouter, Depends, Query, status

from fastpubsub import models, services

router = APIRouter(tags=["clients"])


@router.post(
"/clients",
response_model=models.CreateClientResult,
status_code=status.HTTP_201_CREATED,
summary="Create a new client",
)
async def create_client(
data: models.CreateClient,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "create"))],
):
return await services.create_client(data)


@router.get(
"/clients/{id}",
response_model=models.Client,
status_code=status.HTTP_200_OK,
responses={404: {"model": models.GenericError}},
summary="Get a client",
)
async def get_client(
id: uuid.UUID,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "read"))],
):
return await services.get_client(id)


@router.put(
"/clients/{id}",
response_model=models.Client,
status_code=status.HTTP_200_OK,
responses={404: {"model": models.GenericError}},
summary="Update a client",
)
async def update_client(
id: uuid.UUID,
data: models.UpdateClient,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "update"))],
):
return await services.update_client(id, data)


@router.get(
"/clients",
response_model=models.ListClientAPI,
status_code=status.HTTP_200_OK,
summary="List clients",
)
async def list_client(
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "read"))],
offset: int = Query(default=0, ge=0),
limit: int = Query(default=10, ge=1, le=100),
):
clients = await services.list_client(offset, limit)
return models.ListClientAPI(data=clients)


@router.delete(
"/clients/{id}",
status_code=status.HTTP_204_NO_CONTENT,
responses={404: {"model": models.GenericError}},
summary="Delete a client",
)
async def delete_client(
id: uuid.UUID,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "delete"))],
):
await services.delete_client(id)


@router.post(
"/oauth/token",
response_model=models.ClientToken,
status_code=status.HTTP_201_CREATED,
summary="Issue a new client token",
)
async def issue_client_token(data: models.IssueClientToken):
return await services.issue_jwt_client_token(client_id=data.client_id, client_secret=data.client_secret)
2 changes: 1 addition & 1 deletion fastpubsub/api/routers/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def liveness_probe():
"/readiness",
response_model=models.HealthCheck,
status_code=status.HTTP_200_OK,
responses={503: {"model": models.ServiceUnavailable}},
responses={503: {"model": models.GenericError}},
summary="Readiness probe",
)
async def readiness_probe():
Expand Down
87 changes: 61 additions & 26 deletions fastpubsub/api/routers/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Query, status
from fastapi import APIRouter, Depends, Query, status

from fastpubsub import models, services

Expand All @@ -11,21 +12,27 @@
"",
response_model=models.Subscription,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": models.AlreadyExists}},
responses={409: {"model": models.GenericError}},
summary="Create a subscription",
)
async def create_subscription(data: models.CreateSubscription):
async def create_subscription(
data: models.CreateSubscription,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "create"))],
):
return await services.create_subscription(data)


@router.get(
"/{id}",
response_model=models.Subscription,
status_code=status.HTTP_200_OK,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="Get a subscription",
)
async def get_subscription(id: str):
async def get_subscription(
id: str,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "read"))],
):
return await services.get_subscription(id)


Expand All @@ -36,7 +43,9 @@ async def get_subscription(id: str):
summary="List subscriptions",
)
async def list_subscription(
offset: int = Query(default=0, ge=0), limit: int = Query(default=10, ge=1, le=100)
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "read"))],
offset: int = Query(default=0, ge=0),
limit: int = Query(default=10, ge=1, le=100),
):
subscriptions = await services.list_subscription(offset, limit)
return models.ListSubscriptionAPI(data=subscriptions)
Expand All @@ -45,22 +54,30 @@ async def list_subscription(
@router.delete(
"/{id}",
status_code=status.HTTP_204_NO_CONTENT,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="Delete subscription",
)
async def delete_subscription(id: str):
async def delete_subscription(
id: str,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "delete"))],
):
await services.delete_subscription(id)


@router.get(
"/{id}/messages",
response_model=models.ListMessageAPI,
status_code=status.HTTP_200_OK,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="Get messages",
)
async def consume_messages(id: str, consumer_id: str, batch_size: int = Query(default=10, ge=1, le=100)):
subscription = await get_subscription(id)
async def consume_messages(
id: str,
consumer_id: str,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "consume"))],
batch_size: int = Query(default=10, ge=1, le=100),
):
subscription = await get_subscription(id, token)
messages = await services.consume_messages(
subscription_id=subscription.id, consumer_id=consumer_id, batch_size=batch_size
)
Expand All @@ -70,58 +87,76 @@ async def consume_messages(id: str, consumer_id: str, batch_size: int = Query(de
@router.post(
"/{id}/acks",
status_code=status.HTTP_204_NO_CONTENT,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="Ack messages",
)
async def ack_messages(id: str, data: list[UUID]):
subscription = await get_subscription(id)
async def ack_messages(
id: str,
data: list[UUID],
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "consume"))],
):
subscription = await get_subscription(id, token)
await services.ack_messages(subscription_id=subscription.id, message_ids=data)


@router.post(
"/{id}/nacks",
status_code=status.HTTP_204_NO_CONTENT,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="Nack messages",
)
async def nack_messages(id: str, data: list[UUID]):
subscription = await get_subscription(id)
async def nack_messages(
id: str,
data: list[UUID],
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "consume"))],
):
subscription = await get_subscription(id, token)
await services.nack_messages(subscription_id=subscription.id, message_ids=data)


@router.get(
"/{id}/dlq",
response_model=models.ListMessageAPI,
status_code=status.HTTP_200_OK,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="List dlq messages",
)
async def list_dlq(
id: str, offset: int = Query(default=0, ge=0), limit: int = Query(default=10, ge=1, le=100)
id: str,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "consume"))],
offset: int = Query(default=0, ge=0),
limit: int = Query(default=10, ge=1, le=100),
):
subscription = await get_subscription(id)
subscription = await get_subscription(id, token)
messages = await services.list_dlq_messages(subscription_id=subscription.id, offset=offset, limit=limit)
return models.ListMessageAPI(data=messages)


@router.post(
"/{id}/dlq/reprocess",
status_code=status.HTTP_204_NO_CONTENT,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="Reprocess dlq messages",
)
async def reprocess_dlq(id: str, data: list[UUID]):
subscription = await get_subscription(id)
async def reprocess_dlq(
id: str,
data: list[UUID],
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "consume"))],
):
subscription = await get_subscription(id, token)
await services.reprocess_dlq_messages(subscription_id=subscription.id, message_ids=data)


@router.get(
"/{id}/metrics",
response_model=models.SubscriptionMetrics,
status_code=status.HTTP_200_OK,
responses={404: {"model": models.NotFound}},
responses={404: {"model": models.GenericError}},
summary="Get subscription metrics",
)
async def subscription_metrics(id: str):
subscription = await get_subscription(id)
async def subscription_metrics(
id: str,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("subscriptions", "read"))],
):
subscription = await get_subscription(id, token)
return await services.subscription_metrics(subscription_id=subscription.id)
Loading