From 7e68118d91f6c4c25e1af810879da3d39321f726 Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Mon, 3 Aug 2015 15:27:08 -0400 Subject: [PATCH 1/9] introduced default shards --- lib/octopus/proxy.rb | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index f5cf4934..cf2efad2 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -19,6 +19,9 @@ def initialize_shards(config) @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config + @default_shard = config['defaults'].try(:[], 'shard') || :master + @default_slave_group = config['defaults'].try(:[], 'slave_group') + unless config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env] @@ -73,6 +76,10 @@ def initialize_shards(config) end end + if (@default_shard != :master) && @shards_slave_groups.present? && !@shards_slave_groups[@default_shard].try(:has_key?, @default_slave_group) + fail 'default slave group should be included in default shard' + end + @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus end @@ -98,7 +105,7 @@ def current_model=(model) end def current_shard - Thread.current['octopus.current_shard'] ||= :master + Thread.current['octopus.current_shard'] ||= @default_shard end def current_shard=(shard_symbol) @@ -146,7 +153,7 @@ def current_group=(group_symbol) end def current_slave_group - Thread.current['octopus.current_slave_group'] + Thread.current['octopus.current_slave_group'] ||= @default_slave_group end def current_slave_group=(slave_group_symbol) From 43bcb9536fc4a30d0d348a8b830460614b5a1463 Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Tue, 4 Aug 2015 21:58:57 -0400 Subject: [PATCH 2/9] make work with shard + slave groups & default value for slave group --- lib/octopus/proxy.rb | 30 +++++++++++++++--------------- threads | 0 2 files changed, 15 insertions(+), 15 deletions(-) create mode 100644 threads diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index cf2efad2..1b0e56fb 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -19,8 +19,11 @@ def initialize_shards(config) @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config - @default_shard = config['defaults'].try(:[], 'shard') || :master - @default_slave_group = config['defaults'].try(:[], 'slave_group') + @default_shard = config['defaults'].try(:[], 'shard') + fail 'default shard shoule be set' if @default_shard.blank? + + default_slave_group_name = config['defaults'].try(:[], 'slave_group') + @default_slave_groups = @shards_config.keys.inject(HashWithIndifferentAccess.new) { |h, k| h[k] = default_slave_group_name; h } unless config.nil? @entire_sharded = config['entire_sharded'] @@ -76,11 +79,7 @@ def initialize_shards(config) end end - if (@default_shard != :master) && @shards_slave_groups.present? && !@shards_slave_groups[@default_shard].try(:has_key?, @default_slave_group) - fail 'default slave group should be included in default shard' - end - - @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus + #@shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus end def initialize_replication(config) @@ -109,7 +108,6 @@ def current_shard end def current_shard=(shard_symbol) - self.current_slave_group = nil if shard_symbol.is_a?(Array) shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } elsif shard_symbol.is_a?(Hash) @@ -136,6 +134,7 @@ def current_shard=(shard_symbol) fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end + self.current_slave_group ||= @default_slave_groups[shard_symbol] Thread.current['octopus.current_shard'] = shard_symbol end @@ -153,7 +152,7 @@ def current_group=(group_symbol) end def current_slave_group - Thread.current['octopus.current_slave_group'] ||= @default_slave_group + Thread.current['octopus.current_slave_group'] || @default_slave_groups[current_shard] end def current_slave_group=(slave_group_symbol) @@ -209,7 +208,7 @@ def shards_for_group(group) # reconnect, but in Rails 3.1 the flag prevents this. def safe_connection(connection_pool) connection_pool.automatic_reconnect ||= true - if !connection_pool.connected? && @shards[:master].connection.query_cache_enabled + if !connection_pool.connected? && @shards[@default_shard].connection.query_cache_enabled connection_pool.connection.enable_query_cache! end connection_pool.connection @@ -242,7 +241,7 @@ def send_queries_to_multiple_shards(shards, &block) end def clean_connection_proxy - self.current_shard = :master + self.current_shard = @default_shard self.current_model = nil self.current_group = nil self.block = nil @@ -250,13 +249,13 @@ def clean_connection_proxy def check_schema_migrations(shard) OctopusModel.using(shard).connection.table_exists?( - ActiveRecord::Migrator.schema_migrations_table_name, + ActiveRecord::Migrator.schema_migrations_table_name, ) || OctopusModel.using(shard).connection.initialize_schema_migrations_table end def transaction(options = {}, &block) if !sharded && current_model_replicated? - run_queries_on_shard(:master) do + run_queries_on_shard(@default_shard) do select_connection.transaction(options, &block) end else @@ -356,7 +355,7 @@ def with_each_healthy_shard end ar_pools.each do |pool| - next if pool == @shards[:master] # Already handled this + next if pool == @shards[@default_shard] # Already handled this begin yield(pool) @@ -420,7 +419,7 @@ def send_queries_to_selected_slave(method, *args, &block) if current_model.replicated || fully_replicated? selected_slave = @slaves_load_balancer.next else - selected_slave = :master + selected_slave = @default_shard end send_queries_to_slave(selected_slave, method, *args, &block) @@ -495,3 +494,4 @@ def structurally_slave_group?(config) end end end + diff --git a/threads b/threads new file mode 100644 index 00000000..e69de29b From 99ace335f51d9b227b18285974ea418d05642630 Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Tue, 4 Aug 2015 22:24:12 -0400 Subject: [PATCH 3/9] fixed octopus --- lib/octopus/proxy.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index 1b0e56fb..bf04bac8 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -22,15 +22,14 @@ def initialize_shards(config) @default_shard = config['defaults'].try(:[], 'shard') fail 'default shard shoule be set' if @default_shard.blank? - default_slave_group_name = config['defaults'].try(:[], 'slave_group') - @default_slave_groups = @shards_config.keys.inject(HashWithIndifferentAccess.new) { |h, k| h[k] = default_slave_group_name; h } - unless config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env] end @shards_config ||= [] + default_slave_group_name = config['defaults'].try(:[], 'slave_group') + @default_slave_groups = @shards_config.keys.inject(HashWithIndifferentAccess.new) { |h, k| h[k] = default_slave_group_name; h } @shards_config.each do |key, value| if value.is_a?(String) From 633adf941b546c23c46d9f5719e47310db57ba83 Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Tue, 4 Aug 2015 23:02:18 -0400 Subject: [PATCH 4/9] fixed issue with deploy --- lib/octopus/proxy.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index bf04bac8..57923535 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -29,7 +29,11 @@ def initialize_shards(config) @shards_config ||= [] default_slave_group_name = config['defaults'].try(:[], 'slave_group') - @default_slave_groups = @shards_config.keys.inject(HashWithIndifferentAccess.new) { |h, k| h[k] = default_slave_group_name; h } + if @shards_config.is_a?(Hash) + @default_slave_groups = @shards_config.keys.inject(HashWithIndifferentAccess.new) { |h, k| h[k] = default_slave_group_name; h } + else + @default_slave_groups = {} + end @shards_config.each do |key, value| if value.is_a?(String) From 906ae0c5179012607467c0cde4510d8f15e40651 Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Fri, 21 Aug 2015 14:17:12 -0400 Subject: [PATCH 5/9] fixed shard with slaves --- lib/octopus/proxy.rb | 32 +++++++++++++++++--------------- lib/octopus/slave_group.rb | 1 - threads | 0 3 files changed, 17 insertions(+), 16 deletions(-) delete mode 100644 threads diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index 57923535..93285d71 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -54,8 +54,7 @@ def initialize_shards(config) slave_group_configs.each do |slave_group_name, slave_configs| slaves = HashWithIndifferentAccess.new slave_configs.each do |slave_name, slave_config| - @shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") - slaves[slave_name.to_sym] = slave_name.to_sym + slaves[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") end slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves) end @@ -81,8 +80,6 @@ def initialize_shards(config) end end end - - #@shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus end def initialize_replication(config) @@ -111,11 +108,13 @@ def current_shard end def current_shard=(shard_symbol) + self.current_slave_group = nil + if shard_symbol.is_a?(Array) shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } elsif shard_symbol.is_a?(Hash) hash = shard_symbol - shard_symbol = hash[:shard] + shard_symbol = hash[:shard] slave_group_symbol = hash[:slave_group] if shard_symbol.nil? && slave_group_symbol.nil? @@ -127,8 +126,8 @@ def current_shard=(shard_symbol) end if slave_group_symbol.present? - if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || - (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?) + if (slave_group_symbol != :master) && ((@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || + (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?)) fail "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}" end self.current_slave_group = slave_group_symbol @@ -137,7 +136,7 @@ def current_shard=(shard_symbol) fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end - self.current_slave_group ||= @default_slave_groups[shard_symbol] + #self.current_slave_group ||= @default_slave_groups[shard_symbol] Thread.current['octopus.current_shard'] = shard_symbol end @@ -217,8 +216,8 @@ def safe_connection(connection_pool) connection_pool.connection end - def select_connection - safe_connection(@shards[shard_name]) + def select_connection(connection = nil) + safe_connection(connection || @shards[shard_name]) end def shard_name @@ -275,8 +274,10 @@ def method_missing(method, *args, &block) elsif should_send_queries_to_shard_slave_group?(method) send_queries_to_shard_slave_group(method, *args, &block) elsif should_send_queries_to_slave_group?(method) + raise NotImplementedError.new send_queries_to_slave_group(method, *args, &block) elsif should_send_queries_to_replicated_databases?(method) + raise NotImplementedError.new send_queries_to_selected_slave(method, *args, &block) else select_connection.send(method, *args, &block) @@ -321,7 +322,11 @@ def should_send_queries_to_shard_slave_group?(method) end def send_queries_to_shard_slave_group(method, *args, &block) - send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) + if current_slave_group == :master + send_queries_to_slave(@shards[current_shard], method, *args, &block) + else + send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) + end end def should_send_queries_to_slave_group?(method) @@ -454,9 +459,7 @@ def send_queries_to_balancer(balancer, method, *args, &block) # Temporarily switch `current_shard` to the specified slave and send queries to it # while preserving `current_shard` def send_queries_to_slave(slave, method, *args, &block) - using_shard(slave) do - select_connection.send(method, *args, &block) - end + select_connection(slave).send(method, *args, &block) end # Temporarily block cleaning connection proxy and run the block @@ -497,4 +500,3 @@ def structurally_slave_group?(config) end end end - diff --git a/lib/octopus/slave_group.rb b/lib/octopus/slave_group.rb index e36611e9..944e75f1 100644 --- a/lib/octopus/slave_group.rb +++ b/lib/octopus/slave_group.rb @@ -1,7 +1,6 @@ module Octopus class SlaveGroup def initialize(slaves) - slaves = HashWithIndifferentAccess.new(slaves) slaves_list = slaves.values @load_balancer = Octopus::LoadBalancing::RoundRobin.new(slaves_list) end diff --git a/threads b/threads deleted file mode 100644 index e69de29b..00000000 From 6f91058f33de314e28c6b5b372d3dd16e343c1cc Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Sat, 22 Aug 2015 15:56:22 -0400 Subject: [PATCH 6/9] implemented run on slave in slave groups --- lib/octopus/load_balancing/round_robin.rb | 8 ++++++-- lib/octopus/proxy.rb | 21 +++++++++++++++++++-- lib/octopus/slave_group.rb | 18 +++++++++++++++--- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/lib/octopus/load_balancing/round_robin.rb b/lib/octopus/load_balancing/round_robin.rb index 7c98d993..195aa36a 100644 --- a/lib/octopus/load_balancing/round_robin.rb +++ b/lib/octopus/load_balancing/round_robin.rb @@ -11,8 +11,12 @@ def initialize(slaves_list) end # Returns the next available slave in the pool - def next - @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + def next(index = nil) + if index + @slaves_list[index] + else + @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + end end end end diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index 93285d71..314d58b3 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -109,6 +109,7 @@ def current_shard def current_shard=(shard_symbol) self.current_slave_group = nil + self.current_slave = nil if shard_symbol.is_a?(Array) shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } @@ -116,6 +117,7 @@ def current_shard=(shard_symbol) hash = shard_symbol shard_symbol = hash[:shard] slave_group_symbol = hash[:slave_group] + slave_symbol = hash[:slave] if shard_symbol.nil? && slave_group_symbol.nil? fail 'Neither shard or slave group must be specified' @@ -132,11 +134,18 @@ def current_shard=(shard_symbol) end self.current_slave_group = slave_group_symbol end + + if slave_symbol.present? + unless @shards_slave_groups[shard_symbol].try(:[], slave_group_symbol).try(:has_slave?, slave_symbol) + fail "Nonexistent Slave Name: #{slave_symbol} in slave group: #{slave_group_symbol}" + end + + self.current_slave = slave_symbol + end else fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end - #self.current_slave_group ||= @default_slave_groups[shard_symbol] Thread.current['octopus.current_shard'] = shard_symbol end @@ -161,6 +170,14 @@ def current_slave_group=(slave_group_symbol) Thread.current['octopus.current_slave_group'] = slave_group_symbol end + def current_slave + Thread.current['octopus.current_slave'] + end + + def current_slave=(slave_symbol) + Thread.current['octopus.current_slave'] = slave_symbol + end + def block Thread.current['octopus.block'] end @@ -453,7 +470,7 @@ def slaves_grouped? # Temporarily switch `current_shard` to the next slave in a slave group and send queries to it # while preserving `current_shard` def send_queries_to_balancer(balancer, method, *args, &block) - send_queries_to_slave(balancer.next, method, *args, &block) + send_queries_to_slave(balancer.next(current_slave), method, *args, &block) end # Temporarily switch `current_shard` to the specified slave and send queries to it diff --git a/lib/octopus/slave_group.rb b/lib/octopus/slave_group.rb index 944e75f1..20111e09 100644 --- a/lib/octopus/slave_group.rb +++ b/lib/octopus/slave_group.rb @@ -1,12 +1,24 @@ module Octopus class SlaveGroup def initialize(slaves) - slaves_list = slaves.values + @name_index_map = HashWithIndifferentAccess.new + slaves_list, index = [], 0 + + slaves.each do |name, db_connection_pool| + slaves_list << db_connection_pool + @name_index_map[name] = index + index += 1 + end + @load_balancer = Octopus::LoadBalancing::RoundRobin.new(slaves_list) end - def next - @load_balancer.next + def has_slave?(slave_name) + @name_index_map.has_key?(slave_name) + end + + def next(slave_name = nil) + @load_balancer.next(@name_index_map[slave_name]) end end end From 76f4f5a4f698be5339d8edb996da8d62306de279 Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Mon, 24 Aug 2015 19:16:55 -0400 Subject: [PATCH 7/9] fixed issue, when cache was not reset for slaves --- lib/octopus/proxy.rb | 9 ++++++++- lib/octopus/slave_group.rb | 12 ++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index 314d58b3..66269305 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -15,6 +15,7 @@ def initialize_shards(config) @shards = HashWithIndifferentAccess.new @shards_slave_groups = HashWithIndifferentAccess.new @slave_groups = HashWithIndifferentAccess.new + @shard_servers = HashWithIndifferentAccess.new @groups = {} @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config @@ -44,6 +45,7 @@ def initialize_shards(config) value.merge!(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") + @shard_servers[key.to_sym] = [@shards[key.to_sym]] slave_group_configs = value.select do |_k, v| structurally_slave_group? v @@ -55,6 +57,7 @@ def initialize_shards(config) slaves = HashWithIndifferentAccess.new slave_configs.each do |slave_name, slave_config| slaves[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") + @shard_servers[key.to_sym] << slaves[slave_name.to_sym] end slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves) end @@ -356,9 +359,13 @@ def send_queries_to_slave_group(method, *args, &block) protected + def shard_servers + @shard_servers[current_shard] + end + # Ensure that a single failing slave doesn't take down the entire application def with_each_healthy_shard - @shards.each do |shard_name, v| + shard_servers.each do |shard_name, v| begin yield(v) rescue => e diff --git a/lib/octopus/slave_group.rb b/lib/octopus/slave_group.rb index 20111e09..16a507fc 100644 --- a/lib/octopus/slave_group.rb +++ b/lib/octopus/slave_group.rb @@ -2,23 +2,27 @@ module Octopus class SlaveGroup def initialize(slaves) @name_index_map = HashWithIndifferentAccess.new - slaves_list, index = [], 0 + @slaves_list, index = [], 0 slaves.each do |name, db_connection_pool| - slaves_list << db_connection_pool + @slaves_list << db_connection_pool @name_index_map[name] = index index += 1 end - @load_balancer = Octopus::LoadBalancing::RoundRobin.new(slaves_list) + @load_balancer = Octopus::LoadBalancing::RoundRobin.new(@slaves_list) end def has_slave?(slave_name) @name_index_map.has_key?(slave_name) end + def slaves + @slaves_list + end + def next(slave_name = nil) @load_balancer.next(@name_index_map[slave_name]) end end -end +end \ No newline at end of file From 1f551534c23627c283c7237026c15c62b8d9ecee Mon Sep 17 00:00:00 2001 From: Dzmitry Nikitsin Date: Mon, 24 Aug 2015 19:21:19 -0400 Subject: [PATCH 8/9] fixed typo --- lib/octopus/proxy.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index 66269305..fa0b04e3 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -365,12 +365,12 @@ def shard_servers # Ensure that a single failing slave doesn't take down the entire application def with_each_healthy_shard - shard_servers.each do |shard_name, v| + shard_servers.each do |v| begin yield(v) rescue => e if Octopus.robust_environment? - Octopus.logger.error "Error on shard #{shard_name}: #{e.message}" + Octopus.logger.error "Error on shard #{v.spec.config['host']}, database #{v.spec.config['database']}: #{e.message}" else raise end From c4e35941a2beef6bb6df14fbc6a466515e446de4 Mon Sep 17 00:00:00 2001 From: Yegor Zdanovich Date: Thu, 31 Mar 2016 11:31:39 +0300 Subject: [PATCH 9/9] fix stack level to deep accoring https://github.com/thiagopradi/octopus/commit/b1e70ac184d2a0898529bf2a1cd7d69870bb00ab --- lib/octopus/association_shard_tracking.rb | 4 ++-- spec/octopus/association_shard_tracking_spec.rb | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/octopus/association_shard_tracking.rb b/lib/octopus/association_shard_tracking.rb index 692b5d0f..87a6656f 100644 --- a/lib/octopus/association_shard_tracking.rb +++ b/lib/octopus/association_shard_tracking.rb @@ -26,8 +26,8 @@ module QueryOnCurrentShard METHODS.each do |m| define_method m.to_sym do |*args, &block| - if self.respond_to?(:proxy_association) && proxy_association - proxy_association.owner.run_on_shard { super(*args, &block) } + if defined?(@association) && @association + @association.owner.run_on_shard { super(*args, &block) } else super(*args, &block) end diff --git a/spec/octopus/association_shard_tracking_spec.rb b/spec/octopus/association_shard_tracking_spec.rb index e0206343..a61d69e8 100644 --- a/spec/octopus/association_shard_tracking_spec.rb +++ b/spec/octopus/association_shard_tracking_spec.rb @@ -630,6 +630,12 @@ expect(@brazil_client.comments.count).to eq(2) end + it 'group + count' do + expect(@brazil_client.comments.group(:id).count.length).to eq(1) + _cmt = @brazil_client.comments.create(:name => 'Builded Comment') + expect(@brazil_client.comments.group(:id).count.length).to eq(2) + end + it 'size' do expect(@brazil_client.comments.size).to eq(1) _cmt = @brazil_client.comments.create(:name => 'Builded Comment')