@@ -7,6 +7,7 @@ package io.airbyte.commons.server.handlers
77import com.google.common.annotations.VisibleForTesting
88import io.airbyte.api.model.generated.ConnectorRolloutActorSelectionInfo
99import io.airbyte.api.model.generated.ConnectorRolloutActorSyncInfo
10+ import io.airbyte.api.model.generated.ConnectorRolloutFilters
1011import io.airbyte.api.model.generated.ConnectorRolloutFinalizeRequestBody
1112import io.airbyte.api.model.generated.ConnectorRolloutManualFinalizeRequestBody
1213import io.airbyte.api.model.generated.ConnectorRolloutManualFinalizeResponse
@@ -24,10 +25,14 @@ import io.airbyte.api.problems.model.generated.ProblemMessageData
2425import io.airbyte.api.problems.throwable.generated.ConnectorRolloutInvalidRequestProblem
2526import io.airbyte.api.problems.throwable.generated.ConnectorRolloutMaximumRolloutPercentageReachedProblem
2627import io.airbyte.api.problems.throwable.generated.ConnectorRolloutNotEnoughActorsProblem
28+ import io.airbyte.config.AttributeName
2729import io.airbyte.config.ConnectorEnumRolloutState
2830import io.airbyte.config.ConnectorEnumRolloutStrategy
2931import io.airbyte.config.ConnectorRollout
3032import io.airbyte.config.ConnectorRolloutFinalState
33+ import io.airbyte.config.CustomerTier
34+ import io.airbyte.config.CustomerTierFilter
35+ import io.airbyte.config.Operator
3136import io.airbyte.config.persistence.UserPersistence
3237import io.airbyte.connector.rollout.client.ConnectorRolloutClient
3338import io.airbyte.connector.rollout.shared.ActorSelectionInfo
@@ -161,10 +166,11 @@ open class ConnectorRolloutHandler
161166 dockerRepository : String ,
162167 actorDefinitionId : UUID ,
163168 dockerImageTag : String ,
164- updatedBy : UUID ,
169+ updatedBy : UUID ? ,
165170 rolloutStrategy : ConnectorRolloutStrategy ,
166171 initialRolloutPct : Int? ,
167172 finalTargetRolloutPct : Int? ,
173+ requestFilters : ConnectorRolloutFilters ? ,
168174 ): ConnectorRollout {
169175 val actorDefinitionVersion =
170176 actorDefinitionService.getActorDefinitionVersion(
@@ -199,6 +205,8 @@ open class ConnectorRolloutHandler
199205 ProblemMessageData ().message(" Could not find initial version for actor definition id: $actorDefinitionId " ),
200206 )
201207
208+ val filters = createFiltersFromRequest(requestFilters)
209+
202210 if (initializedRollouts.isEmpty()) {
203211 val currentTime = OffsetDateTime .now(ZoneOffset .UTC ).toEpochSecond()
204212
@@ -216,6 +224,7 @@ open class ConnectorRolloutHandler
216224 rolloutStrategy = getRolloutStrategyForManualStart(rolloutStrategy),
217225 initialRolloutPct = initialRolloutPct,
218226 finalTargetRolloutPct = finalTargetRolloutPct,
227+ filters = filters,
219228 )
220229 connectorRolloutService.writeConnectorRollout(connectorRollout)
221230 return connectorRollout
@@ -250,6 +259,7 @@ open class ConnectorRolloutHandler
250259 connectorRollout.rolloutStrategy = getRolloutStrategyForManualStart(rolloutStrategy)
251260 connectorRollout.initialRolloutPct = initialRolloutPct
252261 connectorRollout.finalTargetRolloutPct = finalTargetRolloutPct
262+ connectorRollout.filters = filters
253263
254264 connectorRolloutService.writeConnectorRollout(connectorRollout)
255265 return connectorRollout
@@ -580,7 +590,7 @@ open class ConnectorRolloutHandler
580590 fun getPinnedActorInfo (id : UUID ): ConnectorRolloutActorSelectionInfo {
581591 val rollout = connectorRolloutService.getConnectorRollout(id)
582592 logger.info { " getPinnedActorInfo: rollout=$rollout " }
583- val actorSelectionInfo = rolloutActorFinder.getActorSelectionInfo(rollout, null )
593+ val actorSelectionInfo = rolloutActorFinder.getActorSelectionInfo(rollout, null , rollout.filters )
584594 logger.info { " getPinnedActorInfo: actorSelectionInfo=$actorSelectionInfo " }
585595
586596 return ConnectorRolloutActorSelectionInfo ()
@@ -599,6 +609,7 @@ open class ConnectorRolloutHandler
599609 connectorRolloutManualStart.rolloutStrategy,
600610 connectorRolloutManualStart.initialRolloutPct,
601611 connectorRolloutManualStart.finalTargetRolloutPct,
612+ connectorRolloutManualStart.filters,
602613 )
603614
604615 try {
@@ -629,9 +640,21 @@ open class ConnectorRolloutHandler
629640 return buildConnectorRolloutRead(connectorRolloutService.getConnectorRollout(rollout.id), false )
630641 }
631642
632- open fun manualDoConnectorRolloutUpdate (connectorRolloutUpdate : ConnectorRolloutManualRolloutRequestBody ): ConnectorRolloutManualRolloutResponse {
633- val connectorRollout = connectorRolloutService.getConnectorRollout(connectorRolloutUpdate.id)
643+ open fun manualDoConnectorRollout (connectorRolloutUpdate : ConnectorRolloutManualRolloutRequestBody ): ConnectorRolloutManualRolloutResponse {
644+ var connectorRollout = connectorRolloutService.getConnectorRollout(connectorRolloutUpdate.id)
645+
634646 if (connectorRollout.state == ConnectorEnumRolloutState .INITIALIZED ) {
647+ connectorRollout =
648+ getOrCreateAndValidateManualStartInput(
649+ connectorRolloutUpdate.dockerRepository,
650+ connectorRolloutUpdate.actorDefinitionId,
651+ connectorRolloutUpdate.dockerImageTag,
652+ connectorRolloutUpdate.updatedBy,
653+ ConnectorRolloutStrategy .MANUAL ,
654+ null ,
655+ null ,
656+ connectorRolloutUpdate.filters,
657+ )
635658 try {
636659 connectorRolloutClient.startRollout(
637660 ConnectorRolloutWorkflowInput (
@@ -654,6 +677,10 @@ open class ConnectorRolloutHandler
654677 } catch (e: WorkflowUpdateException ) {
655678 throw throwAirbyteApiClientExceptionIfExists(" startWorkflow" , e)
656679 }
680+ } else {
681+ if (connectorRolloutUpdate.filters != null ) {
682+ throw RuntimeException (" Cannot modify filters in a running rollout." )
683+ }
657684 }
658685 try {
659686 connectorRolloutClient.doRollout(
@@ -805,7 +832,7 @@ open class ConnectorRolloutHandler
805832 connectorRollout : ConnectorRollout ,
806833 targetPercent : Int ,
807834 ): ActorSelectionInfo {
808- val actorSelectionInfo = rolloutActorFinder.getActorSelectionInfo(connectorRollout, targetPercent)
835+ val actorSelectionInfo = rolloutActorFinder.getActorSelectionInfo(connectorRollout, targetPercent, connectorRollout.filters )
809836 if (targetPercent > 0 && actorSelectionInfo.actorIdsToPin.isEmpty() && actorSelectionInfo.nPreviouslyPinned == 0 ) {
810837 throw ConnectorRolloutNotEnoughActorsProblem (
811838 ProblemMessageData ().message(
@@ -867,6 +894,32 @@ open class ConnectorRolloutHandler
867894 return rollouts.first()
868895 }
869896
897+ private fun createFiltersFromRequest (filters : ConnectorRolloutFilters ? ): io.airbyte.config.ConnectorRolloutFilters {
898+ if (filters?.tierFilter == null ) {
899+ return io.airbyte.config.ConnectorRolloutFilters (
900+ customerTierFilters =
901+ listOf (
902+ CustomerTierFilter (
903+ name = AttributeName .TIER ,
904+ operator = Operator .IN ,
905+ value = listOf (CustomerTier .TIER_2 ),
906+ ),
907+ ),
908+ )
909+ } else {
910+ return io.airbyte.config.ConnectorRolloutFilters (
911+ customerTierFilters =
912+ listOf (
913+ CustomerTierFilter (
914+ name = AttributeName .TIER ,
915+ operator = Operator .IN ,
916+ value = listOf (CustomerTier .valueOf(filters.tierFilter!! .tier.toString())),
917+ ),
918+ ),
919+ )
920+ }
921+ }
922+
870923 private fun throwAirbyteApiClientExceptionIfExists (
871924 handlerName : String ,
872925 e : WorkflowUpdateException ,
0 commit comments