Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added conditions to the Microsoft.CSharp, System.Buffers & System.Diagnostics.DiagnosticSource dependencies so that they are not included on net 6+ as the newer framework's natively provides those dependencies. ([#930](https://github.com/opensearch-project/opensearch-net/pull/930))
- Added support for Hybrid query ([#917](https://github.com/opensearch-project/opensearch-net/pull/917))
- Added support for `MaxDistance` and `MinScore` to `KnnQuery` ([#917](https://github.com/opensearch-project/opensearch-net/pull/917))
- Added initial support for the bulk streaming API ([#935](https://github.com/opensearch-project/opensearch-net/pull/935))

### Removed
- Removed support for the `net461` target ([#256](https://github.com/opensearch-project/opensearch-net/pull/256))
Expand Down
605 changes: 605 additions & 0 deletions src/ApiGenerator/opensearch-openapi.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using OpenSearch.Net.Utf8Json;

namespace OpenSearch.Client
{
[JsonFormatter(typeof(BulkStreamRequestFormatter))]
[MapsApi("bulk_stream")]
public partial interface IBulkStreamRequest
{
[IgnoreDataMember]
BulkOperationsCollection<IBulkOperation> Operations { get; set; }
}

public partial class BulkStreamRequest
{
public BulkOperationsCollection<IBulkOperation> Operations { get; set; }
}

public partial class BulkStreamDescriptor
{
BulkOperationsCollection<IBulkOperation> IBulkStreamRequest.Operations { get; set; } = new BulkOperationsCollection<IBulkOperation>();

public BulkStreamDescriptor Create<T>(Func<BulkCreateDescriptor<T>, IBulkCreateOperation<T>> bulkCreateSelector)
where T : class =>
AddOperation(bulkCreateSelector?.Invoke(new BulkCreateDescriptor<T>()));

/// <summary>
/// CreateMany, convenience method to create many documents at once.
/// </summary>
/// <param name="objects">the objects to create</param>
/// <param name="bulkCreateSelector">A func called on each object to describe the individual create operation</param>
public BulkStreamDescriptor CreateMany<T>(IEnumerable<T> @objects, Func<BulkCreateDescriptor<T>, T, IBulkCreateOperation<T>> bulkCreateSelector = null)
where T : class =>
AddOperations(@objects, bulkCreateSelector, o => new BulkCreateDescriptor<T>().Document(o));

public BulkStreamDescriptor Index<T>(Func<BulkIndexDescriptor<T>, IBulkIndexOperation<T>> bulkIndexSelector)
where T : class =>
AddOperation(bulkIndexSelector?.Invoke(new BulkIndexDescriptor<T>()));

/// <summary>
/// IndexMany, convenience method to pass many objects at once.
/// </summary>
/// <param name="objects">the objects to index</param>
/// <param name="bulkIndexSelector">A func called on each object to describe the individual index operation</param>
public BulkStreamDescriptor IndexMany<T>(IEnumerable<T> @objects, Func<BulkIndexDescriptor<T>, T, IBulkIndexOperation<T>> bulkIndexSelector = null)
where T : class =>
AddOperations(@objects, bulkIndexSelector, o => new BulkIndexDescriptor<T>().Document(o));

/// <summary>
/// DeleteMany, convenience method to delete many objects at once.
/// </summary>
/// <param name="objects">the objects to delete</param>
/// <param name="bulkDeleteSelector">A func called on each object to describe the individual delete operation</param>
public BulkStreamDescriptor DeleteMany<T>(
IEnumerable<T> @objects,
Func<BulkDeleteDescriptor<T>, T, IBulkDeleteOperation<T>> bulkDeleteSelector = null
)
where T : class =>
AddOperations(@objects, bulkDeleteSelector, o => new BulkDeleteDescriptor<T>().Document(o));

/// <summary>
/// DeleteMany, convenience method to delete many objects at once.
/// </summary>
/// <param name="ids">Enumerable of string ids to delete</param>
/// <param name="bulkDeleteSelector">A func called on each ids to describe the individual delete operation</param>
public BulkStreamDescriptor DeleteMany<T>(
IEnumerable<string> ids,
Func<BulkDeleteDescriptor<T>, string, IBulkDeleteOperation<T>> bulkDeleteSelector = null
)
where T : class =>
AddOperations(ids, bulkDeleteSelector, id => new BulkDeleteDescriptor<T>().Id(id));

/// <summary>
/// DeleteMany, convenience method to delete many objects at once.
/// </summary>
/// <param name="ids">Enumerable of int ids to delete</param>
/// <param name="bulkDeleteSelector">A func called on each ids to describe the individual delete operation</param>
public BulkStreamDescriptor DeleteMany<T>(
IEnumerable<long> ids,
Func<BulkDeleteDescriptor<T>, long, IBulkDeleteOperation<T>> bulkDeleteSelector = null
)
where T : class =>
AddOperations(ids, bulkDeleteSelector, id => new BulkDeleteDescriptor<T>().Id(id));

public BulkStreamDescriptor Delete<T>(T obj, Func<BulkDeleteDescriptor<T>, IBulkDeleteOperation<T>> bulkDeleteSelector = null)
where T : class =>
AddOperation(bulkDeleteSelector.InvokeOrDefault(new BulkDeleteDescriptor<T>().Document(obj)));

public BulkStreamDescriptor Delete<T>(Func<BulkDeleteDescriptor<T>, IBulkDeleteOperation<T>> bulkDeleteSelector)
where T : class =>
AddOperation(bulkDeleteSelector?.Invoke(new BulkDeleteDescriptor<T>()));

/// <summary>
/// UpdateMany, convenience method to pass many objects at once to do multiple updates.
/// </summary>
/// <param name="objects">the objects to update</param>
/// <param name="bulkUpdateSelector">An func called on each object to describe the individual update operation</param>
public BulkStreamDescriptor UpdateMany<T>(
IEnumerable<T> @objects,
Func<BulkUpdateDescriptor<T, T>, T, IBulkUpdateOperation<T, T>> bulkUpdateSelector
)
where T : class =>
AddOperations(objects, bulkUpdateSelector, o => new BulkUpdateDescriptor<T, T>().IdFrom(o));

/// <summary>
/// UpdateMany, convenience method to pass many objects at once to do multiple updates.
/// </summary>
/// <param name="objects">the objects to update</param>
/// <param name="bulkUpdateSelector">An func called on each object to describe the individual update operation</param>
public BulkStreamDescriptor UpdateMany<T, TPartialDocument>(
IEnumerable<T> @objects,
Func<BulkUpdateDescriptor<T, TPartialDocument>, T, IBulkUpdateOperation<T, TPartialDocument>> bulkUpdateSelector
)
where T : class
where TPartialDocument : class =>
AddOperations(objects, bulkUpdateSelector, o => new BulkUpdateDescriptor<T, TPartialDocument>().IdFrom(o));

public BulkStreamDescriptor Update<T>(Func<BulkUpdateDescriptor<T, T>, IBulkUpdateOperation<T, T>> bulkUpdateSelector)
where T : class =>
Update<T, T>(bulkUpdateSelector);

public BulkStreamDescriptor Update<T, TPartialDocument>(
Func<BulkUpdateDescriptor<T, TPartialDocument>, IBulkUpdateOperation<T, TPartialDocument>> bulkUpdateSelector
)
where T : class
where TPartialDocument : class =>
AddOperation(bulkUpdateSelector?.Invoke(new BulkUpdateDescriptor<T, TPartialDocument>()));

public BulkStreamDescriptor AddOperation(IBulkOperation operation) => Assign(operation, (a, v) => a.Operations.AddIfNotNull(v));

private BulkStreamDescriptor AddOperations<T, TDescriptor, TInterface>(
IEnumerable<T> objects,
Func<TDescriptor, T, TInterface> bulkIndexSelector,
Func<T, TDescriptor> defaultSelector
)
where TInterface : class, IBulkOperation
where TDescriptor : class, TInterface
{
if (@objects == null) return this;

var objectsList = @objects.ToList();
var operations = new List<TInterface>(objectsList.Count());
foreach (var o in objectsList)
{
var op = bulkIndexSelector.InvokeOrDefault(defaultSelector(o), o);
if (op != null) operations.Add(op);
}
return Assign(operations, (a, v) => a.Operations.AddRange(v));
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

using System;
using OpenSearch.Net;
using OpenSearch.Net.Utf8Json;

namespace OpenSearch.Client
{
internal class BulkStreamRequestFormatter : IJsonFormatter<IBulkStreamRequest>
{
private const byte Newline = (byte)'\n';

private static SourceWriteFormatter<object> SourceWriter { get; } = new SourceWriteFormatter<object>();

public IBulkStreamRequest Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver) =>
throw new NotSupportedException();

public void Serialize(ref JsonWriter writer, IBulkStreamRequest value, IJsonFormatterResolver formatterResolver)
{
if (value?.Operations == null)
return;

var settings = formatterResolver.GetConnectionSettings();
var inferrer = settings.Inferrer;
var formatter = formatterResolver.GetFormatter<object>();

for (var index = 0; index < value.Operations.Count; index++)
{
var op = value.Operations[index];
op.Index ??= value.Index ?? op.ClrType;
if (op.Index.Equals(value.Index)) op.Index = null;
op.Id = op.GetIdForOperation(inferrer);
op.Routing = op.GetRoutingForOperation(inferrer);

writer.WriteBeginObject();
writer.WritePropertyName(op.Operation);

formatter.Serialize(ref writer, op, formatterResolver);
writer.WriteEndObject();
writer.WriteRaw(Newline);

var body = op.GetBody();
if (body == null)
continue;

if (op.Operation == "update" || body is ILazyDocument)
{
var requestResponseSerializer = settings.RequestResponseSerializer;
requestResponseSerializer.SerializeUsingWriter(ref writer, body, settings, SerializationFormatting.None);
}
else
SourceWriter.Serialize(ref writer, body, formatterResolver);
writer.WriteRaw(Newline);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization;
using OpenSearch.Net;

namespace OpenSearch.Client
{
[DataContract]
public class BulkStreamResponse : ResponseBase
{
[DataMember(Name ="errors")]
public bool Errors { get; internal set; }

public override bool IsValid => base.IsValid && !Errors && !ItemsWithErrors.HasAny();

[DataMember(Name ="items")]
public IReadOnlyList<BulkResponseItemBase> Items { get; internal set; } = EmptyReadOnly<BulkResponseItemBase>.List;

[IgnoreDataMember]
public IEnumerable<BulkResponseItemBase> ItemsWithErrors => !Items.HasAny()
? Enumerable.Empty<BulkResponseItemBase>()
: Items.Where(i => !i.IsValid);

[DataMember(Name ="took")]
public long Took { get; internal set; }

protected override void DebugIsValid(StringBuilder sb)
{
if (Items == null) return;

sb.AppendLine($"# Invalid Bulk items:");
foreach (var i in Items.Select((item, i) => new { item, i }).Where(i => !i.item.IsValid))
sb.AppendLine($" operation[{i.i}]: {i.item}");
}
}
}
4 changes: 4 additions & 0 deletions src/OpenSearch.Client/_Generated/ApiUrlsLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ internal static partial class ApiUrlsLookups
{
internal static readonly ApiUrls NoNamespaceBulk = new(["_bulk", "{index}/_bulk"]);

internal static readonly ApiUrls NoNamespaceBulkStream = new(
["_bulk/stream", "{index}/_bulk/stream"]
);

internal static readonly ApiUrls CatAliases = new(["_cat/aliases", "_cat/aliases/{name}"]);

internal static readonly ApiUrls CatAllPitSegments = new(["_cat/pit_segments/_all"]);
Expand Down
Loading
Loading