4848#include  " memtier_benchmark.h" 
4949#include  " obj_gen.h" 
5050#include  " shard_connection.h" 
51- 
52- #define  KEY_INDEX_QUEUE_MAX_SIZE  1000000 
51+ #include  " crc16_slottable.h" 
5352
5453#define  MOVED_MSG_PREFIX  " -MOVED" 
5554#define  MOVED_MSG_PREFIX_LEN  6 
5655#define  ASK_MSG_PREFIX  " -ASK" 
5756#define  ASK_MSG_PREFIX_LEN  4 
5857
59- #define  MAX_CLUSTER_HSLOT  16383 
6058static  const  uint16_t  crc16tab[256 ]= {
6159        0x0000 ,0x1021 ,0x2042 ,0x3063 ,0x4084 ,0x50a5 ,0x60c6 ,0x70e7 ,
6260        0x8108 ,0x9129 ,0xa14a ,0xb16b ,0xc18c ,0xd1ad ,0xe1ce ,0xf1ef ,
@@ -100,24 +98,13 @@ static inline uint16_t crc16(const char *buf, size_t len) {
10098    return  crc;
10199}
102100
103- static  uint32_t  calc_hslot_crc16_cluster (const  char  *str, size_t  length)
104- {
105-     uint32_t  rv = (uint32_t ) crc16 (str, length) & MAX_CLUSTER_HSLOT;
106-     return  rv;
107- }
108- 
109101// /////////////////////////////////////////////////////////////////////////////////////////////////////
110102
111103cluster_client::cluster_client (client_group* group) : client(group)
112104{
113105}
114106
115107cluster_client::~cluster_client () {
116-     for  (unsigned  int  i = 0 ; i < m_key_index_pools.size (); i++) {
117-         key_index_pool* key_idx_pool = m_key_index_pools[i];
118-         delete  key_idx_pool;
119-     }
120-     m_key_index_pools.clear ();
121108}
122109
123110int  cluster_client::connect (void ) {
@@ -128,11 +115,6 @@ int cluster_client::connect(void) {
128115    //  set main connection to send 'CLUSTER SLOTS' command
129116    sc->set_cluster_slots ();
130117
131-     //  create key index pool for main connection
132-     key_index_pool* key_idx_pool = new  key_index_pool;
133-     m_key_index_pools.push_back (key_idx_pool);
134-     assert (m_connections.size () == m_key_index_pools.size ());
135- 
136118    //  continue with base class
137119    client::connect ();
138120
@@ -166,22 +148,10 @@ shard_connection* cluster_client::create_shard_connection(abstract_protocol* abs
166148
167149    m_connections.push_back (sc);
168150
169-     //  create key index pool
170-     key_index_pool* key_idx_pool = new  key_index_pool;
171-     assert (key_idx_pool != NULL );
172- 
173-     m_key_index_pools.push_back (key_idx_pool);
174-     assert (m_connections.size () == m_key_index_pools.size ());
175- 
176151    return  sc;
177152}
178153
179154bool  cluster_client::connect_shard_connection (shard_connection* sc, char * address, char * port) {
180-     //  empty key index queue
181-     if  (m_key_index_pools[sc->get_id ()]->size ()) {
182-         key_index_pool empty_queue;
183-         std::swap (*m_key_index_pools[sc->get_id ()], empty_queue);
184-     }
185155
186156    //  save address and port
187157    sc->set_address_port (address, port);
@@ -224,9 +194,12 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
224194     */  
225195    unsigned  long  prev_connections_size = m_connections.size ();
226196    std::vector<bool > close_sc (prev_connections_size, true );
197+     for  (unsigned  int  i = 0 ; i < MAX_SLOTS; i++) {
198+         m_conn_to_init_slot[i] = UINT16_MAX;
199+     }
227200
228201    //  run over response and create connections
229-     for  (unsigned  int  i= 0 ; i< r->get_mbulk_value ()->mbulks_elements .size (); i++) {
202+     for  (unsigned  int  i =  0 ; i <  r->get_mbulk_value ()->mbulks_elements .size (); i++) {
230203        //  create connection
231204        mbulk_size_el* shard = r->get_mbulk_value ()->mbulks_elements [i]->as_mbulk_size ();
232205
@@ -273,17 +246,26 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
273246            connect_shard_connection (sc, addr, port);
274247        }
275248
276-         //  update range
249+         unsigned  int  sc_id = sc->get_id ();
250+         //  Set the initial slot for this shard connection
251+         if  (m_conn_to_init_slot[sc_id] == UINT16_MAX) {
252+             m_conn_to_init_slot[sc_id] = min_slot;
253+         }
277254        for  (int  j = min_slot; j <= max_slot; j++) {
278-             m_slot_to_shard[j] = sc->get_id ();
255+             if  (j < max_slot) {
256+                 m_slot_lists[j] = j+1 ;
257+             } else  {
258+                 //  Close the loop - point the last index to the first one owned by the shard connection
259+                 m_slot_lists[j] = m_conn_to_init_slot[sc_id];
260+             }
279261        }
280262
281263        free (addr);
282264        free (port);
283265    }
284266
285267    //  check if some connections left with no slots, and need to be closed
286-     for  (unsigned  int  i= 0 ; i < prev_connections_size; i++) {
268+     for  (unsigned  int  i =  0 ; i < prev_connections_size; i++) {
287269        if  ((close_sc[i] == true ) &&
288270            (m_connections[i]->get_connection_state () != conn_disconnected)) {
289271
@@ -299,8 +281,7 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
299281
300282    //  don't exceed requests
301283    if  (m_config->requests ) {
302-         if  (m_key_index_pools[conn_id]->empty () &&
303-             m_reqs_generated >= m_config->requests ) {
284+         if  (m_reqs_generated >= m_config->requests ) {
304285            return  true ;
305286        }
306287    }
@@ -309,53 +290,13 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
309290}
310291
311292bool  cluster_client::get_key_for_conn (unsigned  int  conn_id, int  iter, unsigned  long  long * key_index) {
312-     //  first check if we already have key in pool
313-     if  (!m_key_index_pools[conn_id]->empty ()) {
314-         *key_index = m_key_index_pools[conn_id]->front ();
315-         m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s%llu" get_key_prefix (), *key_index);
316- 
317-         m_key_index_pools[conn_id]->pop ();
318-         return  true ;
319-     }
320- 
321-     //  keep generate key till it match for this connection, or requests reached
322-     while  (true ) {
323-         //  generate key
324-         *key_index = m_obj_gen->get_key_index (iter);
325-         m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s%llu" get_key_prefix (), *key_index);
326- 
327-         unsigned  int  hslot = calc_hslot_crc16_cluster (m_key_buffer, m_key_len);
328- 
329-         //  check if the key match for this connection
330-         if  (m_slot_to_shard[hslot] == conn_id) {
331-             m_reqs_generated++;
332-             return  true ;
333-         }
334- 
335-         //  handle key for other connection
336-         unsigned  int  other_conn_id = m_slot_to_shard[hslot];
337293
338-         //  in case we generated key for connection that is disconnected, 'slot to shard' map may need to be updated
339-         if  (m_connections[other_conn_id]->get_connection_state () == conn_disconnected) {
340-             m_connections[conn_id]->set_cluster_slots ();
341-             return  false ;
342-         }
343- 
344-         //  in case connection is during cluster slots command, his slots mapping not relevant
345-         if  (m_connections[other_conn_id]->get_cluster_slots_state () != setup_done)
346-             continue ;
347- 
348-         //  store key for other connection, if queue is not full
349-         key_index_pool* key_idx_pool = m_key_index_pools[other_conn_id];
350-         if  (key_idx_pool->size () < KEY_INDEX_QUEUE_MAX_SIZE) {
351-             key_idx_pool->push (*key_index);
352-             m_reqs_generated++;
353-         }
354- 
355-         //  don't exceed requests
356-         if  (m_config->requests  > 0  && m_reqs_generated >= m_config->requests )
357-             return  false ;
358-     }
294+     *key_index = m_obj_gen->get_key_index (iter);
295+     m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s{%s}%llu" 
296+                          m_obj_gen->get_key_prefix (), crc16_slot_table[m_conn_to_init_slot[conn_id]], *key_index);
297+     m_conn_to_init_slot[conn_id] = m_slot_lists[m_conn_to_init_slot[conn_id]];
298+     m_reqs_generated++;
299+     return  true ;
359300}
360301
361302//  This function could use some urgent TLC -- but we need to do it without altering the behavior
@@ -432,10 +373,6 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
432373    if  (m_connections[conn_id]->get_cluster_slots_state () != setup_done)
433374        return ;
434375
435-     //  queue may stored uncorrected mapping indexes, empty them
436-     key_index_pool empty_queue;
437-     std::swap (*m_key_index_pools[conn_id], empty_queue);
438- 
439376    //  set connection to send 'CLUSTER SLOTS' command
440377    m_connections[conn_id]->set_cluster_slots ();
441378}
0 commit comments