Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,49 @@ build
coverage
.next
.env
.DS_Store

# Local .terraform directories
.terraform/

# .tfstate files
*.tfstate
*.tfstate.*

# Crash log files
crash.log
crash.*.log

# Exclude all .tfvars files, which are likely to contain sensitive data, such as
# password, private keys, and other secrets. These should not be part of version
# control as they are data points which are potentially sensitive and subject
# to change depending on the environment.
*.tfvars
*.tfvars.json

# Ignore override files as they are usually used to override resources locally and so
# are not checked in
override.tf
override.tf.json
*_override.tf
*_override.tf.json

# Ignore transient lock info files created by terraform apply
.terraform.tfstate.lock.info

# Include override files you do wish to add to version control using negated pattern
# !example_override.tf

# Include tfplan files to ignore the plan output of command: terraform plan -out=tfplan
# example: *tfplan*

# Ignore CLI configuration files
.terraformrc
terraform.rc

# Optional: ignore graph output files generated by `terraform graph`
# *.dot

# Optional: ignore plan files saved before destroying Terraform configuration
# Uncomment the line below if you want to ignore planout files.
# planout
2 changes: 1 addition & 1 deletion backend/collaboration-service/Dockerfile.collaboration
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN chown -R collaboration-service:nodejs /app
USER collaboration-service

# Expose ports for HTTP and WebSocket servers
EXPOSE 8004 8005
EXPOSE 8004

# Start the application
CMD ["pnpm", "start"]
1 change: 1 addition & 0 deletions backend/collaboration-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"license": "ISC",
"packageManager": "[email protected]",
"dependencies": {
"@google-cloud/pubsub": "^5.2.0",
"@y/websocket-server": "^0.1.1",
"axios": "^1.7.9",
"amqplib": "^0.10.9",
Expand Down
3 changes: 1 addition & 2 deletions backend/collaboration-service/src/config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
export const config = {
// Server Ports
WS_PORT: process.env.WS_PORT || 8005,
HTTP_PORT: process.env.HTTP_PORT || 8004,
PORT: process.env.PORT || 8004,
// Database
MONGO_URI:
process.env.MONGODB_URI ||
Expand Down
8 changes: 8 additions & 0 deletions backend/collaboration-service/src/config/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ export const ROOM_CREATED_TOPIC = 'room_created_topic';
export class KafkaManager {
constructor() {
console.log('Collaboration Service KafkaManager constructor');
const isGCP = !!process.env.PUBSUB_PROJECT_ID;
if (isGCP) {
console.log('GCP environment detected, Kafka not configured for GCP');
this.kafka = null;
this.producer = null;
this.consumer = null;
return;
}
const host = process.env.KAFKA_HOST || 'localhost';
const port = process.env.KAFKA_PORT || '29092';
const brokers = (process.env.KAFKA_BROKERS || `${host}:${port}`).split(',');
Expand Down
208 changes: 208 additions & 0 deletions backend/collaboration-service/src/config/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import { PubSub } from "@google-cloud/pubsub";

// Room creation topics
export const ROOM_CREATION_TOPIC = "room_creation_topic";
export const ROOM_CREATED_TOPIC = "room_created_topic";
export const ROOM_CREATION_SUBSCRIPTION = "room_creation_sub";

// Code execution topics
export const JOB_EXECUTION_TOPIC = "job_execution_topic";

export class PubSubManager {
constructor() {
console.log("Collaboration Service PubSubManager constructor");

// Check if running on GCP (PUBSUB_PROJECT_ID is set)
this.useGcp = !!process.env.PUBSUB_PROJECT_ID;

if (this.useGcp) {
console.log("Using GCP Pub/Sub");
this.pubsub = new PubSub({
projectId: process.env.PUBSUB_PROJECT_ID,
});
} else {
console.log("GCP Pub/Sub not configured, using fallback mode");
this.pubsub = null;
}

this.subscriptions = new Map();
this.isConnected = false;
}

async initWithRetry(maxRetries = 5, retryDelayMs = 2000) {
if (this.isConnected) {
console.log("Already connected to Pub/Sub");
return;
}

if (!this.useGcp) {
console.log("Skipping Pub/Sub initialization (not on GCP)");
this.isConnected = true;
return;
}

let attempt = 0;
while (attempt < maxRetries) {
try {
// Verify connection by listing topics
const [topics] = await this.pubsub.getTopics();
console.log(`Connected to Pub/Sub. Found ${topics.length} topics`);

// Ensure topics exist
await this.ensureTopicsExist([
ROOM_CREATION_TOPIC,
ROOM_CREATED_TOPIC,
JOB_EXECUTION_TOPIC,
]);

this.isConnected = true;
console.log("Connected to Pub/Sub");
return;
} catch (err) {
attempt += 1;
console.error(`Failed to connect to Pub/Sub (attempt ${attempt} of ${maxRetries}):`, err);
if (attempt >= maxRetries) {
throw err;
}
await new Promise((resolve) => setTimeout(resolve, retryDelayMs * attempt));
}
}
}

async ensureTopicsExist(topicNames) {
for (const topicName of topicNames) {
try {
const topic = this.pubsub.topic(topicName);
const [exists] = await topic.exists();

if (!exists) {
await topic.create();
console.log(`Created topic: ${topicName}`);
}
} catch (error) {
console.error(`Error ensuring topic ${topicName}:`, error);
}
}
}

async setupConsumer(handler) {
await this.initWithRetry();

if (!this.useGcp) {
console.log("Skipping Pub/Sub consumer setup (not on GCP)");
return;
}

await this.setupSubscription(ROOM_CREATION_TOPIC, ROOM_CREATION_SUBSCRIPTION, handler);
}

async setupSubscription(topicName, subscriptionName, handler) {
try {
const topic = this.pubsub.topic(topicName);
let subscription = topic.subscription(subscriptionName);

const [exists] = await subscription.exists();
if (!exists) {
[subscription] = await topic.createSubscription(subscriptionName);
console.log(`Created subscription: ${subscriptionName}`);
}

// Handle messages
const messageHandler = async (message) => {
try {
const value = message.data.toString();
const key = message.attributes.key || null;

await handler({
key,
value,
});

message.ack();
} catch (error) {
console.error(`Error handling message from ${topicName}:`, error);
message.nack();
}
};

subscription.on("message", messageHandler);
subscription.on("error", (error) => {
console.error(`Subscription ${subscriptionName} error:`, error);
});

this.subscriptions.set(subscriptionName, subscription);
console.log(`Subscribed to ${topicName} (${subscriptionName})`);
} catch (error) {
console.error(`Error setting up subscription for ${topicName}:`, error);
throw error;
}
}

async publishRoomCreated(matchId, roomId) {
if (!this.useGcp) {
console.log(`[Fallback] Would publish room created: ${matchId} -> ${roomId}`);
return;
}

try {
const topic = this.pubsub.topic(ROOM_CREATED_TOPIC);
const data = { roomId };
const dataBuffer = Buffer.from(JSON.stringify(data));

await topic.publishMessage({
data: dataBuffer,
attributes: { key: matchId },
});

console.log("Published room created event:", { matchId, roomId });
} catch (error) {
console.error("Error publishing room created event:", error);
throw error;
}
}

async publishJob(job) {
if (!this.useGcp) {
console.log(`[Fallback] Would publish job: ${job.room_id}`);
return;
}

try {
const topic = this.pubsub.topic(JOB_EXECUTION_TOPIC);
const dataBuffer = Buffer.from(JSON.stringify(job));

await topic.publishMessage({
data: dataBuffer,
attributes: { room_id: job.room_id },
});

console.log(">>> Sent job: ", job.room_id);
} catch (error) {
console.error("Error publishing job:", error);
throw error;
}
}

async disconnect() {
if (!this.useGcp) {
return;
}

try {
for (const [name, subscription] of this.subscriptions) {
await subscription.close();
console.log(`Closed subscription: ${name}`);
}
this.subscriptions.clear();

if (this.pubsub) {
await this.pubsub.close();
}
console.log("Disconnected from Pub/Sub");
} catch (error) {
console.error("Error disconnecting from Pub/Sub:", error);
}
}
}

export const pubsubManager = new PubSubManager();
80 changes: 80 additions & 0 deletions backend/collaboration-service/src/config/rabbitmq.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import amqp from "amqplib";

export const JOB_EXECUTION_QUEUE = "job_execution_queue";

export class RabbitMQManager {
constructor() {
console.log("Collaboration Service RabbitMQManager constructor");
const isGCP = !!process.env.PUBSUB_PROJECT_ID;
if (isGCP) {
console.log("GCP environment detected, RabbitMQ not configured for GCP");
this.connection = null;
this.channel = null;
return;
}
this.rabbitMQUrl = process.env.RABBITMQ_URL || "amqp://user:password@rabbitmq";
this.connection = null;
this.channel = null;
this.isConnected = false;
}

async initWithRetry(maxRetries = 5, retryDelayMs = 2000) {
if (this.isConnected) {
console.log("Already connected to RabbitMQ");
return;
}

let attempt = 0;
while (attempt < maxRetries) {
try {
this.connection = await amqp.connect(this.rabbitMQUrl);
this.channel = await this.connection.createChannel();

await this.channel.assertQueue(JOB_EXECUTION_QUEUE, { durable: true });

console.log("Connected to RabbitMQ");
this.isConnected = true;
return;
} catch (err) {
attempt += 1;
console.error(`Failed to connect to RabbitMQ (attempt ${attempt} of ${maxRetries}):`, err);
if (attempt >= maxRetries) {
throw err;
}
await new Promise((resolve) => setTimeout(resolve, retryDelayMs * attempt));
}
}
}

async sendJob(job) {
if (!this.isConnected || !this.channel) {
throw new Error("RabbitMQ not connected");
}

try {
this.channel.sendToQueue(JOB_EXECUTION_QUEUE, Buffer.from(JSON.stringify(job)), {
persistent: true,
});
console.log(">>> Sent job: ", job.room_id);
} catch (error) {
console.error("Error while sending job: ", error.message);
throw error;
}
}

async disconnect() {
try {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
console.log("Disconnected from RabbitMQ");
} catch (error) {
console.error("Error disconnecting from RabbitMQ:", error);
}
}
}

export const rabbitmqManager = new RabbitMQManager();
Loading
Loading