@@ -551,47 +551,68 @@ export class SubscriptionService implements OnModuleInit {
551551 async expireAndRechargeCredits ( ) {
552552 this . logger . log ( 'Starting expire and recharge credits job' ) ;
553553
554+ // Add distributed lock to prevent concurrent execution of the entire job
555+ const lockKey = 'expire_and_recharge_credits_job_lock' ;
556+ const releaseLock = await this . redis . acquireLock ( lockKey ) ;
557+
558+ if ( ! releaseLock ) {
559+ this . logger . debug ( 'Failed to acquire lock for expire and recharge credits job, skipping' ) ;
560+ return ; // Another process is handling this job
561+ }
562+
554563 try {
555- await this . prisma . $transaction ( async ( prisma ) => {
556- const now = new Date ( ) ;
564+ const now = new Date ( ) ;
557565
558- // Step 1: Find all non-duplicate credit recharge records that are expired but not disabled
559- const activeRecharges = await prisma . creditRecharge . findMany ( {
560- where : {
561- expiresAt : {
562- lte : now ,
563- } ,
564- enabled : true ,
565- } ,
566- distinct : [ 'rechargeId' ] ,
567- orderBy : {
568- createdAt : 'desc' ,
566+ // Step 1: Find all non-duplicate credit recharge records that are expired but not disabled
567+ const activeRecharges = await this . prisma . creditRecharge . findMany ( {
568+ where : {
569+ expiresAt : {
570+ lte : now ,
569571 } ,
570- } ) ;
572+ enabled : true ,
573+ } ,
574+ distinct : [ 'rechargeId' ] ,
575+ orderBy : {
576+ createdAt : 'desc' ,
577+ } ,
578+ } ) ;
579+
580+ this . logger . log ( `Found ${ activeRecharges . length } active credit recharge records` ) ;
571581
572- this . logger . log ( `Found ${ activeRecharges . length } active credit recharge records` ) ;
582+ // Step 2: Process subscription-based recharges only (gift recharges are now handled by lazy loading)
583+ const subscriptionRecharges = activeRecharges . filter ( ( r ) => r . source === 'subscription' ) ;
573584
574- // Step 2: Disable all active recharge records
575- if ( activeRecharges . length > 0 ) {
576- await prisma . creditRecharge . updateMany ( {
577- where : {
578- rechargeId : {
579- in : activeRecharges . map ( ( r ) => r . rechargeId ) ,
585+ await this . prisma . $transaction ( async ( prisma ) => {
586+ for ( const recharge of subscriptionRecharges ) {
587+ try {
588+ // Re-check if the recharge is still active and expired (double-check within transaction)
589+ const currentRecharge = await prisma . creditRecharge . findFirst ( {
590+ where : {
591+ rechargeId : recharge . rechargeId ,
592+ enabled : true ,
593+ expiresAt : {
594+ lte : now ,
595+ } ,
580596 } ,
581- } ,
582- data : {
583- enabled : false ,
584- } ,
585- } ) ;
597+ } ) ;
586598
587- this . logger . log ( `Disabled ${ activeRecharges . length } credit recharge records` ) ;
588- }
599+ if ( ! currentRecharge ) {
600+ this . logger . debug (
601+ `Recharge ${ recharge . rechargeId } is no longer active or expired, skipping` ,
602+ ) ;
603+ continue ; // Already processed by another process
604+ }
605+
606+ // Disable the expired recharge
607+ await prisma . creditRecharge . update ( {
608+ where : { rechargeId : recharge . rechargeId } ,
609+ data : { enabled : false } ,
610+ } ) ;
589611
590- // Step 3: Process subscription-based recharges only (gift recharges are now handled by lazy loading)
591- const subscriptionRecharges = activeRecharges . filter ( ( r ) => r . source === 'subscription' ) ;
612+ this . logger . log (
613+ `Disabled expired credit recharge ${ recharge . rechargeId } for user ${ recharge . uid } ` ,
614+ ) ;
592615
593- for ( const recharge of subscriptionRecharges ) {
594- try {
595616 // Check if user has active subscription
596617 const subscription = await prisma . subscription . findFirst ( {
597618 where : {
@@ -610,6 +631,28 @@ export class SubscriptionService implements OnModuleInit {
610631 continue ;
611632 }
612633
634+ // Check if there's already a new monthly credit recharge for this user
635+ const newExpiresAt = new Date ( ) ;
636+ newExpiresAt . setMonth ( newExpiresAt . getMonth ( ) + 1 ) ;
637+
638+ const existingMonthlyRecharge = await prisma . creditRecharge . findFirst ( {
639+ where : {
640+ uid : recharge . uid ,
641+ source : 'subscription' ,
642+ enabled : true ,
643+ expiresAt : {
644+ gte : newExpiresAt ,
645+ } ,
646+ } ,
647+ } ) ;
648+
649+ if ( existingMonthlyRecharge ) {
650+ this . logger . debug (
651+ `User ${ recharge . uid } already has active monthly credit recharge, skipping` ,
652+ ) ;
653+ continue ; // Already has new monthly credits
654+ }
655+
613656 // Find plan quota for credit amount
614657 let plan : PlanQuota | null = null ;
615658 if ( subscription . overridePlan ) {
@@ -653,9 +696,6 @@ export class SubscriptionService implements OnModuleInit {
653696
654697 // Handle subscription source - monthly recharge with creditQuota
655698 if ( recharge . source === 'subscription' && plan . creditQuota > 0 ) {
656- const newExpiresAt = new Date ( ) ;
657- newExpiresAt . setMonth ( newExpiresAt . getMonth ( ) + 1 ) ;
658-
659699 await this . createCreditRecharge (
660700 prisma ,
661701 recharge . uid ,
@@ -680,6 +720,13 @@ export class SubscriptionService implements OnModuleInit {
680720 } catch ( error ) {
681721 this . logger . error ( 'Error in expire and recharge credits job:' , error ) ;
682722 throw error ;
723+ } finally {
724+ // Always release the lock
725+ try {
726+ await releaseLock ( ) ;
727+ } catch ( lockError ) {
728+ this . logger . warn ( `Error releasing job lock: ${ lockError . message } ` ) ;
729+ }
683730 }
684731 }
685732
0 commit comments