endpoint: bulk
lang: go
es_version: 9.3
client: github.com/elastic/go-elasticsearch/v9

Elasticsearch 9.3 bulk endpoint (Go example)

Use esutil.BulkIndexer to index multiple documents concurrently. The indexer manages batching, flushing, and worker goroutines automatically.

type Product struct {
    Name     string  `json:"name"`
    Brand    string  `json:"brand"`
    Price    float64 `json:"price"`
    Category string  `json:"category"`
    InStock  bool    `json:"in_stock"`
    Rating   float64 `json:"rating"`
}

indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
    Index:  "products",
    Client: client,
})
if err != nil {
    log.Fatalf("Error creating indexer: %s", err)
}

products := []Product{
    {Name: "Espresso Machine Pro",       Brand: "BrewMaster",  Price: 899.99, Category: "appliances",  InStock: true,  Rating: 4.7},
    {Name: "Noise-Cancelling Headphones", Brand: "SoundCore",  Price: 249.00, Category: "electronics", InStock: true,  Rating: 4.5},
    {Name: "Ergonomic Standing Desk",     Brand: "DeskCraft",  Price: 599.00, Category: "furniture",   InStock: false, Rating: 4.8},
    {Name: "4K Webcam with Mic",          Brand: "StreamGear", Price: 129.99, Category: "electronics", InStock: true,  Rating: 4.3},
    {Name: "Cast Iron Dutch Oven",        Brand: "HearthStone", Price: 79.95, Category: "cookware",   InStock: true,  Rating: 4.9},
    {Name: "Mechanical Keyboard",         Brand: "TypeForce",  Price: 169.00, Category: "electronics", InStock: true,  Rating: 4.6},
    {Name: "Air Purifier HEPA-13",        Brand: "CleanAir",   Price: 349.00, Category: "appliances",  InStock: true,  Rating: 4.4},
    {Name: "Bamboo Cutting Board Set",    Brand: "HearthStone", Price: 34.99, Category: "cookware",   InStock: true,  Rating: 4.2},
}

for i, product := range products {
    data, err := json.Marshal(product)
    if err != nil {
        log.Fatalf("Error encoding product: %s", err)
    }

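    // Add queues the item for a later flush; it returns an error if the
    // item cannot be queued.
    if err := indexer.Add(
        context.Background(),
        esutil.BulkIndexerItem{
            Action:     "index",
            DocumentID: fmt.Sprintf("prod-%d", i+1),
            Body:       bytes.NewReader(data),
        },
    ); err != nil {
        log.Fatalf("Error adding product: %s", err)
    }
}

// Close flushes any buffered items and waits for the workers to finish.
if err := indexer.Close(context.Background()); err != nil {
    log.Fatalf("Error closing indexer: %s", err)
}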

stats := indexer.Stats()
fmt.Printf("Indexed %d documents\n", stats.NumFlushed)

You must call Close to flush any remaining documents, and you should check the error it returns. The DocumentID field is optional; omit it to let Elasticsearch generate IDs automatically.
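To make the batching less opaque, the sketch below builds the kind of newline-delimited JSON body that the bulk endpoint accepts: one action line, then one document line, each ending in a newline. It uses only the standard library. The helper name bulkBody is hypothetical and not part of go-elasticsearch, and the output is an illustration of the wire format rather than a byte-for-byte copy of what BulkIndexer sends.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Product mirrors the struct used in the example above.
type Product struct {
	Name     string  `json:"name"`
	Brand    string  `json:"brand"`
	Price    float64 `json:"price"`
	Category string  `json:"category"`
	InStock  bool    `json:"in_stock"`
	Rating   float64 `json:"rating"`
}

// bulkBody is a hypothetical helper (not part of go-elasticsearch). It
// renders "index" actions as newline-delimited JSON: one metadata line,
// then one document line, each terminated by "\n".
func bulkBody(index string, products []Product) ([]byte, error) {
	var buf bytes.Buffer
	for i, p := range products {
		meta := map[string]map[string]string{
			"index": {"_index": index, "_id": fmt.Sprintf("prod-%d", i+1)},
		}
		// Write the action line first, then the document itself.
		for _, v := range []any{meta, p} {
			line, err := json.Marshal(v)
			if err != nil {
				return nil, err
			}
			buf.Write(line)
			buf.WriteByte('\n')
		}
	}
	return buf.Bytes(), nil
}

func main() {
	body, err := bulkBody("products", []Product{
		{Name: "Mechanical Keyboard", Brand: "TypeForce", Price: 169.00, Category: "electronics", InStock: true, Rating: 4.6},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(body))
}
```

Each document costs two lines in the request, which is why the indexer tracks its buffer in bytes rather than in document counts.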

Handling errors

Attach OnFailure callbacks to each item to capture per-document errors, and check Stats() after closing for an overall summary:

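if err := indexer.Add(
    context.Background(),
    esutil.BulkIndexerItem{
        Action: "index",
        Body:   bytes.NewReader(data),
        // OnFailure is called for each item that fails to index.
        OnFailure: func(
            ctx context.Context,
            item esutil.BulkIndexerItem,
            res esutil.BulkIndexerResponseItem,
            err error,
        ) {
            if err != nil {
                // The bulk request itself failed (for example, a transport error).
                log.Printf("ERROR: %s", err)
            } else {
                // Elasticsearch rejected this document.
                log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
            }
        },
    },
); err != nil {
    log.Fatalf("Error adding item: %s", err)
}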

// After Close:
stats := indexer.Stats()
if stats.NumFailed > 0 {
    log.Fatalf("%d documents failed to index", stats.NumFailed)
}

Large datasets

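Adjust NumWorkers, FlushBytes, and FlushInterval to balance throughput against cluster pressure. NumWorkers sets how many goroutines send bulk requests in parallel, FlushBytes is the buffer size in bytes that triggers a flush, and FlushInterval is the longest that buffered documents wait before being flushed: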

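indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
    Index:         "products",
    Client:        client,
    NumWorkers:    4,                // worker goroutines sending bulk requests
    FlushBytes:    5_000_000,        // flush threshold in bytes (about 5 MB)
    FlushInterval: 30 * time.Second, // flush at least this often
})
if err != nil {
    log.Fatalf("Error creating indexer: %s", err)
}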
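To get a feel for how FlushBytes affects the number of bulk requests, the sketch below models a single buffer that flushes as soon as it reaches a byte threshold. The function countFlushes is hypothetical and deliberately simplified: it ignores FlushInterval, multiple workers, and the size of the action lines, so treat the numbers as rough estimates rather than what the library will do exactly. The workload of 10,000 documents of 1,000 bytes each is an assumption.

```go
package main

import "fmt"

// countFlushes is a hypothetical, simplified model (not the library's
// code): documents are appended to a buffer, and the buffer is flushed
// as soon as its size reaches flushBytes. Anything left at the end is
// flushed once more, as Close would do.
func countFlushes(docSizes []int, flushBytes int) int {
	flushes, buffered := 0, 0
	for _, size := range docSizes {
		buffered += size
		if buffered >= flushBytes {
			flushes++
			buffered = 0
		}
	}
	if buffered > 0 {
		flushes++
	}
	return flushes
}

func main() {
	// Assumed workload: 10,000 documents of 1,000 bytes each.
	sizes := make([]int, 10000)
	for i := range sizes {
		sizes[i] = 1000
	}
	for _, threshold := range []int{1_000_000, 5_000_000} {
		fmt.Printf("FlushBytes=%d -> %d bulk requests\n", threshold, countFlushes(sizes, threshold))
	}
}
```

A larger threshold means fewer, bigger requests. That usually improves throughput up to a point, at the cost of more memory per worker and heavier individual requests on the cluster.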