diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 4558d757f70..d2e3f4d23e8 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -58,7 +58,7 @@
-
+
diff --git a/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs b/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs
index 65a766560ed..3c4f7981a01 100644
--- a/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs
+++ b/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs
@@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Threading;
@@ -49,9 +48,10 @@ public DuckDBConnectionPoolLifetime(
_repeated = repeated;
Shared = CreatePool(isReadOnly: false, log: true);
- using var connection = Shared.Open();
- foreach (var s in once)
- s.Execute(connection);
+ using (Shared.Rent(out var connection)) {
+ foreach (var s in once)
+ s.Execute(connection);
+ }
return;
@@ -105,6 +105,7 @@ public Task StopAsync(CancellationToken cancellationToken) {
protected override void Dispose(bool disposing) {
if (disposing) {
+ Shared.Dispose();
if (_tempPath != null) {
try {
File.Delete(_tempPath);
@@ -118,7 +119,6 @@ protected override void Dispose(bool disposing) {
}
private class ConnectionPoolWithFunctions(string connectionString, IReadOnlyList setup) : DuckDBConnectionPool(connectionString) {
- [Experimental("DuckDBNET001")]
protected override void Initialize(DuckDBAdvancedConnection connection) {
base.Initialize(connection);
for (var i = 0; i < setup.Count; i++) {
diff --git a/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs b/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs
index 7c2ad41b00d..0cb03f04c14 100644
--- a/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs
+++ b/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs
@@ -17,7 +17,6 @@ public static class InjectionExtensions {
public static IServiceCollection AddDuckDb(this IServiceCollection services) {
services.AddSingleton();
services.AddHostedService(sp => sp.GetRequiredService());
- services.AddDuckDBSetup();
services.AddSingleton(sp => sp.GetRequiredService().Shared);
services.AddSingleton();
return services;
diff --git a/src/KurrentDB.Core/DuckDB/InlineFunctions.cs b/src/KurrentDB.Core/DuckDB/InlineFunctions.cs
deleted file mode 100644
index 09250f2cac7..00000000000
--- a/src/KurrentDB.Core/DuckDB/InlineFunctions.cs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
-// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using System.Linq;
-using DuckDB.NET.Data;
-using DuckDB.NET.Data.DataChunk.Reader;
-using DuckDB.NET.Data.DataChunk.Writer;
-using KurrentDB.Common.Utils;
-using KurrentDB.Core.Bus;
-using KurrentDB.Core.Services.Transport.Enumerators;
-using KurrentDB.Core.Services.UserManagement;
-using KurrentDB.DuckDB;
-using ResolvedEvent = KurrentDB.Core.Data.ResolvedEvent;
-
-namespace KurrentDB.Core.DuckDB;
-
-public class KdbGetEventSetup(IPublisher publisher) : IDuckDBSetup {
- [Experimental("DuckDBNET001")]
- public void Execute(DuckDBConnection connection) {
- connection.RegisterScalarFunction("kdb_get", GetEvent);
- }
-
- public bool OneTimeOnly => false;
-
- [Experimental("DuckDBNET001")]
- private void GetEvent(IReadOnlyList readers, IDuckDBDataWriter writer, ulong rowCount) {
- var positions = Enumerable.Range(0, (int)rowCount).Select(x => (long)readers[0].GetValue((ulong)x)).ToArray();
- var result = publisher.ReadEvents(positions).ToArray();
-
- for (ulong i = 0; i < (ulong)result.Length; i++) {
- var asString = AsDuckEvent(result[i]);
- writer.WriteValue(asString, i);
- }
- }
-
- private static string AsDuckEvent(string stream,
- string eventType,
- DateTime created,
- ReadOnlyMemory data,
- ReadOnlyMemory meta) {
- var dataString = data.IsValidUtf8Json() ? Helper.UTF8NoBom.GetString(data.Span) : "\"\"";
- var metaString = meta.Length == 0 ? "{}" : Helper.UTF8NoBom.GetString(meta.Span);
- return
- $"{{ \"data\": {dataString}, \"metadata\": {metaString}, \"stream_id\": \"{stream}\", \"created\": \"{created:u}\", \"event_type\": \"{eventType}\" }}";
- }
-
- private static string AsDuckEvent(ResolvedEvent evt)
- => AsDuckEvent(evt.Event.EventStreamId, evt.Event.EventType, evt.Event.TimeStamp, evt.Event.Data, evt.Event.Metadata);
-}
-
-file static class ReadEventsExtensions {
- public static IEnumerable ReadEvents(this IPublisher publisher, long[] logPositions) {
- using var enumerator = GetEnumerator();
-
- while (enumerator.MoveNext()) {
- if (enumerator.Current is ReadResponse.EventReceived eventReceived) {
- yield return eventReceived.Event;
- }
- }
-
- yield break;
-
- IEnumerator GetEnumerator() {
- return new Enumerator.ReadLogEventsSync(
- bus: publisher,
- logPositions: logPositions,
- user: SystemAccounts.System);
- }
- }
-}
diff --git a/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs b/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs
index e58a1cec854..b56efd92c35 100644
--- a/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs
+++ b/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs
@@ -170,8 +170,11 @@ public void Handle(ClientMessage.TransactionCommit message) {
public void Handle(SystemMessage.StateChangeMessage message) {
-
+ // this is now a compile error. probably change 'or' to 'and not', but needs to be thought through.
+ // TODO: think it through (ticket DB-1957)
+#pragma warning disable CS9336 // The pattern is redundant.
if (_nodeState == VNodeState.Leader && message.State is not VNodeState.Leader or VNodeState.ResigningLeader) {
+#pragma warning restore CS9336 // The pattern is redundant.
var keys = _currentRequests.Keys;
foreach (var key in keys) {
if (_currentRequests.Remove(key, out var manager)) {
diff --git a/src/KurrentDB.DuckDB/DuckDBSetup.cs b/src/KurrentDB.DuckDB/DuckDBSetup.cs
index d9abedf17b8..0afa2c0ba41 100644
--- a/src/KurrentDB.DuckDB/DuckDBSetup.cs
+++ b/src/KurrentDB.DuckDB/DuckDBSetup.cs
@@ -3,18 +3,19 @@
using DotNext.Threading;
using DuckDB.NET.Data;
+using Kurrent.Quack;
namespace KurrentDB.DuckDB;
public interface IDuckDBSetup {
- void Execute(DuckDBConnection connection);
+ void Execute(DuckDBAdvancedConnection connection);
bool OneTimeOnly { get; }
}
public abstract class DuckDBOneTimeSetup : IDuckDBSetup {
private Atomic.Boolean _created;
- public void Execute(DuckDBConnection connection) {
+ public void Execute(DuckDBAdvancedConnection connection) {
if (!_created.FalseToTrue()) {
return;
}
@@ -23,5 +24,5 @@ public void Execute(DuckDBConnection connection) {
public bool OneTimeOnly => true;
- protected abstract void ExecuteCore(DuckDBConnection connection);
+ protected abstract void ExecuteCore(DuckDBAdvancedConnection connection);
}
diff --git a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs
index b3fa422f3bf..4560451a807 100644
--- a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs
+++ b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs
@@ -3,6 +3,7 @@
using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
+using KurrentDB.Core.Services.Transport.Enumerators;
using KurrentDB.SecondaryIndexing.LoadTesting.Appenders;
using KurrentDB.SecondaryIndexing.Storage;
using KurrentDB.SecondaryIndexing.Tests.Generators;
@@ -24,8 +25,11 @@ public class RawQuackMessageBatchAppender : IMessageBatchAppender {
public RawQuackMessageBatchAppender(DuckDBConnectionPool db, DuckDbTestEnvironmentOptions options) {
_commitSize = options.CommitSize;
- var schema = new IndexingDbSchema();
- schema.CreateSchema(db);
+ var schema = new IndexingDbSchema(static (_, _) => Enumerable.Empty().GetEnumerator());
+
+ using (db.Rent(out var rentedConn)) {
+ schema.Execute(rentedConn);
+ }
using var connection = db.Open();
_defaultIndexAppender = new(connection, "idx_all"u8);
diff --git a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs
index 8c78a2daf6a..cadfe50c76c 100644
--- a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs
+++ b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs
@@ -3,8 +3,8 @@
using Kurrent.Quack.ConnectionPool;
using KurrentDB.Core.Index.Hashes;
+using KurrentDB.Core.Services.Transport.Enumerators;
using KurrentDB.Core.Tests.Fakes;
-using KurrentDB.SecondaryIndexing.Indexes;
using KurrentDB.SecondaryIndexing.Indexes.Default;
using KurrentDB.SecondaryIndexing.LoadTesting.Appenders;
using KurrentDB.SecondaryIndexing.Storage;
@@ -23,13 +23,14 @@ public IndexMessageBatchAppender(DuckDBConnectionPool db, int commitSize) {
_commitSize = commitSize;
ReadIndexStub.Build();
var hasher = new CompositeHasher(new XXHashUnsafe(), new Murmur3AUnsafe());
- var inflightRecordsCache = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitSize });
var publisher = new FakePublisher();
- var schema = new IndexingDbSchema();
- schema.CreateSchema(db);
+ var schema = new IndexingDbSchema(static (_, _) => Enumerable.Empty().GetEnumerator());
+ using (db.Rent(out var connection)) {
+ schema.Execute(connection);
+ }
- _processor = new(db, inflightRecordsCache, publisher, hasher, new("test"), NullLoggerFactory.Instance);
+ _processor = new(db, publisher, hasher, new("test"), NullLoggerFactory.Instance);
}
public ValueTask Append(TestMessageBatch batch) {
diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs b/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs
index aaa796cdf29..47ed2da0957 100644
--- a/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs
+++ b/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs
@@ -28,6 +28,6 @@ byte[] data
eventType
);
- return ResolvedEvent.ForUnresolvedEvent(record, 0);
+ return ResolvedEvent.ForUnresolvedEvent(record, 0L);
}
}
diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs b/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs
index 653844d5428..7a1e2c333ab 100644
--- a/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs
+++ b/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs
@@ -1,7 +1,11 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+using System.Security.Claims;
using Kurrent.Quack.ConnectionPool;
+using Kurrent.Quack.Threading;
+using KurrentDB.Core.DuckDB;
+using KurrentDB.Core.Services.Transport.Enumerators;
using KurrentDB.Core.XUnit.Tests;
using KurrentDB.SecondaryIndexing.Storage;
@@ -14,8 +18,19 @@ protected DuckDbIntegrationTest() {
var dbPath = Fixture.GetFilePathFor($"{GetType().Name}.db");
DuckDb = new($"Data Source={dbPath};");
- var schema = new IndexingDbSchema();
- schema.CreateSchema(DuckDb);
+ var schema = new IndexingDbSchema(GetEvents);
+ using (DuckDb.Rent(out var connection)) {
+ schema.Execute(connection);
+ }
+ }
+
+ private static IEnumerator GetEvents(long[] logPositions, ClaimsPrincipal user) {
+ // This is stub method for tests
+ for (var i = 0; i < logPositions.Length; i++) {
+ // See GetDatabaseEventsFunction implementation,
+ // for any unexpected response the function generates a row with empty values
+ yield return new ReadResponse.StreamNotFound(string.Empty);
+ }
}
public override async ValueTask DisposeAsync() {
diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs
index 192ab468ba8..913afd347cb 100644
--- a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs
+++ b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs
@@ -1,9 +1,8 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
-using Dapper;
using DotNext;
-using Kurrent.Quack.ConnectionPool;
+using Kurrent.Quack;
using KurrentDB.Core.Data;
using KurrentDB.Core.Index.Hashes;
using KurrentDB.Core.Tests.Fakes;
@@ -117,44 +116,88 @@ public void CommittedMultipleEventsToAStringWithoutCategory_AreIndexed() {
}
private void AssertDefaultIndexQueryReturns(List expected) {
- var records =
- DuckDb.QueryToList(new(-1, long.MaxValue,
- int.MaxValue));
+ List records;
+ using (DuckDb.Rent(out var connection)) {
+ using (_processor.CaptureSnapshot(connection)) {
+ records = connection
+ .ExecuteQuery(new(-1, int.MaxValue))
+ .ToList();
+ }
+ }
Assert.Equal(expected, records.Select(x => x.LogPosition));
}
private void AssertLastLogPositionQueryReturns(long? expectedLogPosition) {
- var actual = DuckDb.QueryFirstOrDefault().OrNull();
+ LastPositionResult? actual;
+ using (DuckDb.Rent(out var connection)) {
+ using (_processor.CaptureSnapshot(connection)) {
+ actual = connection.QueryFirstOrDefault().OrNull();
+ }
+ }
Assert.Equal(expectedLogPosition, actual?.PreparePosition);
}
private void AssertGetCategoriesQueryReturns(string[] expected) {
- using var connection = DuckDb.Open();
- var records = connection.Query("select distinct category from idx_all order by log_position");
+ var records = new List();
+ using (DuckDb.Rent(out var connection)) {
+ using (_processor.CaptureSnapshot(connection)) {
+ using var result = connection.ExecuteAdHocQuery("select distinct category from idx_all_snapshot order by log_position"u8);
+ while (result.TryFetch(out var chunk)) {
+ using (chunk) {
+ while (chunk.TryRead(out var row)) {
+ records.Add(row.ReadString());
+ }
+ }
+ }
+ }
+ }
Assert.Equal(expected, records);
}
private void AssertCategoryIndexQueryReturns(string category, List expected) {
- var records =
- DuckDb.QueryToList(new(category, 0, long.MaxValue, 32));
+ List records;
+ using (DuckDb.Rent(out var connection)) {
+ using (_processor.CaptureSnapshot(connection)) {
+ records = connection
+ .ExecuteQuery(new(category, 0, 32))
+ .ToList();
+ }
+ }
Assert.Equal(expected, records.Select(x => x.LogPosition));
}
private void AssertGetAllEventTypesQueryReturns(string[] expected) {
- using var connection = DuckDb.Open();
- var records = connection.Query("select distinct event_type from idx_all order by log_position");
+ var records = new List();
+ using (DuckDb.Rent(out var connection)) {
+ using (_processor.CaptureSnapshot(connection)) {
+ using var result = connection.ExecuteAdHocQuery("select distinct event_type from idx_all order by log_position"u8);
+ while (result.TryFetch(out var chunk)) {
+ using (chunk) {
+ while (chunk.TryRead(out var row)) {
+ records.Add(row.ReadString());
+ }
+ }
+ }
+ }
+ }
Assert.Equal(expected, records);
}
private void AssertReadEventTypeIndexQueryReturns(string eventType, List expected) {
- var records = DuckDb.QueryToList(
- new(eventType, 0, long.MaxValue, 32)
- );
+ List records;
+ using (DuckDb.Rent(out var connection)) {
+ using (_processor.CaptureSnapshot(connection)) {
+ records = connection.ExecuteQuery(
+ new(eventType, 0, 32)
+ )
+ .ToList();
+ }
+ }
Assert.Equal(expected, records.Select(x => x.LogPosition));
}
@@ -164,13 +207,11 @@ private void AssertReadEventTypeIndexQueryReturns(string eventType, List e
public DefaultIndexProcessorTests() {
ReadIndexStub.Build();
- const int commitBatchSize = 9;
var hasher = new CompositeHasher(new XXHashUnsafe(), new Murmur3AUnsafe());
- var inflightRecordsCache = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitBatchSize });
var publisher = new FakePublisher();
- _processor = new(DuckDb, inflightRecordsCache, publisher, hasher, new("test"), NullLoggerFactory.Instance);
+ _processor = new(DuckDb, publisher, hasher, new("test"), NullLoggerFactory.Instance);
}
public override ValueTask DisposeAsync() {
diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs
index 494337ca64a..52737ff2b5e 100644
--- a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs
+++ b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs
@@ -19,14 +19,12 @@ public abstract class IndexTestBase : DuckDbIntegrationTest {
private readonly ReadIndexStub _readIndexStub = new();
protected IndexTestBase() {
- const int commitBatchSize = 9;
var hasher = new CompositeHasher(new XXHashUnsafe(), new Murmur3AUnsafe());
- var inFlightRecords = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitBatchSize });
var publisher = new FakePublisher();
- _processor = new(DuckDb, inFlightRecords, publisher, hasher, new("test"), NullLoggerFactory.Instance);
+ _processor = new(DuckDb, publisher, hasher, new("test"), NullLoggerFactory.Instance);
- Sut = new(DuckDb, _processor, inFlightRecords, _readIndexStub.ReadIndex);
+ Sut = new(DuckDb, _processor, _readIndexStub.ReadIndex);
}
protected void IndexEvents(ResolvedEvent[] events, bool shouldCommit) {
diff --git a/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs b/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs
index dc82b5ab67b..025c0fe32b6 100644
--- a/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs
+++ b/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs
@@ -10,6 +10,7 @@
using KurrentDB.SecondaryIndexing.Indexes.Category;
using KurrentDB.SecondaryIndexing.Indexes.Default;
using KurrentDB.SecondaryIndexing.Indexes.EventType;
+using KurrentDB.SecondaryIndexing.Query;
using KurrentDB.SecondaryIndexing.Tests.Generators;
using KurrentDB.Surge.Testing;
using Microsoft.Extensions.DependencyInjection;
@@ -27,6 +28,15 @@ public async Task ReadsAllEventsFromDefaultIndex(bool forwards) {
await ValidateRead(SystemStreams.DefaultSecondaryIndex, Fixture.AppendedBatches.ToDefaultIndexResolvedEvents(), forwards);
}
+ [Fact]
+ public async Task ReadFromDefaultIndexUsingQueryEngine() {
+ var engine = Fixture.NodeServices.GetRequiredService();
+ using var preparedSql = engine.PrepareQuery("SELECT metadata FROM kdb.records"u8, digitallySign: true);
+
+ var consumer = new RowCountReader();
+ await engine.ExecuteAsync(preparedSql.Memory, consumer, checkIntegrity: true, TestContext.Current.CancellationToken);
+ }
+
[Theory]
[InlineData(true)]
[InlineData(false)]
@@ -57,7 +67,6 @@ public async Task ReadFromUnknownIndexFails() {
public enum CommitMode {
None,
CommitAndClear,
- CommitAndKeep,
}
private static readonly IEventFilter UserEventsFilter = new EventFilter.DefaultAllFilterStrategy.NonSystemStreamStrategy();
@@ -65,7 +74,6 @@ public enum CommitMode {
[Theory]
[InlineData(CommitMode.None)]
[InlineData(CommitMode.CommitAndClear)]
- [InlineData(CommitMode.CommitAndKeep)]
public async Task ReadFromBothDefault(CommitMode mode) {
await ValidateRead(mode, SystemStreams.DefaultSecondaryIndex, UserEventsFilter);
}
@@ -73,7 +81,6 @@ public async Task ReadFromBothDefault(CommitMode mode) {
[Theory]
[InlineData(CommitMode.None)]
[InlineData(CommitMode.CommitAndClear)]
- [InlineData(CommitMode.CommitAndKeep)]
public async Task ReadFromBothCategory(CommitMode mode) {
var category = Fixture.Categories.First();
await ValidateRead(mode, CategoryIndex.Name(category), new CategoryFilter(category));
@@ -82,7 +89,6 @@ public async Task ReadFromBothCategory(CommitMode mode) {
[Theory]
[InlineData(CommitMode.None)]
[InlineData(CommitMode.CommitAndClear)]
- [InlineData(CommitMode.CommitAndKeep)]
public async Task ReadFromBothEventType(CommitMode mode) {
var eventType = Fixture.EventTypes.First();
await ValidateRead(mode, EventTypeIndex.Name(eventType), new EventTypeFilter(eventType));
@@ -112,10 +118,7 @@ private async Task ValidateRead(CommitMode mode, string indexName, IEventFilter
var processor = Fixture.NodeServices.GetRequiredService();
switch (mode) {
case CommitMode.CommitAndClear:
- processor.Commit(true);
- break;
- case CommitMode.CommitAndKeep:
- processor.Commit(false);
+ processor.Commit();
break;
}
@@ -141,4 +144,30 @@ private class CategoryFilter(string category) : IEventFilter {
private class EventTypeFilter(string eventType) : IEventFilter {
public bool IsEventAllowed(EventRecord eventRecord) => eventRecord.EventType == eventType;
}
+
+ private sealed class RowCountReader : IQueryResultConsumer {
+ public long RowCount {
+ get;
+ private set;
+ }
+
+ public ValueTask ConsumeAsync(IQueryResultReader resultReader, CancellationToken token) {
+ var task = ValueTask.CompletedTask;
+ try {
+ while (resultReader.TryRead()) {
+ RowCount += resultReader.Chunk.RowCount;
+ }
+ } catch (OperationCanceledException e) when (e.CancellationToken == token) {
+ task = ValueTask.FromCanceled(token);
+ } catch (Exception e) {
+ task = ValueTask.FromException(e);
+ }
+
+ return task;
+ }
+
+ public void Bind(TBinder binder) where TBinder : IPreparedQueryBinder, allows ref struct {
+ // nothing to bind, query has no substitution
+ }
+ }
}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs
index 32767e0cc1b..4c163873e59 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs
@@ -1,6 +1,7 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
using KurrentDB.Core.Data;
using KurrentDB.Core.Services.Storage.ReaderIndex;
@@ -15,29 +16,58 @@ namespace KurrentDB.SecondaryIndexing.Indexes.Category;
internal class CategoryIndexReader(
DuckDBConnectionPool sharedPool,
DefaultIndexProcessor processor,
- IReadIndex index,
- DefaultIndexInFlightRecords inFlightRecords)
+ IReadIndex index)
: SecondaryIndexReaderBase(sharedPool, index) {
protected override string GetId(string indexName) =>
CategoryIndex.TryParseCategoryName(indexName, out var categoryName)
? categoryName
: string.Empty;
- protected override (List, bool) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst)
- => inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst, r => r.Category == id);
+ protected override List GetDbRecordsForwards(DuckDBConnectionPool db,
+ string? id,
+ long startPosition,
+ int maxCount,
+ bool excludeFirst) {
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ if (excludeFirst) {
+ connection.ExecuteQuery(new(id!, startPosition,
+ maxCount))
+ .CopyTo(records);
+ } else {
+ connection.ExecuteQuery(new(id!, startPosition,
+ maxCount))
+ .CopyTo(records);
+ }
+ }
+ }
- protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst)
- => excludeFirst
- ? db.QueryToList(new(id!, startPosition, endPosition, maxCount))
- : db.QueryToList(new(id!, startPosition, endPosition, maxCount));
+ return records;
+ }
- protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst)
- => inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst, r => r.Category == id);
+ protected override List GetDbRecordsBackwards(DuckDBConnectionPool db,
+ string? id,
+ long startPosition,
+ int maxCount,
+ bool excludeFirst) {
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ if (excludeFirst) {
+ connection.ExecuteQuery(new(id!,
+ startPosition, maxCount))
+ .CopyTo(records);
+ } else {
+ connection.ExecuteQuery(new(id!,
+ startPosition, maxCount))
+ .CopyTo(records);
+ }
+ }
+ }
- protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst)
- => excludeFirst
- ? db.QueryToList(new(id!, startPosition, 0, maxCount))
- : db.QueryToList(new(id!, startPosition, 0, maxCount));
+ return records;
+ }
public override TFPos GetLastIndexedPosition(string indexName) => processor.LastIndexedPosition;
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs
index 560b9a44216..108d7c9be58 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs
@@ -15,12 +15,11 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar
=> new(statement) {
args.Category,
args.StartPosition,
- args.EndPosition,
args.Count
};
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where category=$1 and log_position>$2 and log_position<$3 order by rowid limit $4"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position>$2 order by coalesce(commit_position, log_position) limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -33,12 +32,11 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar
=> new(statement) {
args.Category,
args.StartPosition,
- args.EndPosition,
args.Count
};
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where category=$1 and log_position>=$2 and log_position<$3 order by rowid limit $4"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position>=$2 order by coalesce(commit_position, log_position) limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -55,7 +53,7 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar
};
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where category=$1 and log_position<$2 order by rowid desc limit $3"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position<$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -72,10 +70,10 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar
};
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where category=$1 and log_position<=$2 order by rowid desc limit $3"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position<=$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
- public record struct CategoryIndexQueryArgs(string Category, long StartPosition, long EndPosition, int Count);
+ public record struct CategoryIndexQueryArgs(string Category, long StartPosition, int Count);
}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexInFlightRecords.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexInFlightRecords.cs
deleted file mode 100644
index bb543918993..00000000000
--- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexInFlightRecords.cs
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
-// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
-
-using KurrentDB.SecondaryIndexing.Storage;
-
-namespace KurrentDB.SecondaryIndexing.Indexes.Default;
-
-internal record struct InFlightRecord(
- long LogPosition,
- long CommitPosition,
- string Category,
- string EventType,
- string StreamName,
- long EventNumber,
- long Created
-);
-
-internal class DefaultIndexInFlightRecords(SecondaryIndexingPluginOptions options) {
- private readonly InFlightRecord[] _records = new InFlightRecord[options.CommitBatchSize];
-
- private uint _version; // used for optimistic lock
- private int _count;
-
- public int Count => _count;
-
- public void Append(long logPosition, long commitPosition, string category, string eventType, string stream, long eventNumber, long created) {
- var count = _count;
- _records[count] = new(
- LogPosition: logPosition,
- CommitPosition: commitPosition,
- Category: category,
- EventType: eventType,
- StreamName: stream,
- EventNumber: eventNumber,
- Created: created);
-
- // Fence: make sure that the array modification cannot be done after the increment
- Volatile.Write(ref _count, count + 1);
- }
-
- public void Clear() {
- Interlocked.Increment(ref _version); // full fence
-
- // Fence: make sure that the count is modified after the version
- _count = 0;
- }
-
- // read is protected by optimistic lock
- private bool TryRead(uint currentVer, int index, out InFlightRecord record) {
- record = _records[index];
-
- // ensure that the record is copied before the comparison
- Interlocked.MemoryBarrier();
- return currentVer == _version;
- }
-
- public (List, bool) GetInFlightRecordsForwards(
- long startPosition,
- int maxCount,
- bool excludeFirst,
- Func? query = null) {
- query ??= True; // to avoid branching in the loop
-
- var isComplete = false;
- bool first = true;
-
- var currentVer = _version;
- var result = new List();
- for (int i = 0, count = Volatile.Read(in _count), remaining = maxCount;
- i < count && remaining > 0 && TryRead(currentVer, i, out var current);
- i++) {
- if (current.LogPosition >= startPosition) {
- if (i == 0 && current.LogPosition == startPosition) {
- isComplete = true;
- }
- if (query(current)) {
- if (first && excludeFirst && current.LogPosition == startPosition) {
- first = false;
- continue;
- }
-
- remaining--;
- result.Add(new(current.LogPosition, current.CommitPosition, current.EventNumber));
- }
- } else {
- if (i == 0) {
- isComplete = true;
- }
- }
- }
-
- return (result, isComplete);
- }
-
- public IEnumerable GetInFlightRecordsBackwards(
- long startPosition,
- int maxCount,
- bool excludeFirst,
- Func? query = null) {
- query ??= True; // to avoid branching in the loop
-
- var count = Volatile.Read(in _count);
- var currentVer = _version;
- bool first = true;
-
- if (count > 0
- && TryRead(currentVer, 0, out var current)
- && current.LogPosition <= startPosition) {
- for (int i = count - 1, remaining = maxCount;
- i >= 0 && remaining > 0 && TryRead(currentVer, i, out current);
- i--) {
- if (current.LogPosition <= startPosition && query(current)) {
- if (first && excludeFirst && current.LogPosition == startPosition) {
- first = false;
- continue;
- }
-
- remaining--;
- yield return new(current.LogPosition, current.CommitPosition, current.EventNumber);
- }
- }
- }
- }
-
- private static bool True(InFlightRecord record) => true;
-
- public IEnumerable GetInFlightRecords() {
- var currentVer = _version;
- for (int i = 0, count = Volatile.Read(in _count);
- i < count && TryRead(currentVer, i, out var current);
- i++) {
- yield return current;
- }
- }
-}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs
index fdbfbe9e94b..afc3c616462 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs
@@ -4,9 +4,11 @@
using System.Diagnostics.Metrics;
using DotNext;
using DotNext.Threading;
+using DuckDB.NET.Data;
using Google.Protobuf.WellKnownTypes;
using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
+using Kurrent.Quack.Threading;
using KurrentDB.Common.Configuration;
using KurrentDB.Core.Bus;
using KurrentDB.Core.Data;
@@ -17,6 +19,7 @@
using KurrentDB.SecondaryIndexing.Diagnostics;
using KurrentDB.SecondaryIndexing.Indexes.Category;
using KurrentDB.SecondaryIndexing.Indexes.EventType;
+using KurrentDB.SecondaryIndexing.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using static KurrentDB.SecondaryIndexing.Indexes.Default.DefaultSql;
@@ -24,19 +27,20 @@
namespace KurrentDB.SecondaryIndexing.Indexes.Default;
internal class DefaultIndexProcessor : Disposable, ISecondaryIndexProcessor {
- private readonly DefaultIndexInFlightRecords _inFlightRecords;
private readonly DuckDBAdvancedConnection _connection;
private readonly IPublisher _publisher;
private readonly ILongHasher _hasher;
private readonly ILogger _log;
+ private readonly BufferedView _appender;
+ private Atomic _lastPosition;
- private Appender _appender;
-
- public TFPos LastIndexedPosition { get; private set; }
+ public TFPos LastIndexedPosition {
+ get => _lastPosition.Value;
+ private set => _lastPosition.Value = value;
+ }
public DefaultIndexProcessor(
DuckDBConnectionPool db,
- DefaultIndexInFlightRecords inFlightRecords,
IPublisher publisher,
ILongHasher hasher,
[FromKeyedServices(SecondaryIndexingConstants.InjectionKey)]
@@ -47,8 +51,7 @@ public DefaultIndexProcessor(
) {
_connection = db.Open();
_log = loggerFactory.CreateLogger();
- _appender = new(_connection, "idx_all"u8);
- _inFlightRecords = inFlightRecords;
+ _appender = new(_connection, "idx_all", "log_position", DefaultIndexViewName);
var serviceName = metricsConfiguration?.ServiceName ?? "kurrentdb";
Tracker = new("default", serviceName, meter, clock ?? TimeProvider.System,
loggerFactory.CreateLogger());
@@ -89,7 +92,7 @@ public bool TryIndex(ResolvedEvent resolvedEvent) {
var created = new DateTimeOffset(resolvedEvent.Event.TimeStamp).ToUnixTimeMilliseconds();
using (var row = _appender.CreateRow()) {
row.Add(logPosition);
- if (commitPosition.HasValue && logPosition != commitPosition)
+ if (commitPosition.HasValue && logPosition != commitPosition.GetValueOrDefault())
row.Add(commitPosition.Value);
else
row.Add(DBNull.Value);
@@ -110,8 +113,6 @@ public bool TryIndex(ResolvedEvent resolvedEvent) {
row.Add(schemaFormat);
}
- _inFlightRecords.Append(logPosition, commitPosition ?? logPosition, category, schemaName, resolvedEvent.Event.EventStreamId,
- eventNumber, created);
LastIndexedPosition = resolvedEvent.EventPosition!.Value;
_publisher.Publish(new StorageMessage.SecondaryIndexCommitted(SystemStreams.DefaultSecondaryIndex, resolvedEvent));
@@ -140,13 +141,10 @@ static string GetStreamCategory(string streamName) {
private Atomic.Boolean _committing;
- public void Commit() => Commit(true);
-
///
/// Commits all in-flight records to the index.
///
- /// Tells you whether to clear the in-flight records after committing. It must be true and only set to false in tests.
- internal void Commit(bool clearInflight) {
+ public void Commit() {
if (IsDisposingOrDisposed || !_committing.FalseToTrue())
return;
@@ -154,21 +152,20 @@ internal void Commit(bool clearInflight) {
using var duration = Tracker.StartCommitDuration();
_appender.Flush();
} catch (Exception e) {
- _log.LogError(e, "Failed to commit {Count} records to index at log position {LogPosition}",
- _inFlightRecords.Count, LastIndexedPosition);
+ _log.LogError(e, "Failed to commit records to index at log position {LogPosition}", LastIndexedPosition);
throw;
} finally {
_committing.TrueToFalse();
}
-
- if (clearInflight) {
- _inFlightRecords.Clear();
- }
}
+ public BufferedView.Snapshot CaptureSnapshot(DuckDBConnection connection)
+ => _appender.TakeSnapshot(connection, ExpandRecordFunction.UnnestExpression);
+
protected override void Dispose(bool disposing) {
if (disposing) {
Commit();
+ _appender.Unregister(_connection);
_appender.Dispose();
_connection.Dispose();
}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs
index 2e7fca3d6d9..4751c589027 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs
@@ -1,6 +1,7 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
using KurrentDB.Core.Data;
using KurrentDB.Core.Services;
@@ -13,26 +14,57 @@ namespace KurrentDB.SecondaryIndexing.Indexes.Default;
internal class DefaultIndexReader(
DuckDBConnectionPool sharedPool,
DefaultIndexProcessor processor,
- DefaultIndexInFlightRecords inFlightRecords,
IReadIndex index
) : SecondaryIndexReaderBase(sharedPool, index) {
protected override string GetId(string indexName) => string.Empty;
- protected override (List Records, bool IsFinal) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst)
- => inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst);
+ protected override List GetDbRecordsForwards(DuckDBConnectionPool db,
+ string? id,
+ long startPosition,
+ int maxCount,
+ bool excludeFirst) {
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ if (excludeFirst) {
+ connection.ExecuteQuery(new(
+ startPosition,
+ maxCount))
+ .CopyTo(records);
+ } else {
+ connection.ExecuteQuery(new(
+ startPosition,
+ maxCount))
+ .CopyTo(records);
+ }
+ }
+ }
- protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst)
- => excludeFirst
- ? db.QueryToList(new(startPosition, endPosition, maxCount))
- : db.QueryToList(new(startPosition, endPosition, maxCount));
+ return records;
+ }
- protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst)
- => inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst);
+ protected override List GetDbRecordsBackwards(DuckDBConnectionPool db,
+ string? id,
+ long startPosition,
+ int maxCount,
+ bool excludeFirst) {
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ if (excludeFirst) {
+ connection.ExecuteQuery(
+ new(startPosition, maxCount))
+ .CopyTo(records);
+ } else {
+ connection.ExecuteQuery(
+ new(startPosition, maxCount))
+ .CopyTo(records);
+ }
+ }
+ }
- protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst)
- => excludeFirst
- ? db.QueryToList(new(startPosition, 0, maxCount))
- : db.QueryToList(new(startPosition, 0, maxCount));
+ return records;
+ }
public override TFPos GetLastIndexedPosition(string indexName) => processor.LastIndexedPosition;
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs
index 87b1fd5c0b6..96033fc4b27 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs
@@ -7,17 +7,24 @@
namespace KurrentDB.SecondaryIndexing.Indexes.Default;
internal static class DefaultSql {
- public record struct ReadDefaultIndexQueryArgs(long StartPosition, long EndPosition, int Count);
+ // We should query on this view rather than on idx_all table.
+ // The view combines in-flight cache and idx_all rows.
+ public const string DefaultIndexViewName = "idx_all_snapshot";
+
+ public record struct ReadDefaultIndexQueryArgs(long StartPosition, int Count);
///
/// Get index records for the default index with a log position greater than the start position
///
public struct ReadDefaultIndexQueryExcl : IQuery {
public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
- => new(statement) { args.StartPosition, args.EndPosition, args.Count };
+ => new(statement) { args.StartPosition, args.Count };
+ // We cannot sort by 'rowid' because this hidden column exists only for rows in the physical table.
+ // Moreover, DuckDB cannot incrementally return query result by pages (data chunks), because ordering must be applied
+ // first on all rows, which leads to full materialization of the query result in the memory.
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where log_position>$1 and log_position<$2 and is_deleted=false order by rowid limit $3"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where log_position>$1 and is_deleted=false order by coalesce(commit_position, log_position) limit $2"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -27,10 +34,10 @@ public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, Pre
///
public struct ReadDefaultIndexQueryIncl : IQuery {
public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
- => new(statement) { args.StartPosition, args.EndPosition, args.Count };
+ => new(statement) { args.StartPosition, args.Count };
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where log_position>=$1 and log_position<$2 and is_deleted=false order by rowid limit $3"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where log_position>=$1 and is_deleted=false order by coalesce(commit_position, log_position) limit $2"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -43,7 +50,7 @@ public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, Pre
=> new(statement) { args.StartPosition, args.Count };
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where log_position<$1 and is_deleted=false order by rowid desc limit $2"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where log_position<$1 and is_deleted=false order by coalesce(commit_position, log_position) desc, log_position desc limit $2"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -56,7 +63,7 @@ public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, Pre
=> new(statement) { args.StartPosition, args.Count };
public static ReadOnlySpan CommandText =>
- "select log_position, commit_position, event_number from idx_all where log_position<=$1 and is_deleted=false order by rowid desc limit $2"u8;
+ "select log_position, commit_position, event_number from idx_all_snapshot where log_position<=$1 and is_deleted=false order by coalesce(commit_position, log_position) desc, log_position desc limit $2"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/ExpandRecordFunction.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/ExpandRecordFunction.cs
new file mode 100644
index 00000000000..8e5bddf69da
--- /dev/null
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/ExpandRecordFunction.cs
@@ -0,0 +1,68 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System.Security.Claims;
+using DuckDB.NET.Native;
+using Kurrent.Quack;
+using Kurrent.Quack.Functions;
+using KurrentDB.Core.Data;
+using KurrentDB.Core.Services.Transport.Enumerators;
+
+namespace KurrentDB.SecondaryIndexing.Indexes.Default;
+
+internal sealed class ExpandRecordFunction(Func> eventsProvider)
+ : GetDatabaseEventsFunction(Name, eventsProvider) {
+
+ private new const string Name = "get_kdb_def";
+ public static ReadOnlySpan UnnestExpression => "unnest(get_kdb_def(log_position))"u8;
+
+ protected override void Bind(BindingContext context) {
+ // nothing to initialize here
+ }
+
+ protected override void FillRow(EventRecord ev, ref TBuilder builder, int rowIndex) {
+ // Data column
+ var column = builder[0];
+ if (ev.IsJson) {
+ // JSON can be copied to DuckDB directly because it's encoded as UTF-8
+ column.SetValue(rowIndex, ev.Data.Span);
+ } else {
+ WriteBase64(column, rowIndex, ev.Data.Span);
+ }
+
+ // Metadata column
+ column = builder[1];
+ column.SetValue(rowIndex, ev.Metadata.Span is { Length: > 0 } metadata ? metadata : "{}"u8);
+
+ // StreamId column
+ column = builder[2];
+ column.SetValue(rowIndex, ev.EventStreamId);
+ }
+
+ protected override void FillRowWithEmptyData(ref TBuilder builder, int rowIndex) {
+ // Data column
+ var column = builder[0];
+ column.SetValue(rowIndex, "{}"u8);
+
+ // Metadata column
+ column = builder[1];
+ column.SetValue(rowIndex, "{}"u8);
+
+ // StreamId column
+ column = builder[2];
+ column.SetValue(rowIndex, string.Empty);
+ }
+}
+
+
+internal readonly ref struct EventColumns : ICompositeReturnType {
+ private const DuckDBType Data = DuckDBType.Varchar;
+ private const DuckDBType Metadata = DuckDBType.Varchar;
+ private const DuckDBType StreamId = DuckDBType.Varchar;
+
+ static IReadOnlyList> ICompositeReturnType.ReturnType => new ICompositeReturnType.Builder {
+ { Data, "data" },
+ { Metadata, "metadata" },
+ { StreamId, "stream_id" },
+ };
+}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs
index c10d130ed5e..0dc58df9bda 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs
@@ -1,6 +1,7 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
using KurrentDB.Core.Data;
using KurrentDB.Core.Services.Storage.ReaderIndex;
@@ -13,27 +14,58 @@ namespace KurrentDB.SecondaryIndexing.Indexes.EventType;
internal class EventTypeIndexReader(
DuckDBConnectionPool sharedPool,
DefaultIndexProcessor processor,
- IReadIndex index,
- DefaultIndexInFlightRecords inFlightRecords)
+ IReadIndex index)
: SecondaryIndexReaderBase(sharedPool, index) {
protected override string GetId(string streamName) =>
EventTypeIndex.TryParseEventType(streamName, out var eventTypeName) ? eventTypeName : string.Empty;
- protected override (List, bool) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst)
- => inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst, r => r.EventType == id);
+ protected override List GetDbRecordsForwards(DuckDBConnectionPool db,
+ string? id,
+ long startPosition,
+ int maxCount,
+ bool excludeFirst) {
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ if (excludeFirst) {
+ connection
+ .ExecuteQuery(new(id!, startPosition, maxCount))
+ .CopyTo(records);
+ } else {
+ connection
+ .ExecuteQuery(new(id!, startPosition,
+ maxCount))
+ .CopyTo(records);
+ }
+ }
+ }
- protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst)
- => excludeFirst
- ? db.QueryToList(new(id!, startPosition, endPosition, maxCount))
- : db.QueryToList(new(id!, startPosition, endPosition, maxCount));
+ return records;
+ }
- protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst)
- => inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst, r => r.EventType == id);
+ protected override List GetDbRecordsBackwards(DuckDBConnectionPool db,
+ string? id,
+ long startPosition,
+ int maxCount,
+ bool excludeFirst) {
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ if (excludeFirst) {
+ connection
+ .ExecuteQuery(new(id!, startPosition, maxCount))
+ .CopyTo(records);
+ } else {
+ connection
+ .ExecuteQuery(new(id!,
+ startPosition, maxCount))
+ .CopyTo(records);
+ }
+ }
+ }
- protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst)
- => excludeFirst
- ? db.QueryToList(new(id!, startPosition, 0, maxCount))
- : db.QueryToList(new(id!, startPosition, 0, maxCount));
+ return records;
+ }
public override TFPos GetLastIndexedPosition(string indexName) => processor.LastIndexedPosition;
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs
index 0740c70bc69..fa5df1f722b 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs
@@ -7,7 +7,7 @@
namespace KurrentDB.SecondaryIndexing.Indexes.EventType;
internal static class EventTypeSql {
- public record struct ReadEventTypeIndexQueryArgs(string EventType, long StartPosition, long EndPosition, int Count);
+ public record struct ReadEventTypeIndexQueryArgs(string EventType, long StartPosition, int Count);
///
/// Get index records for a given event type where the log position is greater than the start position
@@ -17,12 +17,11 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P
=> new(statement) {
args.EventType,
args.StartPosition,
- args.EndPosition,
args.Count
};
public static ReadOnlySpan CommandText
- => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position>$2 and log_position<$3 order by rowid limit $4"u8;
+ => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position>$2 order by coalesce(commit_position, log_position) limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -35,12 +34,11 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P
=> new(statement) {
args.EventType,
args.StartPosition,
- args.EndPosition,
args.Count
};
public static ReadOnlySpan CommandText
- => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position>=$2 and log_position<$3 order by rowid limit $4"u8;
+ => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position>=$2 order by coalesce(commit_position, log_position) limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -57,7 +55,7 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P
};
public static ReadOnlySpan CommandText
- => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position<$2 order by rowid desc limit $3"u8;
+ => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position<$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
@@ -74,7 +72,7 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P
};
public static ReadOnlySpan CommandText
- => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position<=$2 order by rowid limit $3"u8;
+ => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position<=$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8;
public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64());
}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/GetDatabaseEventsFunction.cs b/src/KurrentDB.SecondaryIndexing/Indexes/GetDatabaseEventsFunction.cs
new file mode 100644
index 00000000000..072f6be619c
--- /dev/null
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/GetDatabaseEventsFunction.cs
@@ -0,0 +1,59 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System.Runtime.CompilerServices;
+using System.Security.Claims;
+using DotNext.Buffers;
+using DotNext.Buffers.Text;
+using DuckDB.NET.Native;
+using Kurrent.Quack;
+using Kurrent.Quack.Functions;
+using KurrentDB.Core.Data;
+using KurrentDB.Core.Services.Transport.Enumerators;
+using KurrentDB.Core.Services.UserManagement;
+
+namespace KurrentDB.SecondaryIndexing.Indexes;
+
+internal abstract class GetDatabaseEventsFunction(string functionName, Func> eventsProvider) : ScalarFunction(functionName)
+ where TReturnType : ICompositeReturnType, allows ref struct
+{
+ // Accepts log_position
+ protected sealed override IReadOnlyList Parameters => [new(DuckDBType.BigInt)];
+
+ protected sealed override void Execute(ExecutionContext context, in DataChunk input, ref TBuilder builder) {
+ var logPositions = input[0].Int64Rows.ToArray(); // TODO: Remove array allocation
+
+ using var enumerator = eventsProvider.Invoke(logPositions, SystemAccounts.System);
+
+ for (var rowIndex = 0; enumerator.MoveNext(); rowIndex++) {
+ if (enumerator.Current is ReadResponse.EventReceived eventReceived) {
+ FillRow(eventReceived.Event.Event, ref builder, rowIndex);
+ } else {
+ // We should not leave the builder with uninitialized rows to avoid memory garbage to leak into DuckDB internals
+ FillRowWithEmptyData(ref builder, rowIndex);
+ }
+ }
+ }
+
+ protected abstract void FillRow(EventRecord ev, ref TBuilder builder, int rowIndex)
+ where TBuilder : struct, DataChunk.IBuilder;
+
+ protected abstract void FillRowWithEmptyData(ref TBuilder builder, int rowIndex)
+ where TBuilder : struct, DataChunk.IBuilder;
+
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ protected static void WriteBase64(DataChunk.ColumnBuilder column, int rowIndex, ReadOnlySpan data) {
+ const byte quote = (byte)'"';
+
+ var writer = new BufferWriterSlim(4096);
+ writer.Add(quote);
+ var encoder = new Base64Encoder();
+ try {
+ encoder.EncodeToUtf8(data, ref writer, flush: true);
+ writer.Add(quote);
+ column.SetValue(rowIndex, writer.WrittenSpan);
+ } finally {
+ writer.Dispose();
+ }
+ }
+}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs b/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs
index 67305971be6..0a92762af00 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs
@@ -13,11 +13,7 @@ namespace KurrentDB.SecondaryIndexing.Indexes;
public abstract class SecondaryIndexReaderBase(DuckDBConnectionPool sharedPool, IReadIndex index) : ISecondaryIndexReader {
protected abstract string? GetId(string indexName);
- protected abstract (List Records, bool IsFinal) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst);
-
- protected abstract List GetDbRecordsForwards(DuckDBConnectionPool pool, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst);
-
- protected abstract IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst);
+ protected abstract List GetDbRecordsForwards(DuckDBConnectionPool pool, string? id, long startPosition, int maxCount, bool excludeFirst);
protected abstract List GetDbRecordsBackwards(DuckDBConnectionPool pool, string? id, long startPosition, int maxCount, bool excludeFirst);
@@ -60,28 +56,14 @@ CancellationToken token
ReadIndexEventsForwardCompleted NoData(ReadIndexResult result, bool endOfStream, string? error = null)
=> new(result, ResolvedEvent.EmptyArray, pos, lastIndexedPosition, endOfStream, error);
- IReadOnlyList GetIndexRecordsForwards(long startPosition) {
- var maxCount = msg.MaxCount;
- var (inFlight, isComplete) = GetInflightForwards(id, startPosition, maxCount, msg.ExcludeStart);
- if (isComplete) {
- return inFlight;
- }
-
- var end = inFlight.Count > 0 ? inFlight[0].LogPosition : lastIndexedPosition + 1;
- var range = GetDbRecordsForwards(GetPool(msg.Pool), id, startPosition, end, maxCount, msg.ExcludeStart);
- if (range.Count == 0) {
- return inFlight;
- }
-
- if (inFlight.Count > 0) {
- range.AddRange(inFlight);
- }
-
- return range;
- }
-
async ValueTask<(long, IReadOnlyList)> GetEventsForwards(long startPosition) {
- var indexPrepares = GetIndexRecordsForwards(startPosition);
+ var indexPrepares = GetDbRecordsForwards(
+ GetPool(msg.Pool),
+ id,
+ startPosition,
+ msg.MaxCount,
+ msg.ExcludeStart);
+
var events = await reader.ReadRecords(indexPrepares, true, token);
return (indexPrepares.Count, events);
}
@@ -118,45 +100,19 @@ CancellationToken token
ReadIndexEventsBackwardCompleted NoData(ReadIndexResult result, string? error = null)
=> new(result, ResolvedEvent.EmptyArray, pos, lastIndexedPosition, false, error);
- IReadOnlyList GetIndexRecordsBackwards(TFPos startPosition) {
- var maxCount = msg.MaxCount;
- var inFlight = GetInflightBackwards(id, startPosition.PreparePosition, maxCount, msg.ExcludeStart).ToArray();
- if (inFlight.Length == maxCount) {
- return inFlight;
- }
-
- int count;
- long start;
- bool excl;
- if (inFlight.Length > 0) {
- count = maxCount - inFlight.Length;
- start = inFlight[^1].LogPosition;
- excl = true;
- } else {
- count = maxCount;
- start = startPosition.PreparePosition;
- excl = msg.ExcludeStart;
- }
- var range = GetDbRecordsBackwards(GetPool(msg.Pool), id, start, count, excl);
- if (range.Count == 0) {
- return inFlight;
- }
-
- if (inFlight.Length > 0) {
- range.AddRange(inFlight);
- }
-
- return range;
- }
-
async ValueTask<(long, IReadOnlyList)> GetEventsBackwards(TFPos startPosition) {
- var indexPrepares = GetIndexRecordsBackwards(startPosition);
+ var indexPrepares = GetDbRecordsBackwards(GetPool(msg.Pool),
+ id,
+ startPosition.PreparePosition,
+ msg.MaxCount,
+ msg.ExcludeStart);
+
var events = await reader.ReadRecords(indexPrepares, false, token);
return (indexPrepares.Count, events);
}
}
private DuckDBConnectionPool GetPool(DuckDBConnectionPool? pool) {
- return pool is not null ? pool : sharedPool;
+ return pool ?? sharedPool;
}
}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/ExpandRecordFunction.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/ExpandRecordFunction.cs
new file mode 100644
index 00000000000..e4087326600
--- /dev/null
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/ExpandRecordFunction.cs
@@ -0,0 +1,77 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System.Security.Claims;
+using DuckDB.NET.Native;
+using Kurrent.Quack;
+using Kurrent.Quack.Functions;
+using KurrentDB.Core.Data;
+using KurrentDB.Core.Services.Transport.Enumerators;
+
+namespace KurrentDB.SecondaryIndexing.Indexes.User;
+
+internal sealed class ExpandRecordFunction(Func> eventsProvider)
+ : GetDatabaseEventsFunction(Name, eventsProvider) {
+
+ private new const string Name = "get_kdb_usr";
+ public static ReadOnlySpan UnnestExpression => "unnest(get_kdb_usr(log_position))"u8;
+
+ protected override void Bind(BindingContext context) {
+ // nothing to initialize here
+ }
+
+ protected override void FillRow(EventRecord ev, ref TBuilder builder, int rowIndex) {
+ // Data column
+ var column = builder[0];
+ if (ev.IsJson) {
+ // JSON can be copied to DuckDB directly because it's encoded as UTF-8
+ column.SetValue(rowIndex, ev.Data.Span);
+ } else {
+ WriteBase64(column, rowIndex, ev.Data.Span);
+ }
+
+ // Metadata column
+ column = builder[1];
+ column.SetValue(rowIndex, ev.Metadata.Span is { Length: > 0 } metadata ? metadata : "{}"u8);
+
+ // StreamId column
+ column = builder[2];
+ column.SetValue(rowIndex, ev.EventStreamId);
+
+ // EventType column
+ column = builder[3];
+ column.SetValue(rowIndex, ev.EventType);
+ }
+
+ protected override void FillRowWithEmptyData(ref TBuilder builder, int rowIndex) {
+ // Data column
+ var column = builder[0];
+ column.SetValue(rowIndex, "{}"u8);
+
+ // Metadata column
+ column = builder[1];
+ column.SetValue(rowIndex, "{}"u8);
+
+ // StreamId column
+ column = builder[2];
+ column.SetValue(rowIndex, string.Empty);
+
+ // EventType column
+ column = builder[3];
+ column.SetValue(rowIndex, string.Empty);
+ }
+}
+
+internal readonly ref struct EventColumns : ICompositeReturnType {
+ private const DuckDBType Data = DuckDBType.Varchar;
+ private const DuckDBType Metadata = DuckDBType.Varchar;
+ private const DuckDBType StreamId = DuckDBType.Varchar;
+ private const DuckDBType EventType = DuckDBType.Varchar;
+
+ static IReadOnlyList> ICompositeReturnType.ReturnType => new ICompositeReturnType.Builder {
+ { Data, "data" },
+ { Metadata, "metadata" },
+ { StreamId, "stream_id" },
+ { EventType, "event_type" },
+ };
+}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs
index aad1dfa822b..03e58d68e12 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs
@@ -2,118 +2,116 @@
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
using System.Globalization;
-using DuckDB.NET.Data.DataChunk.Writer;
+using System.Numerics;
+using DotNext.Patterns;
using Jint;
using Jint.Native;
using Kurrent.Quack;
+using Kurrent.Quack.Threading;
namespace KurrentDB.SecondaryIndexing.Indexes.User;
public interface IField {
- static abstract IField ParseFrom(JsValue value);
- static abstract IField ParseFrom(string value);
static abstract string GetCreateStatement(string field);
- static abstract Type? Type { get; }
string GetQueryStatement(string field);
void BindTo(PreparedStatement statement, ref int index);
- void AppendTo(Appender.Row row);
- void WriteTo(IDuckDBDataWriter writer, ulong rowIndex);
+ void AppendTo(BufferedAppender.Row row);
}
-internal readonly record struct Int16Field(short Key) : IField {
- public static Type Type { get; } = typeof(short);
- public static IField ParseFrom(JsValue value) => new Int16Field(Convert.ToInt16(value.AsNumber()));
- public static IField ParseFrom(string value) => new Int16Field(Convert.ToInt16(value));
+public interface IField : IField
+ where TSelf : IField
+{
+ static abstract TSelf ParseFrom(JsValue value);
+ static abstract TSelf ParseFrom(string value);
+}
+
+file interface INumericField : IField
+ where TSelf : struct, INumericField
+ where TNumber : struct, INumber {
+ static abstract TSelf Create(TNumber value);
+
+ static TSelf IField.ParseFrom(JsValue value) => TSelf.Create(TNumber.CreateChecked(value.AsNumber()));
+
+ static TSelf IField.ParseFrom(string value) => TSelf.Create(TNumber.Parse(value, provider: null));
+}
+
+internal readonly record struct Int16Field(short Key) : INumericField {
+ public static Int16Field Create(short value) => new(value);
public static string GetCreateStatement(string field) => $", \"{field}\" SMALLINT not null";
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
- public void AppendTo(Appender.Row row) => row.Add(Key);
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
+ public void AppendTo(BufferedAppender.Row row) => row.Add(Key);
public override string ToString() => Key.ToString();
}
-internal readonly record struct Int32Field(int Key) : IField {
- public static Type Type { get; } = typeof(int);
- public static IField ParseFrom(JsValue value) => new Int32Field(Convert.ToInt32(value.AsNumber()));
- public static IField ParseFrom(string value) => new Int32Field(Convert.ToInt32(value));
+internal readonly record struct Int32Field(int Key) : INumericField {
+ public static Int32Field Create(int value) => new(value);
public static string GetCreateStatement(string field) => $", \"{field}\" INTEGER not null";
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
- public void AppendTo(Appender.Row row) => row.Add(Key);
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
+ public void AppendTo(BufferedAppender.Row row) => row.Add(Key);
public override string ToString() => Key.ToString();
}
-internal readonly record struct Int64Field(long Key) : IField {
- public static Type Type { get; } = typeof(long);
- public static IField ParseFrom(JsValue value) => new Int64Field(Convert.ToInt64(value.AsNumber()));
- public static IField ParseFrom(string value) => new Int64Field(Convert.ToInt64(value));
+internal readonly record struct Int64Field(long Key) : INumericField {
+ public static Int64Field Create(long value) => new(value);
public static string GetCreateStatement(string field) => $", \"{field}\" BIGINT not null";
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
- public void AppendTo(Appender.Row row) => row.Add(Key);
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
+ public void AppendTo(BufferedAppender.Row row) => row.Add(Key);
public override string ToString() => Key.ToString();
}
-internal readonly record struct UInt32Field(uint Key) : IField {
- public static Type Type { get; } = typeof(uint);
- public static IField ParseFrom(JsValue value) => new UInt32Field(Convert.ToUInt32(value.AsNumber()));
- public static IField ParseFrom(string value) => new UInt32Field(Convert.ToUInt32(value));
+internal readonly record struct UInt32Field(uint Key) : INumericField {
+ public static UInt32Field Create(uint value) => new(value);
public static string GetCreateStatement(string field) => $", \"{field}\" UINTEGER not null";
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
- public void AppendTo(Appender.Row row) => row.Add(Key);
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
+ public void AppendTo(BufferedAppender.Row row) => row.Add(Key);
public override string ToString() => Key.ToString();
}
-internal readonly record struct UInt64Field(ulong Key) : IField {
- public static Type Type { get; } = typeof(ulong);
- public static IField ParseFrom(JsValue value) => new UInt64Field(Convert.ToUInt64(value.AsNumber()));
- public static IField ParseFrom(string value) => new UInt64Field(Convert.ToUInt64(value));
+internal readonly record struct UInt64Field(ulong Key) : INumericField {
+ public static UInt64Field Create(ulong value) => new(value);
public static string GetCreateStatement(string field) => $", \"{field}\" UBIGINT not null";
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
- public void AppendTo(Appender.Row row) => row.Add(Key);
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
+ public void AppendTo(BufferedAppender.Row row) => row.Add(Key);
public override string ToString() => Key.ToString();
}
-internal readonly record struct DoubleField(double Key) : IField {
- public static Type Type { get; } = typeof(double);
- public static IField ParseFrom(JsValue value) => new DoubleField(value.AsNumber());
- public static IField ParseFrom(string value) => new DoubleField(Convert.ToDouble(value));
+internal readonly record struct DoubleField(double Key) : INumericField {
+ public static DoubleField Create(double value) => new(value);
public static string GetCreateStatement(string field) => $", \"{field}\" DOUBLE not null";
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
- public void AppendTo(Appender.Row row) => row.Add(Key);
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
+ public void AppendTo(BufferedAppender.Row row) => row.Add(Key);
public override string ToString() => Key.ToString(CultureInfo.InvariantCulture);
}
-internal readonly record struct StringField(string Key) : IField {
- public static Type Type { get; } = typeof(string);
- public static IField ParseFrom(JsValue value) => new StringField(value.AsString());
- public static IField ParseFrom(string value) => new StringField(value);
+internal readonly record struct StringField(string Key) : IField {
+ public static StringField ParseFrom(JsValue value) => ParseFrom(value.AsString());
+ public static StringField ParseFrom(string value) => new(value);
public static string GetCreateStatement(string field) => $", \"{field}\" VARCHAR not null";
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
- public void AppendTo(Appender.Row row) => row.Add(Key);
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
+ public void AppendTo(BufferedAppender.Row row) => row.Add(Key);
public override string ToString() => Key;
}
-internal readonly record struct NullField : IField {
- public static Type? Type { get => null; }
- public static IField ParseFrom(JsValue value) {
+internal sealed class NullField : IField, ISingleton {
+ public static NullField Instance { get; } = new();
+
+ private NullField() {
+ }
+
+ public static NullField ParseFrom(JsValue value) {
return !value.IsNull() ? throw new ArgumentException(null, nameof(value)) : new NullField();
}
- public static IField ParseFrom(string value) => throw new NotSupportedException();
+ public static NullField ParseFrom(string value) => throw new NotSupportedException();
public static string GetCreateStatement(string field) => string.Empty;
public string GetQueryStatement(string field) => string.Empty;
public void BindTo(PreparedStatement statement, ref int index) { }
- public void AppendTo(Appender.Row row) { }
- public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) { }
+ public void AppendTo(BufferedAppender.Row row) { }
}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs
index 34746b194a0..8957d2a9365 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs
@@ -2,7 +2,9 @@
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
using System.Diagnostics.Metrics;
+using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
+using Kurrent.Quack.Threading;
using Kurrent.Surge.Schema.Serializers;
using KurrentDB.Core;
using KurrentDB.Core.Bus;
@@ -58,6 +60,12 @@ public UserIndexEngine(
subscriber.Subscribe(this);
}
+ public bool TryCaptureSnapshot(ReadOnlySpan viewNameUtf8,
+ DuckDBAdvancedConnection connection,
+ out UserIndexEngineSubscription.ReadLock readLock,
+ out BufferedView.Snapshot snapshot)
+ => _subscription.TryCaptureSnapshot(viewNameUtf8, connection, out readLock, out snapshot);
+
public void EnsureLive() {
if (!_subscription.CaughtUp) {
throw new UserIndexesNotReadyException(_subscription.Checkpoint, _writerCheckpoint.Read());
@@ -136,8 +144,8 @@ private class Filter : IEventFilter {
CancellationToken token) =>
_subscription.ReadBackwards(msg, token);
- public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string inFlightTableName, out string? fieldName) =>
- _subscription.TryGetUserIndexTableDetails(indexName, out tableName, out inFlightTableName, out fieldName);
+ public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string? fieldName) =>
+ _subscription.TryGetUserIndexTableDetails(indexName, out tableName, out fieldName);
}
static partial class UserIndexEngineLogMessages {
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.ViewName.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.ViewName.cs
new file mode 100644
index 00000000000..c1d1990a7a7
--- /dev/null
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.ViewName.cs
@@ -0,0 +1,23 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System.Diagnostics.CodeAnalysis;
+using System.Text;
+
+namespace KurrentDB.SecondaryIndexing.Indexes.User;
+
+partial class UserIndexEngineSubscription {
+ private sealed class ViewNameEqualityComparer : IEqualityComparer, IAlternateEqualityComparer, ViewName> {
+ bool IEqualityComparer.Equals(ViewName x, ViewName y) => x.Equals(y);
+
+ int IEqualityComparer.GetHashCode(ViewName obj) => obj.GetHashCode();
+
+ bool IAlternateEqualityComparer, ViewName>.Equals(ReadOnlySpan alternate, ViewName other)
+ => other.Value.SequenceEqual(alternate);
+
+ int IAlternateEqualityComparer, ViewName>.GetHashCode(ReadOnlySpan alternate)
+ => ViewName.GetHashCode(alternate);
+
+ ViewName IAlternateEqualityComparer, ViewName>.Create(ReadOnlySpan alternate) => new(alternate);
+ }
+}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs
index 21a902dc37e..c851e4db14b 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs
@@ -6,6 +6,7 @@
using System.Threading.Channels;
using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
+using Kurrent.Quack.Threading;
using Kurrent.Surge.Schema;
using Kurrent.Surge.Schema.Serializers;
using KurrentDB.Core;
@@ -23,7 +24,7 @@
namespace KurrentDB.SecondaryIndexing.Indexes.User;
-public class UserIndexEngineSubscription(
+public partial class UserIndexEngineSubscription(
ISystemClient client,
IPublisher publisher,
ISchemaSerializer serializer,
@@ -37,13 +38,15 @@ public class UserIndexEngineSubscription(
: ISecondaryIndexReader {
private readonly ConcurrentDictionary _userIndexes = new();
+ private readonly ConcurrentDictionary _viewNameToIndexNameMapping = new(new ViewNameEqualityComparer());
private readonly CancellationTokenSource _cts = CancellationTokenSource.CreateLinkedTokenSource(token);
private readonly ILogger _log = logFactory.CreateLogger();
record struct UserIndexData(
ReaderWriterLockSlim RWLock,
UserIndexSubscription Subscription,
- SecondaryIndexReaderBase Reader);
+ UserIndexReader Reader,
+ string ViewName);
public bool CaughtUp { get; private set; }
public long Checkpoint { get; private set; }
@@ -186,7 +189,7 @@ await client.Subscriptions.SubscribeToStream(
}
private ValueTask StartUserIndex(string indexName, IndexCreated createdEvent) {
- if (createdEvent.Fields.Count is 0)
+ if (createdEvent.Fields is [])
return StartUserIndex(indexName, createdEvent);
return createdEvent.Fields[0].Type switch {
@@ -203,11 +206,9 @@ private ValueTask StartUserIndex(string indexName, IndexCreated createdEvent) {
private async ValueTask StartUserIndex(
string indexName,
- IndexCreated createdEvent) where TField : IField {
+ IndexCreated createdEvent) where TField : IField {
_log.LogStartingUserIndex(indexName);
- var inFlightRecords = new UserIndexInFlightRecords(options);
-
var sql = new UserIndexSql(
indexName,
createdEvent.Fields.Count is 0
@@ -222,14 +223,13 @@ createdEvent.Fields.Count is 0
: createdEvent.Fields[0].Selector,
db: db,
sql: sql,
- inFlightRecords: inFlightRecords,
publisher: publisher,
meter: meter,
getLastAppendedRecord: getLastAppendedRecord,
loggerFactory: logFactory
);
- var reader = new UserIndexReader(sharedPool: db, sql, inFlightRecords, readIndex);
+ var reader = new UserIndexReader(sharedPool: db, processor, readIndex);
UserIndexSubscription subscription = new UserIndexSubscription(
publisher: publisher,
@@ -238,18 +238,22 @@ createdEvent.Fields.Count is 0
log: logFactory.CreateLogger(),
token: _cts.Token);
- _userIndexes.TryAdd(indexName, new(new(), subscription, reader));
+ _userIndexes.TryAdd(indexName, new(new(), subscription, reader, sql.ViewName));
+ var viewName = new ViewName(sql.ViewName);
+ _viewNameToIndexNameMapping.TryAdd(viewName, indexName);
await subscription.Start();
}
private async Task StopUserIndex(string indexName) {
_log.LogStoppingUserIndex(LogLevel.Debug, indexName);
- var writeLock = AcquireWriteLockForIndex(indexName, out var index);
+ var writeLock = AcquireWriteLockForIndex(indexName, out var viewName, out var index);
using (writeLock) {
// we have the write lock, there are no readers. after we release the lock the index is no longer
// in the dictionary so there can be no new readers.
_userIndexes.TryRemove(indexName, out _);
+ var name = new ViewName(viewName);
+ _viewNameToIndexNameMapping.TryRemove(name, out _);
}
// guaranteed no readers, we can stop.
@@ -322,39 +326,55 @@ public TFPos GetLastIndexedPosition(string indexStream) {
return data.Reader.ReadBackwards(msg, token);
}
- public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string inFlightTableName, out string? fieldName) {
+ public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string? fieldName) {
if (!TryAcquireReadLockForIndex(indexName, out var readLock, out var data)) {
tableName = null!;
- inFlightTableName = null!;
fieldName = null;
return false;
}
using (readLock) {
- data.Subscription.GetUserIndexTableDetails(out tableName, out inFlightTableName, out fieldName);
+ data.Subscription.GetUserIndexTableDetails(out tableName, out fieldName);
+ return true;
+ }
+ }
+
+ public bool TryCaptureSnapshot(ReadOnlySpan viewNameUtf8,
+ DuckDBAdvancedConnection connection,
+ out ReadLock readLock,
+ out BufferedView.Snapshot snapshot) {
+ var view = _viewNameToIndexNameMapping.GetAlternateLookup>();
+ if (view.TryGetValue(viewNameUtf8, out var indexName)
+ && TryAcquireReadLockForIndex(indexName, out readLock, out var data)) {
+ snapshot = data.Reader.CaptureSnapshot(connection);
return true;
}
+
+ snapshot = default;
+ readLock = default;
+ return false;
}
- private bool TryAcquireReadLockForIndex(string index, out ReadLock? readLock, out UserIndexData data) {
+ private bool TryAcquireReadLockForIndex(ReadOnlySpan index, out ReadLock readLock, out UserIndexData data) {
// note: a write lock is acquired only when deleting the index. so, if we cannot acquire a read lock,
// it means that the user index is being/has been deleted.
- readLock = null;
+ readLock = default;
data = default;
- if (!_userIndexes.TryGetValue(index, out data)) {
+ var view = _userIndexes.GetAlternateLookup>();
+ if (!view.TryGetValue(index, out data)) {
return false;
}
if (!data.RWLock.TryEnterReadLock(TimeSpan.Zero))
return false;
- readLock = new ReadLock(data.RWLock);
+ readLock = new(data.RWLock);
return true;
}
- private WriteLock AcquireWriteLockForIndex(string index, out UserIndexSubscription subscription) {
+ private WriteLock AcquireWriteLockForIndex(string index, out string viewName, out UserIndexSubscription subscription) {
if (!_userIndexes.TryGetValue(index, out var data))
throw new Exception($"Failed to acquire write lock for index: {index}");
@@ -362,11 +382,12 @@ private WriteLock AcquireWriteLockForIndex(string index, out UserIndexSubscripti
throw new Exception($"Timed out when acquiring write lock for index: {index}");
subscription = data.Subscription;
+ viewName = data.ViewName;
return new(data.RWLock);
}
- private readonly record struct ReadLock(ReaderWriterLockSlim Lock) : IDisposable {
- public void Dispose() => Lock.ExitReadLock();
+ public readonly record struct ReadLock(ReaderWriterLockSlim Lock) : IDisposable {
+ public void Dispose() => Lock?.ExitReadLock();
}
private readonly record struct WriteLock(ReaderWriterLockSlim Lock) : IDisposable {
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexInFlightRecords.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexInFlightRecords.cs
deleted file mode 100644
index 06eceaf6b06..00000000000
--- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexInFlightRecords.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
-// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
-
-using KurrentDB.SecondaryIndexing.Storage;
-
-namespace KurrentDB.SecondaryIndexing.Indexes.User;
-
-internal record struct UserIndexInFlightRecord(
- long LogPosition,
- long CommitPosition,
- long EventNumber,
- long Created,
- TField? Field
-);
-
-internal class UserIndexInFlightRecords(SecondaryIndexingPluginOptions options) {
- private readonly UserIndexInFlightRecord[] _records = new UserIndexInFlightRecord[options.CommitBatchSize];
-
- private uint _version; // used for optimistic lock
- private int _count;
-
- public int Count => _count;
-
- public void Append(long logPosition, long commitPosition, long eventNumber, TField? field, long created) {
- var count = _count;
- _records[count] = new() {
- LogPosition = logPosition,
- CommitPosition = commitPosition,
- EventNumber = eventNumber,
- Field = field,
- Created = created
- };
-
- // Fence: make sure that the array modification cannot be done after the increment
- Volatile.Write(ref _count, count + 1);
- }
-
- public void Clear() {
- Interlocked.Increment(ref _version); // full fence
-
- // Fence: make sure that the count is modified after the version
- _count = 0;
- }
-
- // read is protected by optimistic lock
- private bool TryRead(uint currentVer, int index, out UserIndexInFlightRecord record) {
- record = _records[index];
-
- // ensure that the record is copied before the comparison
- Interlocked.MemoryBarrier();
- return currentVer == _version;
- }
-
- public (List, bool) GetInFlightRecordsForwards(
- long startPosition,
- int maxCount,
- bool excludeFirst,
- Func, bool>? query = null) {
- query ??= True; // to avoid branching in the loop
-
- var isComplete = false;
- bool first = true;
-
- var currentVer = _version;
- var result = new List();
- for (int i = 0, count = Volatile.Read(in _count), remaining = maxCount;
- i < count && remaining > 0 && TryRead(currentVer, i, out var current);
- i++) {
- if (current.LogPosition >= startPosition) {
- if (i == 0 && current.LogPosition == startPosition) {
- isComplete = true;
- }
- if (query(current)) {
- if (first && excludeFirst && current.LogPosition == startPosition) {
- first = false;
- continue;
- }
-
- remaining--;
- result.Add(new(current.LogPosition, current.CommitPosition, current.EventNumber));
- }
- } else {
- if (i == 0) {
- isComplete = true;
- }
- }
- }
-
- return (result, isComplete);
- }
-
- public IEnumerable GetInFlightRecordsBackwards(
- long startPosition,
- int maxCount,
- bool excludeFirst,
- Func, bool>? query = null) {
- query ??= True; // to avoid branching in the loop
-
- var count = Volatile.Read(in _count);
- var currentVer = _version;
- bool first = true;
-
- if (count > 0
- && TryRead(currentVer, 0, out var current)
- && current.LogPosition <= startPosition) {
- for (int i = count - 1, remaining = maxCount;
- i >= 0 && remaining > 0 && TryRead(currentVer, i, out current);
- i--) {
- if (current.LogPosition <= startPosition && query(current)) {
- if (first && excludeFirst && current.LogPosition == startPosition) {
- first = false;
- continue;
- }
-
- remaining--;
- yield return new(current.LogPosition, current.CommitPosition, current.EventNumber);
- }
- }
- }
- }
-
- private static bool True(UserIndexInFlightRecord record) => true;
-
- public IEnumerable> GetInFlightRecords() {
- var currentVer = _version;
- for (int i = 0, count = Volatile.Read(in _count);
- i < count && TryRead(currentVer, i, out var current);
- i++) {
- yield return current;
- }
- }
-}
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs
index cb3b5c90969..2e200df047a 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs
@@ -4,12 +4,11 @@
using System.Diagnostics.Metrics;
using DotNext;
using DotNext.Threading;
-using DuckDB.NET.Data;
-using DuckDB.NET.Data.DataChunk.Writer;
using Jint;
using Jint.Native.Function;
using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
+using Kurrent.Quack.Threading;
using Kurrent.Surge.Schema.Serializers.Json;
using KurrentDB.Common.Configuration;
using KurrentDB.Core.Bus;
@@ -28,15 +27,16 @@ internal abstract class UserIndexProcessor : Disposable, ISecondaryIndexProcesso
public abstract bool TryIndex(ResolvedEvent evt);
public abstract TFPos GetLastPosition();
public abstract SecondaryIndexProgressTracker Tracker { get; }
+ public abstract UserIndexSql Sql { get; }
+ public abstract BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection);
}
-internal class UserIndexProcessor : UserIndexProcessor where TField : IField {
+internal class UserIndexProcessor : UserIndexProcessor
+ where TField : IField {
private readonly Engine _engine = JintEngineFactory.CreateEngine(executionTimeout: TimeSpan.FromSeconds(30));
private readonly JsRecordEvaluator _evaluator;
private readonly Function? _filter;
private readonly Function? _fieldSelector;
- private readonly UserIndexInFlightRecords _inFlightRecords;
- private readonly string _inFlightTableName;
private readonly string _queryStreamName;
private readonly IPublisher _publisher;
private readonly DuckDBAdvancedConnection _connection;
@@ -45,13 +45,13 @@ internal class UserIndexProcessor : UserIndexProcessor where TField : IF
private readonly ILogger _log;
private ulong _sequenceId;
- private TFPos _lastPosition;
- private Appender _appender;
+ private Atomic _lastPosition;
+ private readonly BufferedView _appender;
private Atomic.Boolean _committing;
public string IndexName { get; }
- public override TFPos GetLastPosition() => _lastPosition;
+ public override TFPos GetLastPosition() => _lastPosition.Value;
public override SecondaryIndexProgressTracker Tracker { get; }
public UserIndexProcessor(
@@ -60,7 +60,6 @@ public UserIndexProcessor(
string jsFieldSelector,
DuckDBConnectionPool db,
UserIndexSql sql,
- UserIndexInFlightRecords inFlightRecords,
IPublisher publisher,
[FromKeyedServices(SecondaryIndexingConstants.InjectionKey)]
Meter meter,
@@ -72,8 +71,6 @@ public UserIndexProcessor(
_sql = sql;
_log = loggerFactory.CreateLogger();
- _inFlightRecords = inFlightRecords;
- _inFlightTableName = UserIndexSql.GenerateInFlightTableNameFor(IndexName);
_queryStreamName = UserIndexHelpers.GetQueryStreamName(IndexName);
_publisher = publisher;
@@ -90,29 +87,34 @@ public UserIndexProcessor(
_connection = db.Open();
_sql.CreateUserIndex(_connection);
- RegisterTableFunction();
-
- _appender = new(_connection, _sql.TableNameUtf8.Span);
+ _appender = new(_connection, _sql.TableName, "log_position", _sql.ViewName);
var serviceName = metricsConfiguration?.ServiceName ?? "kurrentdb";
var tracker = new SecondaryIndexProgressTracker(indexName, serviceName, meter, clock ?? TimeProvider.System,
loggerFactory.CreateLogger(), getLastAppendedRecord);
- (_lastPosition, var lastTimestamp) = GetLastKnownRecord();
- _log.LogUserIndexLoadedLastKnownLogPosition(IndexName, _lastPosition, lastTimestamp);
- tracker.InitLastIndexed(_lastPosition.CommitPosition, lastTimestamp);
+ var (lastPosition, lastTimestamp) = GetLastKnownRecord();
+ _log.LogUserIndexLoadedLastKnownLogPosition(IndexName, lastPosition, lastTimestamp);
+ tracker.InitLastIndexed(lastPosition.CommitPosition, lastTimestamp);
+ _lastPosition.Write(in lastPosition);
Tracker = tracker;
}
+ public override UserIndexSql Sql => _sql;
+
+ public override BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection)
+ => _appender.TakeSnapshot(connection, ExpandRecordFunction.UnnestExpression);
+
public override bool TryIndex(ResolvedEvent resolvedEvent) {
if (IsDisposingOrDisposed)
return false;
var canHandle = CanHandleEvent(resolvedEvent, out var field);
- _lastPosition = resolvedEvent.OriginalPosition!.Value;
+ var eventPosition = resolvedEvent.OriginalPosition!.Value;
if (!canHandle) {
+ _lastPosition.Write(in eventPosition);
Tracker.RecordIndexed(resolvedEvent);
return false;
}
@@ -139,7 +141,7 @@ public override bool TryIndex(ResolvedEvent resolvedEvent) {
field?.AppendTo(row);
}
- _inFlightRecords.Append(preparePosition, commitPosition ?? preparePosition, eventNumber, field, created);
+ _lastPosition.Write(in eventPosition);
_publisher.Publish(new StorageMessage.SecondaryIndexCommitted(_queryStreamName, resolvedEvent));
if (field is not null)
@@ -167,7 +169,7 @@ bool CanHandleEvent(ResolvedEvent resolvedEvent, out TField? field) {
if (_skip.Equals(fieldValue))
return false;
- field = (TField)TField.ParseFrom(fieldValue);
+ field = TField.ParseFrom(fieldValue);
return true;
} catch (Exception ex) {
@@ -213,67 +215,30 @@ public void Checkpoint(TFPos position, DateTime timestamp) {
Created = new DateTimeOffset(timestamp).ToUnixTimeMilliseconds()
};
- UserIndexSql.SetCheckpoint(_connection, checkpointArgs);
+ UserIndexSql.SetCheckpoint(_connection, checkpointArgs);
}
- public override void Commit() => Commit(true);
-
///
/// Commits all in-flight records to the index.
///
- /// Tells you whether to clear the in-flight records after committing. It must be true and only set to false in tests.
- private void Commit(bool clearInflight) {
+ public override void Commit() {
if (IsDisposed || !_committing.FalseToTrue())
return;
try {
using var duration = Tracker.StartCommitDuration();
_appender.Flush();
- _log.LogUserIndexCommitted(IndexName, _inFlightRecords.Count);
+ _log.LogUserIndexCommitted(IndexName);
} catch (Exception ex) {
- _log.LogUserIndexFailedToCommit(ex, IndexName, _inFlightRecords.Count);
+ _log.LogUserIndexFailedToCommit(ex, IndexName);
throw;
} finally {
_committing.TrueToFalse();
}
-
- if (clearInflight) {
- _inFlightRecords.Clear();
- }
- }
-
- private void RegisterTableFunction() {
- _connection.RegisterTableFunction(_inFlightTableName, ResultCallback, MapperCallback);
-
- TableFunction ResultCallback() {
- var records = _inFlightRecords.GetInFlightRecords();
- List columnInfos = [
- new("log_position", typeof(long)),
- new("event_number", typeof(long)),
- new("created", typeof(long)),
- ];
-
- if (TField.Type is { } type)
- columnInfos.Add(new(_sql.FieldColumnName, type));
-
- return new(columnInfos, records);
- }
-
- void MapperCallback(object? item, IDuckDBDataWriter[] writers, ulong rowIndex) {
- var record = (UserIndexInFlightRecord)item!;
- writers[0].WriteValue(record.LogPosition, rowIndex);
- writers[1].WriteValue(record.EventNumber, rowIndex);
- writers[2].WriteValue(record.Created, rowIndex);
-
- if (TField.Type is not null) {
- record.Field!.WriteTo(writers[3], rowIndex);
- }
- }
}
- public void GetUserIndexTableDetails(out string tableName, out string inFlightTableName, out string? fieldName) {
+ public void GetUserIndexTableDetails(out string tableName, out string? fieldName) {
tableName = _sql.TableName;
- inFlightTableName = _inFlightTableName;
fieldName = _sql.FieldColumnName;
}
@@ -281,6 +246,7 @@ protected override void Dispose(bool disposing) {
_log.LogStoppingUserIndexProcessor(IndexName);
if (disposing) {
Commit();
+ _appender.Unregister(_connection);
_appender.Dispose();
_connection.Dispose();
_engine.Dispose();
@@ -309,11 +275,11 @@ static partial class UserIndexProcessorLogMessages {
[LoggerMessage(LogLevel.Trace, "User index: {index} is checkpointing at: {position} ({timestamp})")]
internal static partial void LogUserIndexIsCheckpointing(this ILogger logger, string index, TFPos position, DateTime timestamp);
- [LoggerMessage(LogLevel.Trace, "User index: {index} committed {count} records")]
- internal static partial void LogUserIndexCommitted(this ILogger logger, string index, int count);
+ [LoggerMessage(LogLevel.Trace, "User index: {index} committed")]
+ internal static partial void LogUserIndexCommitted(this ILogger logger, string index);
- [LoggerMessage(LogLevel.Error, "User index: {index} failed to commit {count} records")]
- internal static partial void LogUserIndexFailedToCommit(this ILogger logger, Exception exception, string index, int count);
+ [LoggerMessage(LogLevel.Error, "User index: {index} failed to commit")]
+ internal static partial void LogUserIndexFailedToCommit(this ILogger logger, Exception exception, string index);
[LoggerMessage(LogLevel.Trace, "Stopping user index processor for: {index}")]
internal static partial void LogStoppingUserIndexProcessor(this ILogger logger, string index);
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs
index 3d840eef331..3ec3d9c1e61 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs
@@ -1,19 +1,25 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
+using Kurrent.Quack.Threading;
using KurrentDB.Core.Data;
using KurrentDB.Core.Services.Storage.ReaderIndex;
using KurrentDB.SecondaryIndexing.Storage;
namespace KurrentDB.SecondaryIndexing.Indexes.User;
+internal abstract class UserIndexReader(DuckDBConnectionPool sharedPool, IReadIndex index)
+ : SecondaryIndexReaderBase(sharedPool, index) {
+ internal abstract BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection);
+}
+
internal class UserIndexReader(
DuckDBConnectionPool sharedPool,
- UserIndexSql sql,
- UserIndexInFlightRecords inFlightRecords,
+ UserIndexProcessor processor,
IReadIndex index
-) : SecondaryIndexReaderBase(sharedPool, index) where TField : IField {
+) : UserIndexReader(sharedPool, index) where TField : IField {
protected override string? GetId(string indexStream) {
// the field is used as the ID. null when there is no field
@@ -22,38 +28,32 @@ IReadIndex index
return field;
}
- protected override (List Records, bool IsFinal) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst) {
- return TryGetField(id, out var field)
- ? inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst, Filter)
- : ([], true);
-
- bool Filter(UserIndexInFlightRecord r) => id is null || EqualityComparer.Default.Equals(r.Field, field!);
- }
-
- protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst) {
+ protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst) {
if (!TryGetField(id, out var field))
return [];
var args = new ReadUserIndexQueryArgs {
StartPosition = startPosition,
- EndPosition = endPosition,
ExcludeFirst = excludeFirst,
Count = maxCount,
- Field = id is null ? new NullField() : field!
+ Field = id is null ? NullField.Instance : field!
};
- return sql.ReadUserIndexForwardsQuery(db, args);
- }
-
- protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst) {
- return TryGetField(id, out var field)
- ? inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst, Filter)
- : [];
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ processor.Sql.ReadUserIndexForwardsQuery(connection, args, records);
+ }
+ }
- bool Filter(UserIndexInFlightRecord r) => id is null || EqualityComparer.Default.Equals(r.Field, field!);
+ return records;
}
- protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst) {
+ protected override List GetDbRecordsBackwards(DuckDBConnectionPool db,
+ string? id,
+ long startPosition,
+ int maxCount,
+ bool excludeFirst) {
if (!TryGetField(id, out var field))
return [];
@@ -61,15 +61,25 @@ protected override List GetDbRecordsBackwards(DuckDBConnection
StartPosition = startPosition,
Count = maxCount,
ExcludeFirst = excludeFirst,
- Field = id is null ? new NullField() : field!
+ Field = id is null ? NullField.Instance : field!
};
- return sql.ReadUserIndexBackwardsQuery(db, args);
+ var records = new List(maxCount);
+ using (db.Rent(out var connection)) {
+ using (processor.CaptureSnapshot(connection)) {
+ processor.Sql.ReadUserIndexBackwardsQuery(connection, args, records);
+ }
+ }
+
+ return records;
}
public override TFPos GetLastIndexedPosition(string _) => throw new InvalidOperationException(); // never called
public override bool CanReadIndex(string _) => throw new InvalidOperationException(); // never called
+ internal override BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection)
+ => processor.CaptureSnapshot(connection);
+
private static bool TryGetField(string? id, out TField? field) {
field = default;
@@ -77,7 +87,7 @@ private static bool TryGetField(string? id, out TField? field) {
return true;
try {
- field = (TField) TField.ParseFrom(id);
+ field = TField.ParseFrom(id);
return true;
} catch {
// invalid field
diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs
index 2a112f5eaac..96e95e4791d 100644
--- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs
+++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs
@@ -2,18 +2,52 @@
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
using System.Text;
+using System.Text.Json;
using System.Text.RegularExpressions;
+using DotNext.Buffers;
using Kurrent.Quack;
-using Kurrent.Quack.ConnectionPool;
using KurrentDB.SecondaryIndexing.Storage;
namespace KurrentDB.SecondaryIndexing.Indexes.User;
-internal static partial class UserIndexSql {
+internal abstract partial class UserIndexSql(string indexName, string fieldName) {
// we validate the table/column names for safety reasons, although DuckDB allows a large set of characters when using quoted identifiers
private static readonly Regex IdentifierRegex = ValidationRegex();
- public static string GetTableNameFor(string indexName) {
+ public string TableName { get; } = GetTableNameFor(indexName);
+ public string FieldColumnName { get; } = GetColumnNameFor(fieldName);
+ public string ViewName { get; } = GetViewNameFor(indexName);
+
+ public GetCheckpointResult? GetCheckpoint(DuckDBAdvancedConnection connection, in GetCheckpointQueryArgs args)
+ => connection.QueryFirstOrDefault(in args).ValueOrDefault;
+
+ public static void SetCheckpoint(DuckDBAdvancedConnection connection, in SetCheckpointQueryArgs args)
+ => connection.ExecuteNonQuery(in args);
+
+ public GetLastIndexedRecordResult? GetLastIndexedRecord(DuckDBAdvancedConnection connection) {
+ var query = new GetLastIndexedRecordQuery(TableName);
+ return connection.ExecuteQuery(ref query).FirstOrDefault().ValueOrDefault;
+ }
+
+ public void ReadUserIndexForwardsQuery(DuckDBAdvancedConnection connection,
+ in ReadUserIndexQueryArgs args,
+ ICollection records) {
+ var query = new ReadUserIndexForwardsQuery(ViewName, args.ExcludeFirst, args.Field.GetQueryStatement(FieldColumnName));
+ connection
+ .ExecuteQuery(ref query, in args)
+ .CopyTo(records);
+ }
+
+ public void ReadUserIndexBackwardsQuery(DuckDBAdvancedConnection connection,
+ in ReadUserIndexQueryArgs args,
+ ICollection records) {
+ var query = new ReadUserIndexBackwardsQuery(ViewName, args.ExcludeFirst, args.Field.GetQueryStatement(FieldColumnName));
+ connection
+ .ExecuteQuery