Skip to content

Commit bad8a75

Browse files
committed
circuit breaker that opens based on error rate (% errors in time window)
1 parent d5fb106 commit bad8a75

File tree

4 files changed

+547
-0
lines changed

4 files changed

+547
-0
lines changed

lib/semian.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
require 'semian/platform'
99
require 'semian/resource'
1010
require 'semian/circuit_breaker'
11+
require 'semian/error_rate_circuit_breaker'
1112
require 'semian/protected_resource'
1213
require 'semian/unprotected_resource'
1314
require 'semian/simple_sliding_window'
15+
require 'semian/time_sliding_window'
1416
require 'semian/simple_integer'
1517
require 'semian/simple_state'
1618
require 'semian/lru_hash'
@@ -245,9 +247,27 @@ def thread_safe=(thread_safe)
245247

246248
private
247249

250+
def create_error_rate_circuit_breaker(name, **options)
251+
require_keys!([:success_threshold, :error_percent_threshold, :error_timeout,
252+
:request_volume_threshold, :window_size], options)
253+
254+
exceptions = options[:exceptions] || []
255+
ErrorRateCircuitBreaker.new(name,
256+
success_threshold: options[:success_threshold],
257+
error_percent_threshold: options[:error_percent_threshold],
258+
error_timeout: options[:error_timeout],
259+
exceptions: Array(exceptions) + [::Semian::BaseError],
260+
half_open_resource_timeout: options[:half_open_resource_timeout],
261+
request_volume_threshold: options[:request_volume_threshold],
262+
window_size: options[:window_size],
263+
implementation: implementation(**options))
264+
end
265+
248266
def create_circuit_breaker(name, **options)
249267
circuit_breaker = options.fetch(:circuit_breaker, true)
250268
return unless circuit_breaker
269+
return create_error_rate_circuit_breaker(name, **options) if options.key?(:error_percent_threshold)
270+
251271
require_keys!([:success_threshold, :error_threshold, :error_timeout], options)
252272

253273
exceptions = options[:exceptions] || []
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
module Semian
2+
class ErrorRateCircuitBreaker #:nodoc:
3+
extend Forwardable
4+
5+
def_delegators :@state, :closed?, :open?, :half_open?
6+
7+
attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error, :error_percent_threshold,
8+
:request_volume_threshold, :success_count_threshold
9+
10+
def initialize(name, exceptions:, error_percent_threshold:, error_timeout:, window_size:,
11+
request_volume_threshold:, success_threshold:, implementation:, half_open_resource_timeout: nil)
12+
13+
raise 'error_threshold_percent should be between 0.0 and 1.0 exclusive' unless (0.0001...1.0).cover?(error_percent_threshold)
14+
15+
@name = name.to_sym
16+
@error_timeout = error_timeout
17+
@exceptions = exceptions
18+
@half_open_resource_timeout = half_open_resource_timeout
19+
@error_percent_threshold = error_percent_threshold
20+
@last_error_time = nil
21+
@request_volume_threshold = request_volume_threshold
22+
@success_count_threshold = success_threshold
23+
24+
@results = implementation::TimeSlidingWindow.new(window_size)
25+
@state = implementation::State.new
26+
27+
reset
28+
end
29+
30+
def acquire(resource = nil, &block)
31+
return yield if disabled?
32+
transition_to_half_open if transition_to_half_open?
33+
34+
raise OpenCircuitError unless request_allowed?
35+
36+
result = nil
37+
begin
38+
result = maybe_with_half_open_resource_timeout(resource, &block)
39+
rescue *@exceptions => error
40+
if !error.respond_to?(:marks_semian_circuits?) || error.marks_semian_circuits?
41+
mark_failed(error)
42+
end
43+
raise error
44+
else
45+
mark_success
46+
end
47+
result
48+
end
49+
50+
def transition_to_half_open?
51+
open? && error_timeout_expired? && !half_open?
52+
end
53+
54+
def request_allowed?
55+
closed? || half_open? || transition_to_half_open?
56+
end
57+
58+
def mark_failed(error)
59+
push_error(error)
60+
if closed?
61+
transition_to_open if error_threshold_reached?
62+
elsif half_open?
63+
transition_to_open
64+
end
65+
end
66+
67+
def mark_success
68+
@results << true
69+
return unless half_open?
70+
transition_to_close if success_threshold_reached?
71+
end
72+
73+
def reset
74+
@last_error_time = nil
75+
@results.clear
76+
transition_to_close
77+
end
78+
79+
def destroy
80+
@state.destroy
81+
end
82+
83+
# TODO understand what this is used for inside Semian lib
84+
def in_use?
85+
return false if error_timeout_expired?
86+
@results.count(false) > 0
87+
end
88+
89+
private
90+
91+
def current_time
92+
Process.clock_gettime(Process::CLOCK_MONOTONIC)
93+
end
94+
95+
def transition_to_close
96+
notify_state_transition(:closed)
97+
log_state_transition(:closed)
98+
@state.close!
99+
@results.clear
100+
end
101+
102+
def transition_to_open
103+
notify_state_transition(:open)
104+
log_state_transition(:open)
105+
@state.open!
106+
end
107+
108+
def transition_to_half_open
109+
notify_state_transition(:half_open)
110+
log_state_transition(:half_open)
111+
@state.half_open!
112+
@results.clear
113+
end
114+
115+
def success_threshold_reached?
116+
@results.count(true) >= @success_count_threshold
117+
end
118+
119+
def error_threshold_reached?
120+
return false if @results.empty? or @results.length < @request_volume_threshold
121+
@results.count(false).to_f / @results.length.to_f >= @error_percent_threshold
122+
end
123+
124+
def error_timeout_expired?
125+
return false unless @last_error_time
126+
current_time - @last_error_time >= @error_timeout
127+
end
128+
129+
def push_error(error)
130+
@last_error = error
131+
@last_error_time = current_time
132+
@results << false
133+
end
134+
135+
def log_state_transition(new_state)
136+
return if @state.nil? || new_state == @state.value
137+
138+
str = "[#{self.class.name}] State transition from #{@state.value} to #{new_state}."
139+
str << " success_count=#{@results.count(true)} error_count=#{@results.count(false)}"
140+
str << " success_count_threshold=#{@success_count_threshold} error_count_percent=#{@error_percent_threshold}"
141+
str << " error_timeout=#{@error_timeout} error_last_at=\"#{@last_error_time}\""
142+
str << " name=\"#{@name}\""
143+
Semian.logger.info(str)
144+
end
145+
146+
def notify_state_transition(new_state)
147+
Semian.notify(:state_change, self, nil, nil, state: new_state)
148+
end
149+
150+
def disabled?
151+
ENV['SEMIAN_CIRCUIT_BREAKER_DISABLED'] || ENV['SEMIAN_DISABLED']
152+
end
153+
154+
def maybe_with_half_open_resource_timeout(resource, &block)
155+
result =
156+
if half_open? && @half_open_resource_timeout && resource.respond_to?(:with_resource_timeout)
157+
resource.with_resource_timeout(@half_open_resource_timeout) do
158+
block.call
159+
end
160+
else
161+
block.call
162+
end
163+
164+
result
165+
end
166+
end
167+
end

0 commit comments

Comments
 (0)