11import type * as Party from 'partykit/server' ;
22import { onConnect as y_onConnect } from 'y-partykit' ;
3-
43import { Buffer } from 'node:buffer' ;
54
65import * as Y from 'yjs' ;
@@ -9,55 +8,43 @@ import {verifyToken} from '../utils/jwt.js';
98import { getDocument , upsertDocument , checkRoomExists , checkUserVerified } from '../storage/db.js' ;
109
1110const CHAT_HISTORY_LIMIT = 500 ;
12- const CHAT_COLLECTION_KEY = 'chatMessages' ;
1311const EXECUTION_STATE_KEY = 'executionState' ;
1412
1513function ensureSharedStructures ( doc : Y . Doc ) {
1614 doc . getText ( 'codemirror' ) ;
1715 doc . getMap < string > ( 'config' ) ;
18- doc . getArray ( CHAT_COLLECTION_KEY ) ;
16+ doc . getArray ( 'chat' ) ;
1917 doc . getMap ( EXECUTION_STATE_KEY ) ;
2018}
2119
2220function pruneChatHistory ( doc : Y . Doc ) {
23- const chatArray = doc . getArray ( CHAT_COLLECTION_KEY ) ;
21+ const chatArray = doc . getArray ( 'chat' ) ;
2422 if ( chatArray . length <= CHAT_HISTORY_LIMIT ) {
2523 return ;
2624 }
2725 const excess = chatArray . length - CHAT_HISTORY_LIMIT ;
2826 chatArray . delete ( 0 , excess ) ;
2927}
30- // import {roomRouter} from '../api/roomRoutes.js';
3128
3229export default class YjsServer implements Party . Server {
3330 constructor ( public room : Party . Room ) { }
3431
3532 static async onBeforeConnect ( request : Party . Request , _lobby : Party . Lobby ) {
3633 try {
3734 const cookieHeader = request . headers . get ( 'cookie' ) ;
38- if ( ! cookieHeader ) {
39- console . error ( 'No cookie header found' ) ;
40- return new Response ( 'Unauthorized: No cookies' , { status : 401 } ) ;
41- }
4235
43- const cookies = parseCookies ( cookieHeader ) ;
36+ const cookies = cookieHeader ? parseCookies ( cookieHeader ) : { } ;
4437 const accessToken = cookies [ 'accessToken' ] ;
45-
4638 if ( ! accessToken ) {
4739 console . error ( 'No accessToken cookie found' ) ;
48- return new Response ( 'Unauthorized : No access token' , { status : 401 } ) ;
40+ return new Response ( 'Bad Request : No access token' , { status : 400 } ) ;
4941 }
50- const { valid, payload} = await verifyToken ( accessToken ) ;
5142
52- if ( ! valid || ! payload ) {
53- console . error ( 'Token verification failed:' , payload ) ;
54- return new Response ( 'Unauthorized: Invalid token or payload' , { status : 401 } ) ;
55- }
5643 const roomId = new URL ( request . url ) . pathname . split ( '/' ) . pop ( ) ;
5744
5845 if ( ! roomId ) {
59- console . error ( 'Room Id is undefined: ' ) ;
60- return new Response ( 'Room Id is undefined ' , { status : 400 } ) ;
46+ console . error ( 'Room ID is undefined' ) ;
47+ return new Response ( 'Bad Request: Room ID missing ' , { status : 400 } ) ;
6148 }
6249 const roomExists = await checkRoomExists ( roomId ) ;
6350
@@ -66,97 +53,97 @@ export default class YjsServer implements Party.Server {
6653 return new Response ( 'Room not found' , { status : 404 } ) ;
6754 }
6855
69- // const isUserVerified = await checkUserVerified(payload.userId.toString(), roomId);
70-
71- // if (!isUserVerified) {
72- // console.error(`User not authorised to enter this room`);
73- // return new Response('Unauthorised : User not authorised to enter this room', {status: 401});
74- // }
75-
76- request . headers . set ( 'X-User-ID' , payload . userId . toString ( ) ) ;
77-
78- if ( payload . sessionId ) {
79- try {
80- request . headers . set ( 'X-Session-ID' , payload . sessionId . toString ( ) ) ;
81- } catch ( sessionIdError ) {
82- console . warn (
83- 'Session ID missing or malformed on token payload' ,
84- sessionIdError ,
85- payload . sessionId
86- ) ;
87- }
88- } else {
89- console . info ( 'Session ID not present on token payload; continuing without it' ) ;
90- }
56+ request . headers . set ( 'X-Access-Token' , accessToken ) ;
9157
9258 return request ;
9359 } catch ( e ) {
9460 console . error ( 'Authentication error:' , e ) ;
95- return new Response ( 'Unauthorized ' , { status : 401 } ) ;
61+ return new Response ( 'Internal Server Error ' , { status : 500 } ) ;
9662 }
9763 }
9864
99- async onConnect ( connection : Party . Connection ) {
100- const room = this . room ;
101- await y_onConnect ( connection , this . room , {
102- async load ( ) {
103- // This is called once per "room" when the first user connects
65+ async onConnect ( connection : Party . Connection , ctx : Party . ConnectionContext ) {
66+ try {
67+ const accessToken = ctx . request . headers . get ( 'X-Access-Token' ) ;
68+ const roomId = this . room . id ;
69+ if ( ! accessToken || ! roomId ) {
70+ console . error ( 'Missing auth data in onConnect' ) ;
71+ connection . close ( 4000 , 'Internal error: Missing authentication data' ) ;
72+ return ;
73+ }
74+ const { valid, payload} = await verifyToken ( accessToken ) ;
10475
105- // Creates the backend Yjs document
106- const doc = new Y . Doc ( ) ;
76+ if ( ! valid || ! payload ) {
77+ console . error ( 'Token verification failed:' , payload ) ;
78+ connection . close ( 4001 , 'Unauthorised: Invalid or expired token' ) ;
79+ return ;
80+ }
10781
108- // Load the document from the database
109- try {
110- const { data, error} = await getDocument ( room . id ) ;
111- if ( error ) {
112- throw new Error ( error . message ) ;
113- }
82+ const isUserVerified = await checkUserVerified ( payload . userId . toString ( ) , roomId ) ;
11483
115- if ( data ) {
116- // If the document exists on the database,
117- // apply it to the Yjs document
118- try {
119- const buffer = Buffer . from ( data . document , 'base64' ) ;
120- Y . applyUpdate ( doc , new Uint8Array ( buffer ) ) ;
121- ensureSharedStructures ( doc ) ;
122- pruneChatHistory ( doc ) ;
123- } catch ( parseErr ) {
124- console . warn ( `[${ room . id } ] Data corrupted, creating new document` ) ;
125- }
126- } else {
127- console . log ( `[${ room . id } ] No existing document found, creating new document` ) ;
128- ensureSharedStructures ( doc ) ;
129- }
84+ if ( ! isUserVerified ) {
85+ console . error ( `User ${ payload . userId } not authorised to enter room ${ roomId } ` ) ;
86+ connection . close ( 4003 , 'Forbidden: You are not authorised for this room' ) ;
87+ return ;
88+ }
13089
131- // Return the Yjs document to y-partykit to manage
132- ensureSharedStructures ( doc ) ;
133- pruneChatHistory ( doc ) ;
134- return doc ;
135- } catch ( err ) {
136- console . error ( `[${ room . id } ] Load failed:` , err ) ;
137- throw err ;
138- }
139- } ,
140- callback : {
141- handler : async doc => {
142- // This is called every few seconds if the document has changed
143-
144- // convert the Yjs document to a Uint8Array
145- try {
146- pruneChatHistory ( doc ) ;
147- const content = Y . encodeStateAsUpdate ( doc ) ;
90+ await y_onConnect ( connection , this . room , {
91+ async load ( ) {
92+ // This is called once per "room" when the first user connects
14893
149- // Save the document to the database
150- const { data : _data , error} = await upsertDocument ( room . id , content ) ;
94+ // Creates the backend Yjs document
95+ const doc = new Y . Doc ( ) ;
96+
97+ // Load the document from the database
98+ try {
99+ const { data, error} = await getDocument ( roomId ) ;
151100 if ( error ) {
152- console . error ( `[${ room . id } ] Failed to save:` , error ) ;
153- throw new Error ( `Failed to save into database: ${ error . message } ` ) ;
101+ throw new Error ( error . message ) ;
102+ }
103+
104+ if ( data ) {
105+ try {
106+ const buffer = Buffer . from ( data . document , 'base64' ) ;
107+ Y . applyUpdate ( doc , new Uint8Array ( buffer ) ) ;
108+ ensureSharedStructures ( doc ) ;
109+ pruneChatHistory ( doc ) ;
110+ } catch ( parseErr ) {
111+ console . warn ( `[${ roomId } ] Data corrupted, creating new document` ) ;
112+ }
113+ } else {
114+ console . log ( `[${ roomId } ] No existing document found, creating new document` ) ;
115+ ensureSharedStructures ( doc ) ;
154116 }
117+
118+ // Return the Yjs document to y-partykit to manage
119+ ensureSharedStructures ( doc ) ;
120+ pruneChatHistory ( doc ) ;
121+ return doc ;
155122 } catch ( err ) {
156- console . error ( `[${ room . id } ] Save error: ` , err ) ;
123+ console . error ( `[${ roomId } ] Load failed:` , err ) ;
124+ throw err ;
157125 }
158126 } ,
159- } ,
160- } ) ;
127+ callback : {
128+ handler : async doc => {
129+ try {
130+ pruneChatHistory ( doc ) ;
131+ const content = Y . encodeStateAsUpdate ( doc ) ;
132+
133+ const { data : _data , error} = await upsertDocument ( roomId , content ) ;
134+ if ( error ) {
135+ console . error ( `[${ roomId } ] Failed to save:` , error ) ;
136+ throw new Error ( `Failed to save into database: ${ error . message } ` ) ;
137+ }
138+ } catch ( err ) {
139+ console . error ( `[${ roomId } ] Save error: ` , err ) ;
140+ }
141+ } ,
142+ } ,
143+ } ) ;
144+ } catch ( e ) {
145+ console . error ( 'Connection error:' , e ) ;
146+ connection . close ( 4000 , 'Internal server error' ) ;
147+ }
161148 }
162149}
0 commit comments