diff --git a/lib/octopus/association_shard_tracking.rb b/lib/octopus/association_shard_tracking.rb index 692b5d0f..87a6656f 100644 --- a/lib/octopus/association_shard_tracking.rb +++ b/lib/octopus/association_shard_tracking.rb @@ -26,8 +26,8 @@ module QueryOnCurrentShard METHODS.each do |m| define_method m.to_sym do |*args, &block| - if self.respond_to?(:proxy_association) && proxy_association - proxy_association.owner.run_on_shard { super(*args, &block) } + if defined?(@association) && @association + @association.owner.run_on_shard { super(*args, &block) } else super(*args, &block) end diff --git a/lib/octopus/load_balancing/round_robin.rb b/lib/octopus/load_balancing/round_robin.rb index 7c98d993..195aa36a 100644 --- a/lib/octopus/load_balancing/round_robin.rb +++ b/lib/octopus/load_balancing/round_robin.rb @@ -11,8 +11,12 @@ def initialize(slaves_list) end # Returns the next available slave in the pool - def next - @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + def next(index = nil) + if index + @slaves_list[index] + else + @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + end end end end diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index f5cf4934..fa0b04e3 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -15,16 +15,26 @@ def initialize_shards(config) @shards = HashWithIndifferentAccess.new @shards_slave_groups = HashWithIndifferentAccess.new @slave_groups = HashWithIndifferentAccess.new + @shard_servers = HashWithIndifferentAccess.new @groups = {} @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config + @default_shard = config['defaults'].try(:[], 'shard') + fail 'default shard shoule be set' if @default_shard.blank? + unless config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env] end @shards_config ||= [] + default_slave_group_name = config['defaults'].try(:[], 'slave_group') + if @shards_config.is_a?(Hash) + @default_slave_groups = @shards_config.keys.inject(HashWithIndifferentAccess.new) { |h, k| h[k] = default_slave_group_name; h } + else + @default_slave_groups = {} + end @shards_config.each do |key, value| if value.is_a?(String) @@ -35,6 +45,7 @@ def initialize_shards(config) value.merge!(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") + @shard_servers[key.to_sym] = [@shards[key.to_sym]] slave_group_configs = value.select do |_k, v| structurally_slave_group? v @@ -45,8 +56,8 @@ def initialize_shards(config) slave_group_configs.each do |slave_group_name, slave_configs| slaves = HashWithIndifferentAccess.new slave_configs.each do |slave_name, slave_config| - @shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") - slaves[slave_name.to_sym] = slave_name.to_sym + slaves[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") + @shard_servers[key.to_sym] << slaves[slave_name.to_sym] end slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves) end @@ -72,8 +83,6 @@ def initialize_shards(config) end end end - - @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus end def initialize_replication(config) @@ -98,17 +107,20 @@ def current_model=(model) end def current_shard - Thread.current['octopus.current_shard'] ||= :master + Thread.current['octopus.current_shard'] ||= @default_shard end def current_shard=(shard_symbol) self.current_slave_group = nil + self.current_slave = nil + if shard_symbol.is_a?(Array) shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } elsif shard_symbol.is_a?(Hash) hash = shard_symbol - shard_symbol = hash[:shard] + shard_symbol = hash[:shard] slave_group_symbol = hash[:slave_group] + slave_symbol = hash[:slave] if shard_symbol.nil? && slave_group_symbol.nil? fail 'Neither shard or slave group must be specified' @@ -119,12 +131,20 @@ def current_shard=(shard_symbol) end if slave_group_symbol.present? - if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || - (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?) + if (slave_group_symbol != :master) && ((@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || + (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?)) fail "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}" end self.current_slave_group = slave_group_symbol end + + if slave_symbol.present? + unless @shards_slave_groups[shard_symbol].try(:[], slave_group_symbol).try(:has_slave?, slave_symbol) + fail "Nonexistent Slave Name: #{slave_symbol} in slave group: #{slave_group_symbol}" + end + + self.current_slave = slave_symbol + end else fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end @@ -146,13 +166,21 @@ def current_group=(group_symbol) end def current_slave_group - Thread.current['octopus.current_slave_group'] + Thread.current['octopus.current_slave_group'] || @default_slave_groups[current_shard] end def current_slave_group=(slave_group_symbol) Thread.current['octopus.current_slave_group'] = slave_group_symbol end + def current_slave + Thread.current['octopus.current_slave'] + end + + def current_slave=(slave_symbol) + Thread.current['octopus.current_slave'] = slave_symbol + end + def block Thread.current['octopus.block'] end @@ -202,14 +230,14 @@ def shards_for_group(group) # reconnect, but in Rails 3.1 the flag prevents this. def safe_connection(connection_pool) connection_pool.automatic_reconnect ||= true - if !connection_pool.connected? && @shards[:master].connection.query_cache_enabled + if !connection_pool.connected? && @shards[@default_shard].connection.query_cache_enabled connection_pool.connection.enable_query_cache! end connection_pool.connection end - def select_connection - safe_connection(@shards[shard_name]) + def select_connection(connection = nil) + safe_connection(connection || @shards[shard_name]) end def shard_name @@ -235,7 +263,7 @@ def send_queries_to_multiple_shards(shards, &block) end def clean_connection_proxy - self.current_shard = :master + self.current_shard = @default_shard self.current_model = nil self.current_group = nil self.block = nil @@ -243,13 +271,13 @@ def clean_connection_proxy def check_schema_migrations(shard) OctopusModel.using(shard).connection.table_exists?( - ActiveRecord::Migrator.schema_migrations_table_name, + ActiveRecord::Migrator.schema_migrations_table_name, ) || OctopusModel.using(shard).connection.initialize_schema_migrations_table end def transaction(options = {}, &block) if !sharded && current_model_replicated? - run_queries_on_shard(:master) do + run_queries_on_shard(@default_shard) do select_connection.transaction(options, &block) end else @@ -266,8 +294,10 @@ def method_missing(method, *args, &block) elsif should_send_queries_to_shard_slave_group?(method) send_queries_to_shard_slave_group(method, *args, &block) elsif should_send_queries_to_slave_group?(method) + raise NotImplementedError.new send_queries_to_slave_group(method, *args, &block) elsif should_send_queries_to_replicated_databases?(method) + raise NotImplementedError.new send_queries_to_selected_slave(method, *args, &block) else select_connection.send(method, *args, &block) @@ -312,7 +342,11 @@ def should_send_queries_to_shard_slave_group?(method) end def send_queries_to_shard_slave_group(method, *args, &block) - send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) + if current_slave_group == :master + send_queries_to_slave(@shards[current_shard], method, *args, &block) + else + send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) + end end def should_send_queries_to_slave_group?(method) @@ -325,14 +359,18 @@ def send_queries_to_slave_group(method, *args, &block) protected + def shard_servers + @shard_servers[current_shard] + end + # Ensure that a single failing slave doesn't take down the entire application def with_each_healthy_shard - @shards.each do |shard_name, v| + shard_servers.each do |v| begin yield(v) rescue => e if Octopus.robust_environment? - Octopus.logger.error "Error on shard #{shard_name}: #{e.message}" + Octopus.logger.error "Error on shard #{v.spec.config['host']}, database #{v.spec.config['database']}: #{e.message}" else raise end @@ -349,7 +387,7 @@ def with_each_healthy_shard end ar_pools.each do |pool| - next if pool == @shards[:master] # Already handled this + next if pool == @shards[@default_shard] # Already handled this begin yield(pool) @@ -413,7 +451,7 @@ def send_queries_to_selected_slave(method, *args, &block) if current_model.replicated || fully_replicated? selected_slave = @slaves_load_balancer.next else - selected_slave = :master + selected_slave = @default_shard end send_queries_to_slave(selected_slave, method, *args, &block) @@ -439,15 +477,13 @@ def slaves_grouped? # Temporarily switch `current_shard` to the next slave in a slave group and send queries to it # while preserving `current_shard` def send_queries_to_balancer(balancer, method, *args, &block) - send_queries_to_slave(balancer.next, method, *args, &block) + send_queries_to_slave(balancer.next(current_slave), method, *args, &block) end # Temporarily switch `current_shard` to the specified slave and send queries to it # while preserving `current_shard` def send_queries_to_slave(slave, method, *args, &block) - using_shard(slave) do - select_connection.send(method, *args, &block) - end + select_connection(slave).send(method, *args, &block) end # Temporarily block cleaning connection proxy and run the block diff --git a/lib/octopus/slave_group.rb b/lib/octopus/slave_group.rb index e36611e9..16a507fc 100644 --- a/lib/octopus/slave_group.rb +++ b/lib/octopus/slave_group.rb @@ -1,13 +1,28 @@ module Octopus class SlaveGroup def initialize(slaves) - slaves = HashWithIndifferentAccess.new(slaves) - slaves_list = slaves.values - @load_balancer = Octopus::LoadBalancing::RoundRobin.new(slaves_list) + @name_index_map = HashWithIndifferentAccess.new + @slaves_list, index = [], 0 + + slaves.each do |name, db_connection_pool| + @slaves_list << db_connection_pool + @name_index_map[name] = index + index += 1 + end + + @load_balancer = Octopus::LoadBalancing::RoundRobin.new(@slaves_list) + end + + def has_slave?(slave_name) + @name_index_map.has_key?(slave_name) + end + + def slaves + @slaves_list end - def next - @load_balancer.next + def next(slave_name = nil) + @load_balancer.next(@name_index_map[slave_name]) end end -end +end \ No newline at end of file diff --git a/spec/octopus/association_shard_tracking_spec.rb b/spec/octopus/association_shard_tracking_spec.rb index e0206343..a61d69e8 100644 --- a/spec/octopus/association_shard_tracking_spec.rb +++ b/spec/octopus/association_shard_tracking_spec.rb @@ -630,6 +630,12 @@ expect(@brazil_client.comments.count).to eq(2) end + it 'group + count' do + expect(@brazil_client.comments.group(:id).count.length).to eq(1) + _cmt = @brazil_client.comments.create(:name => 'Builded Comment') + expect(@brazil_client.comments.group(:id).count.length).to eq(2) + end + it 'size' do expect(@brazil_client.comments.size).to eq(1) _cmt = @brazil_client.comments.create(:name => 'Builded Comment')