Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d7aecb0
First version of working websockets
marcosfrenkel Sep 2, 2025
2525062
Implements basic state handling, as well as client disconnects detection
marcosfrenkel Sep 4, 2025
9c53aaf
cleans up code and deals with ruff and mypy
marcosfrenkel Sep 4, 2025
63c668a
Moved all coordination state into its own model and dep
marcosfrenkel Sep 11, 2025
a8ba0b4
Implemented feedback to coordination.py
marcosfrenkel Sep 17, 2025
df1d790
Removed CoordinationState object
marcosfrenkel Sep 17, 2025
dc47b5d
State is now used through a dep
marcosfrenkel Sep 17, 2025
05db5de
mypy and ruff fixes
marcosfrenkel Sep 17, 2025
405716e
Minor bugfix
marcosfrenkel Sep 24, 2025
e8f78aa
ruff mypy checks
marcosfrenkel Oct 15, 2025
5a664ad
Checking for websocket connection before sending message
marcosfrenkel Oct 17, 2025
ee81de7
Adds virtual rotary encoder
marcosfrenkel Oct 18, 2025
5ae12f6
Refactor CollectFollowerResponse to use 'accepted' instead of 'succes…
marcosfrenkel Oct 19, 2025
b992fc8
Question order is synchronized
marcosfrenkel Oct 27, 2025
ba5d6cd
Basis submitted for leader and follower
marcosfrenkel Oct 27, 2025
85bf6e6
QKD now runs from the GUI
marcosfrenkel Oct 28, 2025
029bb24
adds qkd response to gui
marcosfrenkel Oct 30, 2025
9f4de2e
Ruff and mypy checks
marcosfrenkel Oct 30, 2025
842082c
reduce maximum question index to 8 and remove node address from conne…
marcosfrenkel Nov 3, 2025
7d922f4
refactor CHSH calculation and add expectation signs to settings
marcosfrenkel Nov 4, 2025
b6b8e47
Adding logging calls
marcosfrenkel Nov 4, 2025
56aa7de
Adds modulo for basis.
marcosfrenkel Nov 4, 2025
770ec5e
Added resetting to missing qkd state variables
marcosfrenkel Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions configs/config_app_example.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# MAKE SURE TO RENAME THIS FILE TO config.toml AND PLACE IT IN THE ROOT OF THE PROJECT

node_name = "example_node"

# Router configuration
router_name = "router1"
router_address = "xx.xx.xx.xx" # Replace with actual IP address
Expand Down
6 changes: 5 additions & 1 deletion src/pqnstack/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from pqnstack.app.core.config import settings
from pqnstack.network.client import Client

from pqnstack.app.core.config import NodeState
from pqnstack.app.core.config import get_state

async def get_http_client() -> AsyncGenerator[httpx.AsyncClient, None]:
async with httpx.AsyncClient(timeout=60) as client:
Expand All @@ -22,3 +23,6 @@ async def get_instrument_client() -> AsyncGenerator[Client, None]:


InstrumentClientDep = Annotated[httpx.AsyncClient, Depends(get_instrument_client)]


StateDep = Annotated[NodeState, Depends(get_state)]
4 changes: 4 additions & 0 deletions src/pqnstack/app/api/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from fastapi import APIRouter

from pqnstack.app.api.routes import chsh
from pqnstack.app.api.routes import coordination
from pqnstack.app.api.routes import debug
from pqnstack.app.api.routes import qkd
from pqnstack.app.api.routes import rng
from pqnstack.app.api.routes import serial
Expand All @@ -12,3 +14,5 @@
api_router.include_router(timetagger.router)
api_router.include_router(rng.router)
api_router.include_router(serial.router)
api_router.include_router(coordination.router)
api_router.include_router(debug.router)
21 changes: 8 additions & 13 deletions src/pqnstack/app/api/routes/chsh.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from fastapi import status

from pqnstack.app.api.deps import ClientDep
from pqnstack.app.api.deps import StateDep
from pqnstack.app.core.config import settings
from pqnstack.app.core.config import state
from pqnstack.app.core.models import calculate_chsh_expectation_error
from pqnstack.network.client import Client

Expand Down Expand Up @@ -44,6 +44,7 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment.

expectation_values = []
expectation_errors = []
basis = [0, abs(basis[0] - basis[1]) % 90]
for angle in basis: # Going through my basis angles
for i in range(2): # Going through follower basis angles
counts = []
Expand Down Expand Up @@ -94,18 +95,12 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment.
logger.info("Expectation values: %s", expectation_values)
logger.info("Expectation errors: %s", expectation_errors)

negative_count = sum(1 for v in expectation_values if v < 0)
negative_indices = [i for i, v in enumerate(expectation_values) if v < 0]
impossible_counts = [0, 2, 4]
# FIXME: This is a temporary fix for handling impossible expectation values. We should not have to rely on the settings for this.
expectation_values = [x*y for x,y in zip(expectation_values, settings.chsh_settings.expectation_signs)]
logger.info("What are you settings? %s", settings.chsh_settings.expectation_signs)

if negative_count in impossible_counts:
msg = f"Impossible negative expectation values found: {negative_indices}, expectation_values = {expectation_values}, expectation_errors = {expectation_errors}"
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)

if len(negative_indices) > 1 or negative_indices[0] != 0:
logger.warning("Expectation values have unexpected negative indices: %s", negative_indices)

chsh_value = sum(abs(x) for x in expectation_values)
logger.info("After passing signed calculation: %s", expectation_values)
chsh_value = sum(x for x in expectation_values)
chsh_error = sum(x**2 for x in expectation_errors) ** 0.5

return chsh_value, chsh_error
Expand All @@ -129,7 +124,7 @@ async def chsh(


@router.post("/request-angle-by-basis")
async def request_angle_by_basis(index: int, *, perp: bool = False) -> bool:
async def request_angle_by_basis(index: int, state: StateDep, *, perp: bool = False) -> bool:
client = Client(host=settings.router_address, port=settings.router_port, timeout=600_000)
hwp = cast(
"RotatorInstrument",
Expand Down
199 changes: 199 additions & 0 deletions src/pqnstack/app/api/routes/coordination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import asyncio
import logging

from fastapi import APIRouter
from fastapi import HTTPException
from fastapi import Request
from fastapi import WebSocket
from fastapi import WebSocketDisconnect
from fastapi import status
from pydantic import BaseModel

from pqnstack.app.api.deps import ClientDep
from pqnstack.app.api.deps import StateDep
from pqnstack.app.core.config import ask_user_for_follow_event
from pqnstack.app.core.config import settings
from pqnstack.app.core.config import user_replied_event

logger = logging.getLogger(__name__)


class FollowRequestResponse(BaseModel):
accepted: bool


class CollectFollowerResponse(BaseModel):
accepted: bool


class ResetCoordinationStateResponse(BaseModel):
message: str = "Coordination state reset successfully"


router = APIRouter(prefix="/coordination", tags=["coordination"])


# TODO: Send a disconnection message if I was following/leading someone.
# FIXME: This is techincally resetting more than just coordination state. including qkd.
@router.post("/reset_coordination_state")
async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateResponse:
"""Reset the coordination state of the node."""
state.leading = False
state.followers_address = ""
state.following = False
state.following_requested = False
state.following_requested_user_response = None
state.leaders_address = ""
state.leaders_name = ""
state.qkd_emoji_pick = ""
state.qkd_bit_list = []
state.qkd_question_order = []
state.qkd_leader_basis_list = []
state.qkd_follower_basis_list = []
state.qkd_single_bit_current_index = 0
state.qkd_resulting_bit_list = []
state.qkd_request_basis_list = []
state.qkd_request_bit_list = []
state.qkd_n_matching_bits = -1
return ResetCoordinationStateResponse()


@router.post("/collect_follower")
async def collect_follower(
request: Request, address: str, state: StateDep, http_client: ClientDep
) -> CollectFollowerResponse:
"""
Endpoint called by a leader node (this one) to request a follower node (other node) to follow it.

Returns
-------
CollectFollowerResponse indicating if the follower accepted the request.
"""
logger.info("Requesting client at %s to follow", address)

# Get the port this server is listening on
server_port = request.scope["server"][1]

ret = await http_client.post(
f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}&leaders_port={server_port}"
)
if ret.status_code != status.HTTP_200_OK:
raise HTTPException(status_code=ret.status_code, detail=ret.text)

response_data = ret.json()
if response_data.get("accepted") is True:
state.leading = True
state.followers_address = address
logger.info("Successfully collected follower")
return CollectFollowerResponse(accepted=True)
if response_data.get("accepted") is False:
logger.info("Follower rejected follow request")
return CollectFollowerResponse(accepted=False)

raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not collect follower for unknown reasons"
)


@router.post("/follow_requested")
async def follow_requested(
request: Request, leaders_name: str, leaders_port: int, state: StateDep
) -> FollowRequestResponse:
"""
Endpoint is called by a leader node (other node) to request this node to follow it.

Returns
-------
FollowRequestResponse indicating if the follow request is accepted.
"""
logger.debug("Requesting client at %s to follow", leaders_name)

if request.client is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request lacks the clients host")
leaders_address = f"{request.client.host}:{leaders_port}"

# Check if the client is ready to accept a follower request and that node is not already following someone.
if not state.client_listening_for_follower_requests or state.following:
logger.info(
"Request rejected because %s",
(
"client is not listening for requests"
if not state.client_listening_for_follower_requests
else "this node is already following someone"
),
)
return FollowRequestResponse(accepted=False)

state.following_requested = True
state.leaders_name = leaders_name
state.leaders_address = leaders_address
# Trigger the state change to get the websocket to send question to user
ask_user_for_follow_event.set()

logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address)
await user_replied_event.wait() # Wait for a state change event to see if user accepted
user_replied_event.clear() # Reset the event for the next change
if state.following_requested_user_response:
logger.debug("Follow request from %s accepted.", leaders_address)
state.following = True
state.leaders_name = leaders_name
state.leaders_address = leaders_address
return FollowRequestResponse(accepted=True)

logger.debug("Follow request from %s rejected.", leaders_address)
# Clean up the state if user rejected
state.leaders_address = ""
state.leaders_name = ""
state.following_requested = False
state.following_requested_user_response = None
return FollowRequestResponse(accepted=False)


@router.websocket("/follow_requested_alerts")
async def follow_requested_alert(websocket: WebSocket, state: StateDep) -> None:
"""Websocket endpoint is used to alert the client when a follow request is received. It also handles the response from the client."""
await websocket.accept()
logger.info("Client connected to websocket for multiplayer coordination.")
state.client_listening_for_follower_requests = True

async def ask_user_for_follow_handler() -> None:
"""Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected."""
while True:
try:
await ask_user_for_follow_event.wait() # Wait for a state change event
if state.following_requested:
logger.debug("Websocket detected a follow request, asking user for response.")
if websocket.client_state.name == "CONNECTED":
await websocket.send_text(f"Do you want to accept a connection from {state.leaders_name}?")
else:
logger.debug("WebSocket not connected, cannot send message")
break
ask_user_for_follow_event.clear() # Reset the event for the next change
except WebSocketDisconnect:
logger.info("WebSocket disconnected in ask_user_for_follow_handler")
break
except Exception:
logger.exception("Error in ask_user_for_follow_handler, continuing to listen")
ask_user_for_follow_event.clear() # Reset the event to continue

async def client_message_handler() -> None:
"""Task that waits for a message from the client and handles the response. It also handles the case where the client disconnects."""
try:
while True:
response = await websocket.receive_text()
state.following_requested_user_response = response.lower() in ["true", "yes", "y"]
state.following_requested = False
logger.debug("Websocket received a response from user: %s", state.following_requested_user_response)
user_replied_event.set()
except WebSocketDisconnect:
logger.info("Client disconnected from websocket for multiplayer coordination.")
state.client_listening_for_follower_requests = False

state_change_task = asyncio.create_task(ask_user_for_follow_handler())
client_message_task = asyncio.create_task(client_message_handler())

try:
await asyncio.gather(state_change_task, client_message_task)
finally:
state_change_task.cancel()
client_message_task.cancel()
18 changes: 18 additions & 0 deletions src/pqnstack/app/api/routes/debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from fastapi import APIRouter

from pqnstack.app.api.deps import StateDep
from pqnstack.app.core.config import NodeState
from pqnstack.app.core.config import Settings
from pqnstack.app.core.config import settings

router = APIRouter(prefix="/debug", tags=["debug"])


@router.get("/state")
async def get_state(state: StateDep) -> NodeState:
return state


@router.get("/settings")
async def get_settings() -> Settings:
return settings
Loading
Loading