| endpoint | bulk |
|---|---|
| lang | go |
| es_version | 9.3 |
| client | github.com/elastic/go-elasticsearch/v9 |
Use esutil.BulkIndexer to index multiple documents concurrently.
The indexer manages batching, flushing, and worker goroutines
automatically.
// Product describes a single catalog item. The struct tags give the
// JSON key emitted for each field when a value is marshaled.
type Product struct {
	Name     string  `json:"name"`     // display name of the item
	Brand    string  `json:"brand"`    // manufacturer or label
	Price    float64 `json:"price"`    // listed price
	Category string  `json:"category"` // category label, e.g. "electronics"
	InStock  bool    `json:"in_stock"` // availability flag
	Rating   float64 `json:"rating"`   // rating score
}
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "products",
Client: client,
})
if err != nil {
log.Fatalf("Error creating indexer: %s", err)
}
// products is the sample catalog that the loop below sends to the indexer.
products := []Product{
	// appliances
	{Name: "Espresso Machine Pro", Brand: "BrewMaster", Price: 899.99, Category: "appliances", InStock: true, Rating: 4.7},
	// electronics
	{Name: "Noise-Cancelling Headphones", Brand: "SoundCore", Price: 249.00, Category: "electronics", InStock: true, Rating: 4.5},
	// furniture (the only entry that is out of stock)
	{Name: "Ergonomic Standing Desk", Brand: "DeskCraft", Price: 599.00, Category: "furniture", InStock: false, Rating: 4.8},
	{Name: "4K Webcam with Mic", Brand: "StreamGear", Price: 129.99, Category: "electronics", InStock: true, Rating: 4.3},
	// cookware
	{Name: "Cast Iron Dutch Oven", Brand: "HearthStone", Price: 79.95, Category: "cookware", InStock: true, Rating: 4.9},
	{Name: "Mechanical Keyboard", Brand: "TypeForce", Price: 169.00, Category: "electronics", InStock: true, Rating: 4.6},
	{Name: "Air Purifier HEPA-13", Brand: "CleanAir", Price: 349.00, Category: "appliances", InStock: true, Rating: 4.4},
	{Name: "Bamboo Cutting Board Set", Brand: "HearthStone", Price: 34.99, Category: "cookware", InStock: true, Rating: 4.2},
}
// Queue every product on the bulk indexer. Each document is encoded to
// JSON and given a deterministic ID of the form "prod-<n>" (1-based).
for i, product := range products {
	data, err := json.Marshal(product)
	if err != nil {
		log.Fatalf("Error encoding product: %s", err)
	}

	// Add returns an error when the item cannot be queued; do not drop it
	// silently, otherwise documents can go missing without any signal.
	err = indexer.Add(
		context.Background(),
		esutil.BulkIndexerItem{
			Action:     "index",
			DocumentID: fmt.Sprintf("prod-%d", i+1),
			Body:       bytes.NewReader(data),
		},
	)
	if err != nil {
		log.Fatalf("Error adding product to indexer: %s", err)
	}
}
indexer.Close(context.Background())
stats := indexer.Stats()
fmt.Printf("Indexed %d documents\n", stats.NumFlushed)

You must call Close to flush any remaining documents. The
DocumentID field is optional — omit it to let Elasticsearch generate
IDs automatically.
Attach OnFailure callbacks to each item to capture per-document
errors, and check Stats() after closing for an overall summary:
indexer.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(data),
OnFailure: func(
ctx context.Context,
item esutil.BulkIndexerItem,
res esutil.BulkIndexerResponseItem,
err error,
) {
if err != nil {
log.Printf("ERROR: %s", err)
} else {
log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
}
},
},
)
// After Close:
stats := indexer.Stats()
if stats.NumFailed > 0 {
log.Fatalf("%d documents failed to index", stats.NumFailed)
}

Adjust NumWorkers, FlushBytes, and FlushInterval to balance
throughput against cluster pressure:
// Create an indexer with explicit throughput settings.
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Index:         "products",
	Client:        client,
	NumWorkers:    4,                // number of worker goroutines
	FlushBytes:    5e+6,             // flush threshold in bytes (5,000,000)
	FlushInterval: 30 * time.Second, // periodic flush interval
})
// Do not discard the constructor error: on failure the returned indexer
// is unusable and later calls on it would fail.
if err != nil {
	log.Fatalf("Error creating indexer: %s", err)
}