1- # hello_milvus.py demonstrates the basic operations of PyMilvus, a Python SDK of Milvus.
2- # 1. connect to Milvus
3- # 2. create collection
4- # 3. insert data
5- # 4. create index
6- # 5. search, query, and hybrid search on entities
7- # 6. delete entities by PK
8- # 7. drop collection
1+ # prepare_data.py - Prepare test data for Milvus backup/restore testing
2+ # Supports two scenarios:
3+ # 1. Single-stage: All data inserted at once (for testing backup/restore of old version data)
4+ # 2. Multi-stage: Data inserted in stages (for testing cross-version backup/restore with incremental data)
5+ #
6+ # Usage:
7+ # Single-stage mode (default): python prepare_data.py
8+ # Multi-stage mode: python prepare_data.py --stage 1 (then, later: python prepare_data.py --stage 2)
99import time
10- import os
1110import numpy as np
1211from pymilvus import (
1312 connections ,
1918
2019
2120
22- def main (uri = "http://127.0.0.1:19530" , token = "root:Milvus" ):
21+ def main (uri = "http://127.0.0.1:19530" , token = "root:Milvus" , stage = None , total_entities = 3000 ):
2322 fmt = "\n === {:30} ===\n "
24- num_entities , dim = 3000 , 8
23+ dim = 8
2524
2625 #################################################################################
2726 # 1. connect to Milvus
@@ -66,46 +65,54 @@ def main(uri="http://127.0.0.1:19530", token="root:Milvus"):
6665
6766 ################################################################################
6867 # 3. insert data
69- # We are going to insert 3000 rows of data into `hello_milvus`
68+ # We are going to insert rows of data into the collection
7069 # Data to be inserted must be organized in fields.
7170 #
7271 # The insert() method returns:
7372 # - either automatically generated primary keys by Milvus if auto_id=True in the schema;
7473 # - or the existing primary key field from the entities if auto_id=False in the schema.
7574
76- print (fmt .format ("Start inserting entities" ))
77- rng = np .random .default_rng (seed = 19530 )
78- # Prepare data
79- pk_list = [i for i in range (num_entities )]
80- random_list = rng .random (num_entities ).tolist ()
81- var_list = [str (i ) for i in range (num_entities )]
82- embeddings_list = rng .random ((num_entities , dim ))
83-
84- # Split data into 10 batches for insertion
85- batch_size = num_entities // 10
86- if batch_size == 0 :
87- batch_size = 1
75+ # Only insert data to hello_milvus when stage is None or 1
76+ if stage != 2 :
77+ print (fmt .format ("Start inserting entities to hello_milvus" ))
78+ rng = np .random .default_rng (seed = 19530 )
8879
89- for j in range (10 ):
90- start_idx = j * batch_size
91- end_idx = (j + 1 ) * batch_size if j < 9 else num_entities
92- if start_idx >= num_entities :
93- break
94-
95- # Prepare batch data
96- batch_entities = [
97- pk_list [start_idx :end_idx ],
98- random_list [start_idx :end_idx ],
99- var_list [start_idx :end_idx ],
100- embeddings_list [start_idx :end_idx ].tolist () if isinstance (embeddings_list , np .ndarray ) else embeddings_list [start_idx :end_idx ]
101- ]
80+ # Whenever hello_milvus is populated (stage is None or 1), it receives all total_entities rows
81+ num_entities = total_entities
82+ pk_list = [i for i in range (num_entities )]
83+ random_list = rng .random (num_entities ).tolist ()
84+ var_list = [str (i ) for i in range (num_entities )] # Always use original format
85+ embeddings_list = rng .random ((num_entities , dim ))
10286
103- # Insert batch data
104- insert_result = hello_milvus .insert (batch_entities )
105- time .sleep (1 ) # Add delay to prevent inserting too quickly
106- print (f"epoch { j + 1 } /10" )
107- hello_milvus .flush ()
108- print (f"Number of entities in hello_milvus: { hello_milvus .num_entities } " ) # check the num_entites
87+ # Split data into 10 batches for insertion
88+ batch_size = num_entities // 10
89+ if batch_size == 0 :
90+ batch_size = 1
91+
92+ for j in range (10 ):
93+ start_idx = j * batch_size
94+ end_idx = (j + 1 ) * batch_size if j < 9 else num_entities
95+ if start_idx >= num_entities :
96+ break
97+
98+ # Prepare batch data
99+ batch_entities = [
100+ pk_list [start_idx :end_idx ],
101+ random_list [start_idx :end_idx ],
102+ var_list [start_idx :end_idx ],
103+ embeddings_list [start_idx :end_idx ].tolist () if isinstance (embeddings_list , np .ndarray ) else embeddings_list [start_idx :end_idx ]
104+ ]
105+
106+ # Insert batch data
107+ hello_milvus .insert (batch_entities )
108+ time .sleep (1 ) # Add delay to prevent inserting too quickly
109+ print (f"epoch { j + 1 } /10" )
110+ hello_milvus .flush ()
111+ else :
112+ print ("Stage 2: Skipping data insertion to hello_milvus" )
113+ rng = np .random .default_rng (seed = 19530 ) # Initialize rng for hello_milvus2
114+
115+ print (f"Number of entities in hello_milvus: { hello_milvus .num_entities } " )
109116
110117 # create another collection
111118 fields2 = [
@@ -120,23 +127,49 @@ def main(uri="http://127.0.0.1:19530", token="root:Milvus"):
120127 print (fmt .format ("Create collection `hello_milvus2`" ))
121128 hello_milvus2 = Collection ("hello_milvus2" , schema2 , consistency_level = "Strong" )
122129
130+ # For hello_milvus2, apply stage-based data generation
131+ if stage is None :
132+ # Original scenario: all data in one go
133+ num_entities2 = total_entities
134+ entity_offset2 = 0
135+ elif stage == 1 :
136+ # Multi-stage scenario: first half of data
137+ num_entities2 = total_entities // 2
138+ entity_offset2 = 0
139+ else : # stage == 2
140+ # Multi-stage scenario: second half of data
141+ num_entities2 = total_entities - (total_entities // 2 )
142+ entity_offset2 = total_entities // 2
143+
144+ if stage is None :
145+ var_list2 = [str (i ) for i in range (num_entities2 )] # Original format
146+ else :
147+ var_list2 = [f"stage{ stage } _entity_{ i + entity_offset2 } " for i in range (num_entities2 )]
148+
123149 entities2 = [
124- rng .random (num_entities ).tolist (), # field random, only supports list
125- [ str ( i ) for i in range ( num_entities )] ,
126- rng .random ((num_entities , dim )), # field embeddings, supports numpy.ndarray and list
150+ rng .random (num_entities2 ).tolist (), # field random, only supports list
151+ var_list2 ,
152+ rng .random ((num_entities2 , dim )), # field embeddings, supports numpy.ndarray and list
127153 ]
128154
129- insert_result2 = hello_milvus2 .insert (entities2 )
155+ hello_milvus2 .insert (entities2 )
130156 hello_milvus2 .flush ()
131- insert_result2 = hello_milvus2 .insert (entities2 )
157+ hello_milvus2 .insert (entities2 )
132158 hello_milvus2 .flush ()
133159
134- print (f"Number of entities in hello_milvus2: { hello_milvus2 .num_entities } " ) # check the num_entities
160+ if stage is None :
161+ print (f"Number of entities in hello_milvus2: { hello_milvus2 .num_entities } " )
162+ else :
163+ print (f"Stage { stage } - Number of entities in hello_milvus2: { hello_milvus2 .num_entities } " )
164+ print (fmt .format (f"Stage { stage } completed for hello_milvus2" ))
165+ print (f"Stage { stage } inserted { num_entities2 } entities starting from offset { entity_offset2 } " )
135166
136167
137168if __name__ == "__main__" :
138- args = argparse .ArgumentParser (description = "prepare data" )
169+ args = argparse .ArgumentParser (description = "prepare data for backup/restore testing " )
139170 args .add_argument ("--uri" , type = str , default = "http://127.0.0.1:19530" , help = "Milvus server uri" )
140171 args .add_argument ("--token" , type = str , default = "root:Milvus" , help = "Milvus server token" )
172+ args .add_argument ("--stage" , type = int , choices = [1 , 2 ], required = False , help = "Stage 1 or 2 for multi-stage data preparation (only affects hello_milvus2). Omit for single-stage mode" )
173+ args .add_argument ("--total-entities" , type = int , default = 3000 , help = "Total number of entities (hello_milvus always gets all, hello_milvus2 respects stage)" )
141174 args = args .parse_args ()
142- main (args .uri , args .token )
175+ main (args .uri , args .token , args . stage , args . total_entities )
0 commit comments