1
1
require 'set'
2
2
require 'octopus/slave_group'
3
3
require 'octopus/load_balancing/round_robin'
4
+ require 'octopus/query_analysis'
4
5
5
6
module Octopus
6
7
class Proxy
8
+ include ::Octopus ::QueryAnalysis
9
+
7
10
attr_accessor :config , :sharded
8
11
9
12
CURRENT_MODEL_KEY = 'octopus.current_model' . freeze
@@ -293,11 +296,11 @@ def method_missing(method, *args, &block)
293
296
self . last_current_shard = current_shard
294
297
clean_connection_proxy
295
298
conn . send ( method , *args , &block )
296
- elsif should_send_queries_to_shard_slave_group? ( method )
299
+ elsif should_send_queries_to_shard_slave_group? ( method , args )
297
300
send_queries_to_shard_slave_group ( method , *args , &block )
298
- elsif should_send_queries_to_slave_group? ( method )
301
+ elsif should_send_queries_to_slave_group? ( method , args )
299
302
send_queries_to_slave_group ( method , *args , &block )
300
- elsif should_send_queries_to_replicated_databases? ( method )
303
+ elsif should_send_queries_to_replicated_databases? ( method , args )
301
304
send_queries_to_selected_slave ( method , *args , &block )
302
305
else
303
306
select_connection . send ( method , *args , &block )
@@ -337,16 +340,16 @@ def connected?
337
340
@shards . any? { |_k , v | v . connected? }
338
341
end
339
342
340
- def should_send_queries_to_shard_slave_group? ( method )
341
- should_use_slaves_for_method? ( method ) && @shards_slave_groups . try ( :[] , current_shard ) . try ( :[] , current_slave_group ) . present?
343
+ def should_send_queries_to_shard_slave_group? ( method , args )
344
+ should_use_slaves_for_method? ( method , args ) && @shards_slave_groups . try ( :[] , current_shard ) . try ( :[] , current_slave_group ) . present?
342
345
end
343
346
344
347
def send_queries_to_shard_slave_group ( method , *args , &block )
345
348
send_queries_to_balancer ( @shards_slave_groups [ current_shard ] [ current_slave_group ] , method , *args , &block )
346
349
end
347
350
348
- def should_send_queries_to_slave_group? ( method )
349
- should_use_slaves_for_method? ( method ) && @slave_groups . try ( :[] , current_slave_group ) . present?
351
+ def should_send_queries_to_slave_group? ( method , args )
352
+ should_use_slaves_for_method? ( method , args ) && @slave_groups . try ( :[] , current_slave_group ) . present?
350
353
end
351
354
352
355
def send_queries_to_slave_group ( method , *args , &block )
@@ -431,8 +434,8 @@ def should_clean_connection_proxy?(method)
431
434
end
432
435
433
436
# Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined
434
- def should_send_queries_to_replicated_databases? ( method )
435
- @replicated && method . to_s =~ /select/ && !block && !slaves_grouped?
437
+ def should_send_queries_to_replicated_databases? ( method , args )
438
+ @replicated && select? ( method , args ) && !block && !slaves_grouped?
436
439
end
437
440
438
441
def current_model_replicated?
@@ -458,8 +461,20 @@ def send_queries_to_selected_slave(method, *args, &block)
458
461
# (3) It's a SELECT query
459
462
# while ensuring that we revert `current_shard` from the selected slave to the (shard's) master
460
463
# not to make queries other than SELECT leak to the slave.
461
- def should_use_slaves_for_method? ( method )
462
- current_model_replicated? && method . to_s =~ /select/
464
+ def should_use_slaves_for_method? ( method , args )
465
+ current_model_replicated? && select? ( method , args )
466
+ end
467
+
468
+ # Given an ActiveRecord::Base.connection method and its arguments, determine if it is a select query.
469
+ def select? ( method , args )
470
+ is_select = method . to_s =~ /select/
471
+ unless is_select
472
+ query = args . first
473
+ if query . kind_of? String
474
+ is_select = method . to_s =~ /execute/ && definitely_select_query? ( query ) && !possibly_multiple_queries? ( query )
475
+ end
476
+ end
477
+ is_select
463
478
end
464
479
465
480
def slaves_grouped?
0 commit comments