-
-
Notifications
You must be signed in to change notification settings - Fork 305
feat: implement project locks containing multiple job ids #3271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Replace old format Map<Long, Long> with new format Map<Long, Set<Long>> to support configurable concurrent jobs per project. One-time Redis Migration automatically runs on startup and performs one-time data conversion. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
WalkthroughProject batch locking changed from a single-job lock per project to a set of job IDs with configurable per-project concurrency. Public API models, locking manager, startup migration, and tests were updated to use Set for locks, fetch multiple JobInfo entries, and migrate legacy Redis entries. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Client
participant Controller as ProjectBatchLockController
participant Manager as BatchJobProjectLockingManager
participant Store as Redis/Local Map
rect rgba(230,245,255,0.6)
note over Manager: Startup migration (afterPropertiesSet)
Manager->>Store: Read legacy Long? entries
alt legacy format present
Manager->>Store: Write converted Set<Long> entries
else
note over Manager: Already set-based
end
end
Client->>Controller: GET /locks
Controller->>Manager: getMap()/getLockedForProject()
Manager->>Store: Read Set<Long> per project
Store-->>Manager: Set<Long>
Manager-->>Controller: Set<Long> per project
Controller->>Manager: fetch JobInfo for each jobId
Manager-->>Controller: List<JobInfo> (filtered)
Controller-->>Client: ProjectLockModel{lockedJobIds, jobInfos, status}
rect rgba(235,255,235,0.6)
note over Client,Manager: Lock attempt
Client->>Manager: lock(projectId, jobId)
Manager->>Store: Atomic compute: read Set<Long>
alt size < projectConcurrency
Store-->>Manager: Updated Set + jobId
Manager-->>Client: Locked
else
Manager-->>Client: Rejected (at capacity)
end
end
rect rgba(255,240,240,0.6)
note over Client,Manager: Unlock
Client->>Manager: unlock(projectId, jobId)
Manager->>Store: Atomic compute: remove jobId from Set<Long>
Store-->>Manager: Updated Set<Long>
Manager-->>Client: Unlocked/No-op
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt (2)
307-309
: Fix assertJobUnlocked to set semantics.Still asserts 0L; should assert empty set.
fun assertJobUnlocked() { - batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).assert.isEqualTo(0L) + batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).assert.isEmpty() }
385-387
: Fix verifyJobLocked to set semantics.Assert that the set contains the job ID (not equals).
fun verifyJobLocked(job: BatchJob) { - batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).assert.isEqualTo(job.id) + batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).assert.contains(job.id) }
🧹 Nitpick comments (8)
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt (1)
172-181
: Update log message to reflect per-project concurrency.Message still says “Other job ... running”. With n>1, prefer “Project concurrency limit reached; re-queuing”.
- logger.debug( - "⚠️ Cannot run execution ${executionItem.chunkExecutionId}. " + - "Other job from the project is currently running, skipping", - ) + logger.debug( + "⚠️ Cannot run execution ${executionItem.chunkExecutionId}. " + + "Project concurrency limit reached; re-queuing", + )backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt (1)
15-16
: Validate projectConcurrency ≥ 1.Prevent 0/negative values that would deadlock locking logic.
package io.tolgee.configuration.tolgee import io.tolgee.configuration.annotations.DocProperty import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.validation.annotation.Validated +import jakarta.validation.constraints.Min @ConfigurationProperties(prefix = "tolgee.batch") @DocProperty(description = "Configuration of batch operations.", displayName = "Batch operations") -class BatchProperties { +@Validated +class BatchProperties { @DocProperty(description = "How many parallel jobs can be run at once on single Tolgee instance") var concurrency: Int = 1 @DocProperty(description = "How many job chunks are added to the internal queue on each scheduled run") var chunkQueuePopulationSize: Int = 1_000 @DocProperty(description = "How many parallel jobs can be run at once per project across all Tolgee instances") - var projectConcurrency: Int = 1 + @field:Min(1) + var projectConcurrency: Int = 1 }backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt (1)
49-77
: Confirm whether unlocked projects should be returned.If the lock map retains empty sets, this endpoint will list projects with UNLOCKED status. If not desired, filter out empty sets before mapping.
- val lockModels = locks.entries.map { (projectId, lockedJobIds) -> + val lockModels = locks.entries + .filter { it.value.isNotEmpty() } + .map { (projectId, lockedJobIds) -> val lockStatus = when { lockedJobIds.isEmpty() -> LockStatus.UNLOCKED else -> LockStatus.LOCKED } ... }backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt (5)
18-21
: Update class KDoc to reflect multi-job concurrency.Doc still says “Only single job…”. Update to avoid confusion.
58-68
: Remove key when last job unlocks to prevent map bloat and API noise.Returning empty sets keeps stale entries and may surface UNLOCKED projects via API.
- getMap().compute(projectId) { _, lockedJobIds -> + getMap().compute(projectId) { _, lockedJobIds -> logger.debug("Unlocking job: $jobId for project $projectId") val currentJobs = lockedJobIds ?: emptySet() if (currentJobs.contains(jobId)) { logger.debug("Unlocking job: $jobId for project $projectId") - val updatedJobs = currentJobs - jobId - return@compute updatedJobs.ifEmpty { emptySet() } + val updatedJobs = currentJobs - jobId + return@compute if (updatedJobs.isEmpty()) null else updatedJobs } logger.debug("Job: $jobId for project $projectId is not locked") - return@compute currentJobs + return@compute currentJobs }Note: Returning null removes the entry (valid for ConcurrentMap.compute and Redisson RMap).
128-148
: Honor projectConcurrency also when no initial job is found.When initialJobId is null, the code locks toLock.id unconditionally. If misconfigured to 0, this bypasses the limit.
- if (initialJobId == null) { - logger.debug("No initial job found, locking only ${toLock.id}") - return setOf(toLock.id) - } + if (initialJobId == null) { + logger.debug("No initial job found") + return if (batchProperties.projectConcurrency > 0) setOf(toLock.id) else emptySet() + }Also consider validating projectConcurrency ≥ 1 in BatchProperties (see related comment).
189-191
: Specify generics on Redisson map for type safety.Minor, but explicit generics improve readability and static checks.
- private fun getRedissonProjectLocks(): ConcurrentMap<Long, Set<Long>> { - return redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) - } + private fun getRedissonProjectLocks(): ConcurrentMap<Long, Set<Long>> { + return redissonClient.getMap<Long, Set<Long>>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + }
117-158
: Avoid DB calls inside distributed compute when possible.getInitialJobId() runs under RMap.compute lock; can block other lockers. Cache the initial state or probe outside compute and pass it in, only falling back if entry still empty.
Would you like a patch that moves the initial-state probe outside compute with a CAS-style retry?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt
(4 hunks)backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt
(1 hunks)backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
(1 hunks)backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
(5 hunks)backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt
(1 hunks)
override fun afterPropertiesSet() { | ||
// This runs first to check if redis has a map of the old format. | ||
// If so, we migrate it to the new format. | ||
if (!usingRedisProvider.areWeUsingRedis) { | ||
logger.debug("Not using Redis, skipping migration check") | ||
return | ||
} | ||
|
||
val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) | ||
val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null } | ||
if (!isOldFormat) { | ||
logger.debug("Redis project locks are in new format, no migration needed") | ||
return | ||
} | ||
|
||
logger.info("Starting migration of project locks from old format (v1) to new format (v2)") | ||
// First, copy all data from Redis to local memory | ||
val localCopy = mutableMapOf<Long, Set<Long>>() | ||
redisProjectBatchJobLocks.forEach { (projectId, jobId) -> | ||
val jobSet = when (jobId) { | ||
null, 0L -> emptySet<Long>() | ||
else -> setOf<Long>(jobId as Long) | ||
} | ||
localCopy[projectId] = jobSet | ||
} | ||
logger.info("Copied ${localCopy.size} project locks from old format to local memory") | ||
|
||
// Write all data back in new format (this will overwrite the old format) | ||
val newMap = getRedissonProjectLocks() | ||
localCopy.forEach { (projectId, jobSet) -> | ||
newMap[projectId] = jobSet | ||
} | ||
|
||
logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Migration can throw ClassCastException on mixed formats; handle both Long and Set values.
Current code assumes all values are Long/null when isOldFormat is true; mixed maps will fail.
override fun afterPropertiesSet() {
// This runs first to check if redis has a map of the old format.
// If so, we migrate it to the new format.
if (!usingRedisProvider.areWeUsingRedis) {
logger.debug("Not using Redis, skipping migration check")
return
}
- val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY)
- val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null }
- if (!isOldFormat) {
- logger.debug("Redis project locks are in new format, no migration needed")
- return
- }
+ val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY)
+ val containsOnlyNew = redisProjectBatchJobLocks.values.all { it is Set<*> }
+ if (containsOnlyNew) {
+ logger.debug("Redis project locks are in new format, no migration needed")
+ return
+ }
logger.info("Starting migration of project locks from old format (v1) to new format (v2)")
- // First, copy all data from Redis to local memory
- val localCopy = mutableMapOf<Long, Set<Long>>()
- redisProjectBatchJobLocks.forEach { (projectId, jobId) ->
- val jobSet = when (jobId) {
- null, 0L -> emptySet<Long>()
- else -> setOf<Long>(jobId as Long)
- }
- localCopy[projectId] = jobSet
- }
- logger.info("Copied ${localCopy.size} project locks from old format to local memory")
+ val localCopy = mutableMapOf<Long, Set<Long>>()
+ redisProjectBatchJobLocks.forEach { (projectId, value) ->
+ val jobSet: Set<Long> = when (value) {
+ null -> emptySet()
+ is Long -> if (value == 0L) emptySet() else setOf(value)
+ is Iterable<*> -> value.filterIsInstance<Long>().toSet()
+ else -> {
+ logger.warn("Unknown lock value type ${value::class.java} for project $projectId; treating as empty")
+ emptySet()
+ }
+ }
+ localCopy[projectId] = jobSet
+ }
+ logger.info("Prepared ${localCopy.size} project locks for new format")
// Write all data back in new format (this will overwrite the old format)
val newMap = getRedissonProjectLocks()
localCopy.forEach { (projectId, jobSet) ->
newMap[projectId] = jobSet
}
logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format")
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
override fun afterPropertiesSet() { | |
// This runs first to check if redis has a map of the old format. | |
// If so, we migrate it to the new format. | |
if (!usingRedisProvider.areWeUsingRedis) { | |
logger.debug("Not using Redis, skipping migration check") | |
return | |
} | |
val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) | |
val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null } | |
if (!isOldFormat) { | |
logger.debug("Redis project locks are in new format, no migration needed") | |
return | |
} | |
logger.info("Starting migration of project locks from old format (v1) to new format (v2)") | |
// First, copy all data from Redis to local memory | |
val localCopy = mutableMapOf<Long, Set<Long>>() | |
redisProjectBatchJobLocks.forEach { (projectId, jobId) -> | |
val jobSet = when (jobId) { | |
null, 0L -> emptySet<Long>() | |
else -> setOf<Long>(jobId as Long) | |
} | |
localCopy[projectId] = jobSet | |
} | |
logger.info("Copied ${localCopy.size} project locks from old format to local memory") | |
// Write all data back in new format (this will overwrite the old format) | |
val newMap = getRedissonProjectLocks() | |
localCopy.forEach { (projectId, jobSet) -> | |
newMap[projectId] = jobSet | |
} | |
logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format") | |
} | |
override fun afterPropertiesSet() { | |
// This runs first to check if redis has a map of the old format. | |
// If so, we migrate it to the new format. | |
if (!usingRedisProvider.areWeUsingRedis) { | |
logger.debug("Not using Redis, skipping migration check") | |
return | |
} | |
val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) | |
val containsOnlyNew = redisProjectBatchJobLocks.values.all { it is Set<*> } | |
if (containsOnlyNew) { | |
logger.debug("Redis project locks are in new format, no migration needed") | |
return | |
} | |
logger.info("Starting migration of project locks from old format (v1) to new format (v2)") | |
val localCopy = mutableMapOf<Long, Set<Long>>() | |
redisProjectBatchJobLocks.forEach { (projectId, value) -> | |
val jobSet: Set<Long> = when (value) { | |
null -> emptySet() | |
is Long -> if (value == 0L) emptySet() else setOf(value) | |
is Iterable<*> -> value.filterIsInstance<Long>().toSet() | |
else -> { | |
logger.warn("Unknown lock value type ${value::class.java} for project $projectId; treating as empty") | |
emptySet() | |
} | |
} | |
localCopy[projectId] = jobSet | |
} | |
logger.info("Prepared ${localCopy.size} project locks for new format") | |
// Write all data back in new format (this will overwrite the old format) | |
val newMap = getRedissonProjectLocks() | |
localCopy.forEach { (projectId, jobSet) -> | |
newMap[projectId] = jobSet | |
} | |
logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format") | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also did you check if the cancelation works properly?
Could you please check the BatchJobManagementControllerTest
and the BatchJobManagementControllerCancellationTest
if the scenarios tested there are not broken by allowing multiple jobs per project. Please add separate tests for some of them but with projectConcurrency=2
.
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
Show resolved
Hide resolved
return redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) | ||
} | ||
|
||
override fun afterPropertiesSet() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i believe the migration is not needed, because io.tolgee.component.CacheCleaner
clears all caches on ApplicationReadyEvent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know about io.tolgee.component.CacheCleaner
Looking at it, it enumerates caches with io.tolgee.constants.Caches#caches
I don't see the project batch locks in there.
And theoretically, it would not make sense to synchronize n instances with a (key, value) in Redis (project_batch_job_locks
), and then drop it any time any instance starts.
In our production and qa, we've not seen the locks disappear on reboots.
We had to add scripts to clear the locks quite often when we debugged the batch problems.
The migration of data in Redis is possibly the biggest complexity in this PR, but I didn't find a way that was less intrusive. My first takes on this were to transparently handle Long? and Set data in Redis and map them to Set magically.
This proved very intrusive in a lot of methods (reading and writing), and because of type erasure, forced me to duplicate many methods.
This how I ended up a with redis value migration on start. This requires a reboot of all the instances, as the ones with version n-1 don't like the new redis value format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ajeans i agree with your points.
@JanCizmar please, take a look here too. You're more familar with the code base and you once pointed me to the CacheCleaner
. Maybe there is smth else we're missing.
Also @JanCizmar , please, also comment about the restart of all instances at once. ArgoCD does it one by one, but it's of course possible to make it stop all pods by manually helping with replcas: 0
.
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
Outdated
Show resolved
Hide resolved
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
Show resolved
Hide resolved
@bdshadow I took care of 3 comments, and replied on the big comment regarding redis data migration. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt (1)
186-220
: Handle mixed old/new format values to prevent ClassCastException.The migration logic assumes all values are either old format (Long/null) or all new format (Set). However, line 195 checks if any value is old format, then line 207 casts all values to Long. If Redis contains a mix of formats, this will throw
ClassCastException
.This issue was previously identified by coderabbitai[bot]. Apply the suggested fix to handle all value types (null, Long, Set, and unknown) gracefully during migration.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
(5 hunks)
🔇 Additional comments (2)
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt (2)
5-5
: LGTM! Architecture changes support the new concurrent job requirements.The addition of
BatchProperties
for configurable concurrency andInitializingBean
for startup migration are appropriate for this feature.Also applies to: 9-9, 15-15, 25-25, 29-29
58-67
: LGTM! Set-based lock operations are correctly implemented.The unlock logic properly removes individual job IDs from the set, and the lock methods correctly use set-based computations with the new format. The
getLockedJobIds()
flatten operation correctly collects all job IDs across projects.Also applies to: 70-98, 100-114, 222-224
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt
(4 hunks)backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt
(9 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt (2)
backend/testing/src/main/kotlin/io/tolgee/AbstractSpringTest.kt (1)
clearForcedDate
(268-270)backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/AbstractBatchJobManagementControllerTest.kt (1)
saveAndPrepare
(76-80)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt (2)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/AbstractBatchJobManagementControllerTest.kt (1)
saveAndPrepare
(76-80)backend/testing/src/main/kotlin/io/tolgee/AbstractSpringTest.kt (1)
executeInNewTransaction
(256-262)
...d/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt (2)
98-110
: Use AtomicBoolean (or CountDownLatch) for cross-thread wait; avoid visibility racesLocal mutable
var wait = true
is not guaranteed visible across threads. Replace with AtomicBoolean (or a latch) to prevent intermittent hangs.- var wait = true + val wait = java.util.concurrent.atomic.AtomicBoolean(true) ... - while (wait) { + while (wait.get()) { Thread.sleep(100) } ... - wait = false + wait.set(false)Add import if needed:
import java.util.concurrent.atomic.AtomicBooleanAlso applies to: 162-164
202-207
: Same wait visibility issue in current jobs testMake
wait
AtomicBoolean (or use a latch) to avoid data races.- var wait = true + val wait = java.util.concurrent.atomic.AtomicBoolean(true) ... - while (wait) { + while (wait.get()) { Thread.sleep(100) } ... - wait = false + wait.set(false)Also applies to: 268-269
♻️ Duplicate comments (3)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt (2)
35-40
: Good fix: properly capture and restore projectConcurrencyCaching in @beforeeach and restoring in @AfterEach prevents leakage across tests. Matches the earlier recommendation.
Also applies to: 46-46
231-234
: Recheck lock count expectation here as wellSame potential flakiness: assert non‑zero and ≤ projectConcurrency; and that IDs belong to the started jobs.
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt (1)
200-226
: Redis migration will ClassCast on mixed formats; handle Long/Set/null robustlyMaps can contain both old (Long/null) and new (Set) values. Current cast to Long will crash.
- val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) - val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null } - if (!isOldFormat) { - logger.debug("Redis project locks are in new format, no migration needed") - return - } + val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + val containsOnlyNew = redisProjectBatchJobLocks.values.all { it is Set<*> } + if (containsOnlyNew) { + logger.debug("Redis project locks are in new format, no migration needed") + return + } @@ - val localCopy = mutableMapOf<Long, Set<Long>>() - redisProjectBatchJobLocks.forEach { (projectId, jobId) -> - val jobSet = when (jobId) { - null, 0L -> emptySet<Long>() - else -> setOf<Long>(jobId as Long) - } - localCopy[projectId] = jobSet - } - logger.info("Copied ${localCopy.size} project locks from old format to local memory") + val localCopy = mutableMapOf<Long, Set<Long>>() + redisProjectBatchJobLocks.forEach { (projectId, value) -> + val jobSet: Set<Long> = when (value) { + null -> emptySet() + is Long -> if (value == 0L) emptySet() else setOf(value) + is Iterable<*> -> value.filterIsInstance<Long>().toSet() + else -> { + logger.warn("Unknown lock value type ${value::class.java} for project $projectId; treating as empty") + emptySet() + } + } + localCopy[projectId] = jobSet + } + logger.info("Prepared ${localCopy.size} project locks for new format")
🧹 Nitpick comments (4)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt (2)
121-124
: Avoid println in testsUse logger or remove to keep CI logs clean.
- println( - "Job ${it.id} status ${it.status} progress: ${state?.values?.sumOf { it.successTargets.size }}", - ) + // optionally log via logger if needed
125-132
: Lock count assertion may be timing‑dependentYou assert lockedJobs.size == projectConcurrency while only 1 RUNNING job is expected. Finished jobs typically release locks, so the lock count may drop below N. Consider asserting:
- lockedJobs.size > 0
- lockedJobs.size <= projectConcurrency
- lockedJobs subset of jobs’ IDs
Also applies to: 129-132
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt (2)
58-67
: Remove mapping when last job unlocks to avoid empty entriesReturn null from compute to delete the key when the set becomes empty. Prevents Redis clutter and keeps semantics clean.
- val updatedJobs = currentJobs - jobId - return@compute updatedJobs.ifEmpty { emptySet() } + val updatedJobs = currentJobs - jobId + return@compute if (updatedJobs.isEmpty()) null else updatedJobs
228-230
: getLockedJobIds() creates an aggregated set each callFine for diagnostics, but avoid hot paths. If used frequently, consider exposing counts per project or caching metrics instead.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt
(9 hunks)backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
(5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt (2)
backend/testing/src/main/kotlin/io/tolgee/AbstractSpringTest.kt (1)
clearForcedDate
(268-270)backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/AbstractBatchJobManagementControllerTest.kt (1)
saveAndPrepare
(76-80)
🔇 Additional comments (1)
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt (1)
77-91
: RMap.compute is atomic per key across clients Redisson runs compute server-side via Lua scripting and per-key locking, so theprojectId
map compute is safe.
// nothing is locked | ||
if (lockedJobIds.isEmpty()) { | ||
logger.debug("Getting initial locked state from DB state") | ||
// we have to find out from database if there is any running job for the project | ||
val initial = getInitialJobId(projectId) | ||
logger.debug("Initial locked job $initial for project ${toLock.projectId}") | ||
if (initial == null) { | ||
logger.debug("No job found, locking ${toLock.id}") | ||
return toLock.id | ||
val initialJobId = getInitialJobId(projectId) | ||
logger.info("Initial locked job $initialJobId for project ${toLock.projectId}") | ||
if (initialJobId == null) { | ||
logger.debug("No initial job found, locking only ${toLock.id}") | ||
return setOf(toLock.id) | ||
} | ||
val newLockedJobIds = mutableSetOf<Long>(initialJobId) | ||
if (newLockedJobIds.size < batchProperties.projectConcurrency) { | ||
logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") | ||
newLockedJobIds.add(toLock.id) | ||
} else { | ||
logger.debug( | ||
"Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds" | ||
) | ||
} | ||
return newLockedJobIds | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
State recovery only seeds one existing job; can exceed concurrency when multiple jobs already RUNNING
If there are ≥2 RUNNING/started jobs in DB and the in-memory/redis set is empty, seeding a single job undercounts and can admit extra jobs above projectConcurrency.
Apply this change to seed up to projectConcurrency existing jobs:
- val initialJobId = getInitialJobId(projectId)
- logger.info("Initial locked job $initialJobId for project ${toLock.projectId}")
- if (initialJobId == null) {
- logger.debug("No initial job found, locking only ${toLock.id}")
- return setOf(toLock.id)
- }
- val newLockedJobIds = mutableSetOf<Long>(initialJobId)
- if (newLockedJobIds.size < batchProperties.projectConcurrency) {
+ val initialJobIds = getInitialJobIds(projectId, batchProperties.projectConcurrency)
+ logger.info("Initial locked jobs $initialJobIds for project ${toLock.projectId}")
+ if (initialJobIds.isEmpty()) {
+ logger.debug("No initial jobs found, locking only ${toLock.id}")
+ return setOf(toLock.id)
+ }
+ val newLockedJobIds = initialJobIds.toMutableSet()
+ if (newLockedJobIds.size < batchProperties.projectConcurrency) {
logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds")
newLockedJobIds.add(toLock.id)
} else {
logger.debug(
"Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds"
)
}
return newLockedJobIds
Add helper (outside this block):
private fun getInitialJobIds(projectId: Long, limit: Int): Set<Long> {
if (limit <= 0) return emptySet()
val jobs = batchJobService.getAllIncompleteJobIds(projectId)
// Prefer RUNNING
val running = jobs.filter { it.status == io.tolgee.model.batch.BatchJobStatus.RUNNING }
.map { it.jobId }
if (running.isNotEmpty()) return running.take(limit).toSet()
// Fallback: jobs that have started processing (some chunks locked)
val unlockedCounts = batchJobService
.getAllUnlockedChunksForJobs(jobs.map { it.jobId })
.groupBy { it.batchJobId }
.mapValues { it.value.count() }
val started = jobs.filter { it.totalChunks != unlockedCounts[it.jobId] }
.map { it.jobId }
return started.take(limit).toSet()
}
🤖 Prompt for AI Agents
In backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
around lines 127 to 147, the recovery branch seeds only one existing job which
can undercount and allow more than projectConcurrency jobs to run; replace the
single-job lookup with a new helper getInitialJobIds(projectId,
batchProperties.projectConcurrency) (implement it outside this block) that
returns up to `limit` job IDs preferring RUNNING jobs, falling back to jobs that
have started processing (determined by comparing totalChunks vs unlocked chunk
counts), and then use that Set<Long> as the initial lockedJobIds (adding the
toLock.id only if it doesn’t exceed projectConcurrency). Ensure the helper calls
batchJobService.getAllIncompleteJobIds and
batchJobService.getAllUnlockedChunksForJobs as described and that limit<=0
returns empty set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ajeans i think here coderabbit is also right. I think it's better to apply the changes, since it will really be correct when BatchProperties.concurrency is bigger than one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
@ValueSource(ints = [1, 2]) | ||
@ProjectJWTAuthTestMethod | ||
fun `cancels a job`() { | ||
batchDumper.finallyDump { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is batchDumper
removed here? It's really convinient for investigation in case of a test failure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just maybe batchDumper
should be extended with smth like
fun <T> finallyDumpAll(fn: () -> T): T {
return try {
fn()
} finally {
getAllJobs().forEach { this.dump(it.id) }
}
}
fun getAllJobs(): List<BatchJob> = entityManager.createQuery("""from BatchJob""", BatchJob::class.java).resultList
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while playing with it i actually noticed that setting batchProperties.projectConcurrency = 2
doesn't work here. It must be passed as application properties in the @SpringBootTest(
@ajeans one more comment for discussion here. Jan has already written these concerns about multiple jobs per projects:
How do you think it must be handled? Did you try to run 2 jobs with the same keys? Or to do batch delete while batch translation is running? Actually, some tests for these cases would be good. It would show what is the expected behaviour in these cases. |
Replace old format Map<Long, Long> with new format Map<Long, Set> to support configurable concurrent jobs per project.
One-time Redis Migration automatically runs on startup and performs one-time data conversion.
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Tests