diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 4558d757f70..d2e3f4d23e8 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -58,7 +58,7 @@ - + diff --git a/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs b/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs index 65a766560ed..3c4f7981a01 100644 --- a/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs +++ b/src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; using System.Threading; @@ -49,9 +48,10 @@ public DuckDBConnectionPoolLifetime( _repeated = repeated; Shared = CreatePool(isReadOnly: false, log: true); - using var connection = Shared.Open(); - foreach (var s in once) - s.Execute(connection); + using (Shared.Rent(out var connection)) { + foreach (var s in once) + s.Execute(connection); + } return; @@ -105,6 +105,7 @@ public Task StopAsync(CancellationToken cancellationToken) { protected override void Dispose(bool disposing) { if (disposing) { + Shared.Dispose(); if (_tempPath != null) { try { File.Delete(_tempPath); @@ -118,7 +119,6 @@ protected override void Dispose(bool disposing) { } private class ConnectionPoolWithFunctions(string connectionString, IReadOnlyList setup) : DuckDBConnectionPool(connectionString) { - [Experimental("DuckDBNET001")] protected override void Initialize(DuckDBAdvancedConnection connection) { base.Initialize(connection); for (var i = 0; i < setup.Count; i++) { diff --git a/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs b/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs index 7c2ad41b00d..0cb03f04c14 100644 --- a/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs +++ b/src/KurrentDB.Core/DuckDB/InjectionExtensions.cs @@ -17,7 +17,6 @@ public static class InjectionExtensions { public static IServiceCollection AddDuckDb(this IServiceCollection services) { services.AddSingleton(); services.AddHostedService(sp => sp.GetRequiredService()); - services.AddDuckDBSetup(); services.AddSingleton(sp => sp.GetRequiredService().Shared); services.AddSingleton(); return services; diff --git a/src/KurrentDB.Core/DuckDB/InlineFunctions.cs b/src/KurrentDB.Core/DuckDB/InlineFunctions.cs deleted file mode 100644 index 09250f2cac7..00000000000 --- a/src/KurrentDB.Core/DuckDB/InlineFunctions.cs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. -// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). - -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Linq; -using DuckDB.NET.Data; -using DuckDB.NET.Data.DataChunk.Reader; -using DuckDB.NET.Data.DataChunk.Writer; -using KurrentDB.Common.Utils; -using KurrentDB.Core.Bus; -using KurrentDB.Core.Services.Transport.Enumerators; -using KurrentDB.Core.Services.UserManagement; -using KurrentDB.DuckDB; -using ResolvedEvent = KurrentDB.Core.Data.ResolvedEvent; - -namespace KurrentDB.Core.DuckDB; - -public class KdbGetEventSetup(IPublisher publisher) : IDuckDBSetup { - [Experimental("DuckDBNET001")] - public void Execute(DuckDBConnection connection) { - connection.RegisterScalarFunction("kdb_get", GetEvent); - } - - public bool OneTimeOnly => false; - - [Experimental("DuckDBNET001")] - private void GetEvent(IReadOnlyList readers, IDuckDBDataWriter writer, ulong rowCount) { - var positions = Enumerable.Range(0, (int)rowCount).Select(x => (long)readers[0].GetValue((ulong)x)).ToArray(); - var result = publisher.ReadEvents(positions).ToArray(); - - for (ulong i = 0; i < (ulong)result.Length; i++) { - var asString = AsDuckEvent(result[i]); - writer.WriteValue(asString, i); - } - } - - private static string AsDuckEvent(string stream, - string eventType, - DateTime created, - ReadOnlyMemory data, - ReadOnlyMemory meta) { - var dataString = data.IsValidUtf8Json() ? Helper.UTF8NoBom.GetString(data.Span) : "\"\""; - var metaString = meta.Length == 0 ? "{}" : Helper.UTF8NoBom.GetString(meta.Span); - return - $"{{ \"data\": {dataString}, \"metadata\": {metaString}, \"stream_id\": \"{stream}\", \"created\": \"{created:u}\", \"event_type\": \"{eventType}\" }}"; - } - - private static string AsDuckEvent(ResolvedEvent evt) - => AsDuckEvent(evt.Event.EventStreamId, evt.Event.EventType, evt.Event.TimeStamp, evt.Event.Data, evt.Event.Metadata); -} - -file static class ReadEventsExtensions { - public static IEnumerable ReadEvents(this IPublisher publisher, long[] logPositions) { - using var enumerator = GetEnumerator(); - - while (enumerator.MoveNext()) { - if (enumerator.Current is ReadResponse.EventReceived eventReceived) { - yield return eventReceived.Event; - } - } - - yield break; - - IEnumerator GetEnumerator() { - return new Enumerator.ReadLogEventsSync( - bus: publisher, - logPositions: logPositions, - user: SystemAccounts.System); - } - } -} diff --git a/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs b/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs index e58a1cec854..b56efd92c35 100644 --- a/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs +++ b/src/KurrentDB.Core/Services/RequestManager/RequestManagementService.cs @@ -170,8 +170,11 @@ public void Handle(ClientMessage.TransactionCommit message) { public void Handle(SystemMessage.StateChangeMessage message) { - + // this is now a compile error. probably change 'or' to 'and not', but needs to be thought through. + // TODO: think it through (ticket DB-1957) +#pragma warning disable CS9336 // The pattern is redundant. if (_nodeState == VNodeState.Leader && message.State is not VNodeState.Leader or VNodeState.ResigningLeader) { +#pragma warning restore CS9336 // The pattern is redundant. var keys = _currentRequests.Keys; foreach (var key in keys) { if (_currentRequests.Remove(key, out var manager)) { diff --git a/src/KurrentDB.DuckDB/DuckDBSetup.cs b/src/KurrentDB.DuckDB/DuckDBSetup.cs index d9abedf17b8..0afa2c0ba41 100644 --- a/src/KurrentDB.DuckDB/DuckDBSetup.cs +++ b/src/KurrentDB.DuckDB/DuckDBSetup.cs @@ -3,18 +3,19 @@ using DotNext.Threading; using DuckDB.NET.Data; +using Kurrent.Quack; namespace KurrentDB.DuckDB; public interface IDuckDBSetup { - void Execute(DuckDBConnection connection); + void Execute(DuckDBAdvancedConnection connection); bool OneTimeOnly { get; } } public abstract class DuckDBOneTimeSetup : IDuckDBSetup { private Atomic.Boolean _created; - public void Execute(DuckDBConnection connection) { + public void Execute(DuckDBAdvancedConnection connection) { if (!_created.FalseToTrue()) { return; } @@ -23,5 +24,5 @@ public void Execute(DuckDBConnection connection) { public bool OneTimeOnly => true; - protected abstract void ExecuteCore(DuckDBConnection connection); + protected abstract void ExecuteCore(DuckDBAdvancedConnection connection); } diff --git a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs index b3fa422f3bf..4560451a807 100644 --- a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs +++ b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs @@ -3,6 +3,7 @@ using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; +using KurrentDB.Core.Services.Transport.Enumerators; using KurrentDB.SecondaryIndexing.LoadTesting.Appenders; using KurrentDB.SecondaryIndexing.Storage; using KurrentDB.SecondaryIndexing.Tests.Generators; @@ -24,8 +25,11 @@ public class RawQuackMessageBatchAppender : IMessageBatchAppender { public RawQuackMessageBatchAppender(DuckDBConnectionPool db, DuckDbTestEnvironmentOptions options) { _commitSize = options.CommitSize; - var schema = new IndexingDbSchema(); - schema.CreateSchema(db); + var schema = new IndexingDbSchema(static (_, _) => Enumerable.Empty().GetEnumerator()); + + using (db.Rent(out var rentedConn)) { + schema.Execute(rentedConn); + } using var connection = db.Open(); _defaultIndexAppender = new(connection, "idx_all"u8); diff --git a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs index 8c78a2daf6a..cadfe50c76c 100644 --- a/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs +++ b/src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/Indexes/IndexMessageBatchAppender.cs @@ -3,8 +3,8 @@ using Kurrent.Quack.ConnectionPool; using KurrentDB.Core.Index.Hashes; +using KurrentDB.Core.Services.Transport.Enumerators; using KurrentDB.Core.Tests.Fakes; -using KurrentDB.SecondaryIndexing.Indexes; using KurrentDB.SecondaryIndexing.Indexes.Default; using KurrentDB.SecondaryIndexing.LoadTesting.Appenders; using KurrentDB.SecondaryIndexing.Storage; @@ -23,13 +23,14 @@ public IndexMessageBatchAppender(DuckDBConnectionPool db, int commitSize) { _commitSize = commitSize; ReadIndexStub.Build(); var hasher = new CompositeHasher(new XXHashUnsafe(), new Murmur3AUnsafe()); - var inflightRecordsCache = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitSize }); var publisher = new FakePublisher(); - var schema = new IndexingDbSchema(); - schema.CreateSchema(db); + var schema = new IndexingDbSchema(static (_, _) => Enumerable.Empty().GetEnumerator()); + using (db.Rent(out var connection)) { + schema.Execute(connection); + } - _processor = new(db, inflightRecordsCache, publisher, hasher, new("test"), NullLoggerFactory.Instance); + _processor = new(db, publisher, hasher, new("test"), NullLoggerFactory.Instance); } public ValueTask Append(TestMessageBatch batch) { diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs b/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs index aaa796cdf29..47ed2da0957 100644 --- a/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs +++ b/src/KurrentDB.SecondaryIndexing.Tests/Fakes/TestResolvedEventFactory.cs @@ -28,6 +28,6 @@ byte[] data eventType ); - return ResolvedEvent.ForUnresolvedEvent(record, 0); + return ResolvedEvent.ForUnresolvedEvent(record, 0L); } } diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs b/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs index 653844d5428..7a1e2c333ab 100644 --- a/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs +++ b/src/KurrentDB.SecondaryIndexing.Tests/Fixtures/DuckDbIntegrationTest.cs @@ -1,7 +1,11 @@ // Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). +using System.Security.Claims; using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack.Threading; +using KurrentDB.Core.DuckDB; +using KurrentDB.Core.Services.Transport.Enumerators; using KurrentDB.Core.XUnit.Tests; using KurrentDB.SecondaryIndexing.Storage; @@ -14,8 +18,19 @@ protected DuckDbIntegrationTest() { var dbPath = Fixture.GetFilePathFor($"{GetType().Name}.db"); DuckDb = new($"Data Source={dbPath};"); - var schema = new IndexingDbSchema(); - schema.CreateSchema(DuckDb); + var schema = new IndexingDbSchema(GetEvents); + using (DuckDb.Rent(out var connection)) { + schema.Execute(connection); + } + } + + private static IEnumerator GetEvents(long[] logPositions, ClaimsPrincipal user) { + // This is stub method for tests + for (var i = 0; i < logPositions.Length; i++) { + // See GetDatabaseEventsFunction implementation, + // for any unexpected response the function generates a row with empty values + yield return new ReadResponse.StreamNotFound(string.Empty); + } } public override async ValueTask DisposeAsync() { diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs index 192ab468ba8..913afd347cb 100644 --- a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs +++ b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs @@ -1,9 +1,8 @@ // Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). -using Dapper; using DotNext; -using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack; using KurrentDB.Core.Data; using KurrentDB.Core.Index.Hashes; using KurrentDB.Core.Tests.Fakes; @@ -117,44 +116,88 @@ public void CommittedMultipleEventsToAStringWithoutCategory_AreIndexed() { } private void AssertDefaultIndexQueryReturns(List expected) { - var records = - DuckDb.QueryToList(new(-1, long.MaxValue, - int.MaxValue)); + List records; + using (DuckDb.Rent(out var connection)) { + using (_processor.CaptureSnapshot(connection)) { + records = connection + .ExecuteQuery(new(-1, int.MaxValue)) + .ToList(); + } + } Assert.Equal(expected, records.Select(x => x.LogPosition)); } private void AssertLastLogPositionQueryReturns(long? expectedLogPosition) { - var actual = DuckDb.QueryFirstOrDefault().OrNull(); + LastPositionResult? actual; + using (DuckDb.Rent(out var connection)) { + using (_processor.CaptureSnapshot(connection)) { + actual = connection.QueryFirstOrDefault().OrNull(); + } + } Assert.Equal(expectedLogPosition, actual?.PreparePosition); } private void AssertGetCategoriesQueryReturns(string[] expected) { - using var connection = DuckDb.Open(); - var records = connection.Query("select distinct category from idx_all order by log_position"); + var records = new List(); + using (DuckDb.Rent(out var connection)) { + using (_processor.CaptureSnapshot(connection)) { + using var result = connection.ExecuteAdHocQuery("select distinct category from idx_all_snapshot order by log_position"u8); + while (result.TryFetch(out var chunk)) { + using (chunk) { + while (chunk.TryRead(out var row)) { + records.Add(row.ReadString()); + } + } + } + } + } Assert.Equal(expected, records); } private void AssertCategoryIndexQueryReturns(string category, List expected) { - var records = - DuckDb.QueryToList(new(category, 0, long.MaxValue, 32)); + List records; + using (DuckDb.Rent(out var connection)) { + using (_processor.CaptureSnapshot(connection)) { + records = connection + .ExecuteQuery(new(category, 0, 32)) + .ToList(); + } + } Assert.Equal(expected, records.Select(x => x.LogPosition)); } private void AssertGetAllEventTypesQueryReturns(string[] expected) { - using var connection = DuckDb.Open(); - var records = connection.Query("select distinct event_type from idx_all order by log_position"); + var records = new List(); + using (DuckDb.Rent(out var connection)) { + using (_processor.CaptureSnapshot(connection)) { + using var result = connection.ExecuteAdHocQuery("select distinct event_type from idx_all order by log_position"u8); + while (result.TryFetch(out var chunk)) { + using (chunk) { + while (chunk.TryRead(out var row)) { + records.Add(row.ReadString()); + } + } + } + } + } Assert.Equal(expected, records); } private void AssertReadEventTypeIndexQueryReturns(string eventType, List expected) { - var records = DuckDb.QueryToList( - new(eventType, 0, long.MaxValue, 32) - ); + List records; + using (DuckDb.Rent(out var connection)) { + using (_processor.CaptureSnapshot(connection)) { + records = connection.ExecuteQuery( + new(eventType, 0, 32) + ) + .ToList(); + } + } Assert.Equal(expected, records.Select(x => x.LogPosition)); } @@ -164,13 +207,11 @@ private void AssertReadEventTypeIndexQueryReturns(string eventType, List e public DefaultIndexProcessorTests() { ReadIndexStub.Build(); - const int commitBatchSize = 9; var hasher = new CompositeHasher(new XXHashUnsafe(), new Murmur3AUnsafe()); - var inflightRecordsCache = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitBatchSize }); var publisher = new FakePublisher(); - _processor = new(DuckDb, inflightRecordsCache, publisher, hasher, new("test"), NullLoggerFactory.Instance); + _processor = new(DuckDb, publisher, hasher, new("test"), NullLoggerFactory.Instance); } public override ValueTask DisposeAsync() { diff --git a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs index 494337ca64a..52737ff2b5e 100644 --- a/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs +++ b/src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexReaderTests/IndexTestBase.cs @@ -19,14 +19,12 @@ public abstract class IndexTestBase : DuckDbIntegrationTest { private readonly ReadIndexStub _readIndexStub = new(); protected IndexTestBase() { - const int commitBatchSize = 9; var hasher = new CompositeHasher(new XXHashUnsafe(), new Murmur3AUnsafe()); - var inFlightRecords = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitBatchSize }); var publisher = new FakePublisher(); - _processor = new(DuckDb, inFlightRecords, publisher, hasher, new("test"), NullLoggerFactory.Instance); + _processor = new(DuckDb, publisher, hasher, new("test"), NullLoggerFactory.Instance); - Sut = new(DuckDb, _processor, inFlightRecords, _readIndexStub.ReadIndex); + Sut = new(DuckDb, _processor, _readIndexStub.ReadIndex); } protected void IndexEvents(ResolvedEvent[] events, bool shouldCommit) { diff --git a/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs b/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs index dc82b5ab67b..025c0fe32b6 100644 --- a/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs +++ b/src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/ReadTests.cs @@ -10,6 +10,7 @@ using KurrentDB.SecondaryIndexing.Indexes.Category; using KurrentDB.SecondaryIndexing.Indexes.Default; using KurrentDB.SecondaryIndexing.Indexes.EventType; +using KurrentDB.SecondaryIndexing.Query; using KurrentDB.SecondaryIndexing.Tests.Generators; using KurrentDB.Surge.Testing; using Microsoft.Extensions.DependencyInjection; @@ -27,6 +28,15 @@ public async Task ReadsAllEventsFromDefaultIndex(bool forwards) { await ValidateRead(SystemStreams.DefaultSecondaryIndex, Fixture.AppendedBatches.ToDefaultIndexResolvedEvents(), forwards); } + [Fact] + public async Task ReadFromDefaultIndexUsingQueryEngine() { + var engine = Fixture.NodeServices.GetRequiredService(); + using var preparedSql = engine.PrepareQuery("SELECT metadata FROM kdb.records"u8, digitallySign: true); + + var consumer = new RowCountReader(); + await engine.ExecuteAsync(preparedSql.Memory, consumer, checkIntegrity: true, TestContext.Current.CancellationToken); + } + [Theory] [InlineData(true)] [InlineData(false)] @@ -57,7 +67,6 @@ public async Task ReadFromUnknownIndexFails() { public enum CommitMode { None, CommitAndClear, - CommitAndKeep, } private static readonly IEventFilter UserEventsFilter = new EventFilter.DefaultAllFilterStrategy.NonSystemStreamStrategy(); @@ -65,7 +74,6 @@ public enum CommitMode { [Theory] [InlineData(CommitMode.None)] [InlineData(CommitMode.CommitAndClear)] - [InlineData(CommitMode.CommitAndKeep)] public async Task ReadFromBothDefault(CommitMode mode) { await ValidateRead(mode, SystemStreams.DefaultSecondaryIndex, UserEventsFilter); } @@ -73,7 +81,6 @@ public async Task ReadFromBothDefault(CommitMode mode) { [Theory] [InlineData(CommitMode.None)] [InlineData(CommitMode.CommitAndClear)] - [InlineData(CommitMode.CommitAndKeep)] public async Task ReadFromBothCategory(CommitMode mode) { var category = Fixture.Categories.First(); await ValidateRead(mode, CategoryIndex.Name(category), new CategoryFilter(category)); @@ -82,7 +89,6 @@ public async Task ReadFromBothCategory(CommitMode mode) { [Theory] [InlineData(CommitMode.None)] [InlineData(CommitMode.CommitAndClear)] - [InlineData(CommitMode.CommitAndKeep)] public async Task ReadFromBothEventType(CommitMode mode) { var eventType = Fixture.EventTypes.First(); await ValidateRead(mode, EventTypeIndex.Name(eventType), new EventTypeFilter(eventType)); @@ -112,10 +118,7 @@ private async Task ValidateRead(CommitMode mode, string indexName, IEventFilter var processor = Fixture.NodeServices.GetRequiredService(); switch (mode) { case CommitMode.CommitAndClear: - processor.Commit(true); - break; - case CommitMode.CommitAndKeep: - processor.Commit(false); + processor.Commit(); break; } @@ -141,4 +144,30 @@ private class CategoryFilter(string category) : IEventFilter { private class EventTypeFilter(string eventType) : IEventFilter { public bool IsEventAllowed(EventRecord eventRecord) => eventRecord.EventType == eventType; } + + private sealed class RowCountReader : IQueryResultConsumer { + public long RowCount { + get; + private set; + } + + public ValueTask ConsumeAsync(IQueryResultReader resultReader, CancellationToken token) { + var task = ValueTask.CompletedTask; + try { + while (resultReader.TryRead()) { + RowCount += resultReader.Chunk.RowCount; + } + } catch (OperationCanceledException e) when (e.CancellationToken == token) { + task = ValueTask.FromCanceled(token); + } catch (Exception e) { + task = ValueTask.FromException(e); + } + + return task; + } + + public void Bind(TBinder binder) where TBinder : IPreparedQueryBinder, allows ref struct { + // nothing to bind, query has no substitution + } + } } diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs index 32767e0cc1b..4c163873e59 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategoryIndexReader.cs @@ -1,6 +1,7 @@ // Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). +using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; using KurrentDB.Core.Data; using KurrentDB.Core.Services.Storage.ReaderIndex; @@ -15,29 +16,58 @@ namespace KurrentDB.SecondaryIndexing.Indexes.Category; internal class CategoryIndexReader( DuckDBConnectionPool sharedPool, DefaultIndexProcessor processor, - IReadIndex index, - DefaultIndexInFlightRecords inFlightRecords) + IReadIndex index) : SecondaryIndexReaderBase(sharedPool, index) { protected override string GetId(string indexName) => CategoryIndex.TryParseCategoryName(indexName, out var categoryName) ? categoryName : string.Empty; - protected override (List, bool) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst) - => inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst, r => r.Category == id); + protected override List GetDbRecordsForwards(DuckDBConnectionPool db, + string? id, + long startPosition, + int maxCount, + bool excludeFirst) { + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + if (excludeFirst) { + connection.ExecuteQuery(new(id!, startPosition, + maxCount)) + .CopyTo(records); + } else { + connection.ExecuteQuery(new(id!, startPosition, + maxCount)) + .CopyTo(records); + } + } + } - protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst) - => excludeFirst - ? db.QueryToList(new(id!, startPosition, endPosition, maxCount)) - : db.QueryToList(new(id!, startPosition, endPosition, maxCount)); + return records; + } - protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst) - => inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst, r => r.Category == id); + protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, + string? id, + long startPosition, + int maxCount, + bool excludeFirst) { + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + if (excludeFirst) { + connection.ExecuteQuery(new(id!, + startPosition, maxCount)) + .CopyTo(records); + } else { + connection.ExecuteQuery(new(id!, + startPosition, maxCount)) + .CopyTo(records); + } + } + } - protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst) - => excludeFirst - ? db.QueryToList(new(id!, startPosition, 0, maxCount)) - : db.QueryToList(new(id!, startPosition, 0, maxCount)); + return records; + } public override TFPos GetLastIndexedPosition(string indexName) => processor.LastIndexedPosition; diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs index 560b9a44216..108d7c9be58 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs @@ -15,12 +15,11 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar => new(statement) { args.Category, args.StartPosition, - args.EndPosition, args.Count }; public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where category=$1 and log_position>$2 and log_position<$3 order by rowid limit $4"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position>$2 order by coalesce(commit_position, log_position) limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -33,12 +32,11 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar => new(statement) { args.Category, args.StartPosition, - args.EndPosition, args.Count }; public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where category=$1 and log_position>=$2 and log_position<$3 order by rowid limit $4"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position>=$2 order by coalesce(commit_position, log_position) limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -55,7 +53,7 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar }; public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where category=$1 and log_position<$2 order by rowid desc limit $3"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position<$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -72,10 +70,10 @@ public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, Prepar }; public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where category=$1 and log_position<=$2 order by rowid desc limit $3"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where category=$1 and log_position<=$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } - public record struct CategoryIndexQueryArgs(string Category, long StartPosition, long EndPosition, int Count); + public record struct CategoryIndexQueryArgs(string Category, long StartPosition, int Count); } diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexInFlightRecords.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexInFlightRecords.cs deleted file mode 100644 index bb543918993..00000000000 --- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexInFlightRecords.cs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. -// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). - -using KurrentDB.SecondaryIndexing.Storage; - -namespace KurrentDB.SecondaryIndexing.Indexes.Default; - -internal record struct InFlightRecord( - long LogPosition, - long CommitPosition, - string Category, - string EventType, - string StreamName, - long EventNumber, - long Created -); - -internal class DefaultIndexInFlightRecords(SecondaryIndexingPluginOptions options) { - private readonly InFlightRecord[] _records = new InFlightRecord[options.CommitBatchSize]; - - private uint _version; // used for optimistic lock - private int _count; - - public int Count => _count; - - public void Append(long logPosition, long commitPosition, string category, string eventType, string stream, long eventNumber, long created) { - var count = _count; - _records[count] = new( - LogPosition: logPosition, - CommitPosition: commitPosition, - Category: category, - EventType: eventType, - StreamName: stream, - EventNumber: eventNumber, - Created: created); - - // Fence: make sure that the array modification cannot be done after the increment - Volatile.Write(ref _count, count + 1); - } - - public void Clear() { - Interlocked.Increment(ref _version); // full fence - - // Fence: make sure that the count is modified after the version - _count = 0; - } - - // read is protected by optimistic lock - private bool TryRead(uint currentVer, int index, out InFlightRecord record) { - record = _records[index]; - - // ensure that the record is copied before the comparison - Interlocked.MemoryBarrier(); - return currentVer == _version; - } - - public (List, bool) GetInFlightRecordsForwards( - long startPosition, - int maxCount, - bool excludeFirst, - Func? query = null) { - query ??= True; // to avoid branching in the loop - - var isComplete = false; - bool first = true; - - var currentVer = _version; - var result = new List(); - for (int i = 0, count = Volatile.Read(in _count), remaining = maxCount; - i < count && remaining > 0 && TryRead(currentVer, i, out var current); - i++) { - if (current.LogPosition >= startPosition) { - if (i == 0 && current.LogPosition == startPosition) { - isComplete = true; - } - if (query(current)) { - if (first && excludeFirst && current.LogPosition == startPosition) { - first = false; - continue; - } - - remaining--; - result.Add(new(current.LogPosition, current.CommitPosition, current.EventNumber)); - } - } else { - if (i == 0) { - isComplete = true; - } - } - } - - return (result, isComplete); - } - - public IEnumerable GetInFlightRecordsBackwards( - long startPosition, - int maxCount, - bool excludeFirst, - Func? query = null) { - query ??= True; // to avoid branching in the loop - - var count = Volatile.Read(in _count); - var currentVer = _version; - bool first = true; - - if (count > 0 - && TryRead(currentVer, 0, out var current) - && current.LogPosition <= startPosition) { - for (int i = count - 1, remaining = maxCount; - i >= 0 && remaining > 0 && TryRead(currentVer, i, out current); - i--) { - if (current.LogPosition <= startPosition && query(current)) { - if (first && excludeFirst && current.LogPosition == startPosition) { - first = false; - continue; - } - - remaining--; - yield return new(current.LogPosition, current.CommitPosition, current.EventNumber); - } - } - } - } - - private static bool True(InFlightRecord record) => true; - - public IEnumerable GetInFlightRecords() { - var currentVer = _version; - for (int i = 0, count = Volatile.Read(in _count); - i < count && TryRead(currentVer, i, out var current); - i++) { - yield return current; - } - } -} diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs index fdbfbe9e94b..afc3c616462 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs @@ -4,9 +4,11 @@ using System.Diagnostics.Metrics; using DotNext; using DotNext.Threading; +using DuckDB.NET.Data; using Google.Protobuf.WellKnownTypes; using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack.Threading; using KurrentDB.Common.Configuration; using KurrentDB.Core.Bus; using KurrentDB.Core.Data; @@ -17,6 +19,7 @@ using KurrentDB.SecondaryIndexing.Diagnostics; using KurrentDB.SecondaryIndexing.Indexes.Category; using KurrentDB.SecondaryIndexing.Indexes.EventType; +using KurrentDB.SecondaryIndexing.Storage; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using static KurrentDB.SecondaryIndexing.Indexes.Default.DefaultSql; @@ -24,19 +27,20 @@ namespace KurrentDB.SecondaryIndexing.Indexes.Default; internal class DefaultIndexProcessor : Disposable, ISecondaryIndexProcessor { - private readonly DefaultIndexInFlightRecords _inFlightRecords; private readonly DuckDBAdvancedConnection _connection; private readonly IPublisher _publisher; private readonly ILongHasher _hasher; private readonly ILogger _log; + private readonly BufferedView _appender; + private Atomic _lastPosition; - private Appender _appender; - - public TFPos LastIndexedPosition { get; private set; } + public TFPos LastIndexedPosition { + get => _lastPosition.Value; + private set => _lastPosition.Value = value; + } public DefaultIndexProcessor( DuckDBConnectionPool db, - DefaultIndexInFlightRecords inFlightRecords, IPublisher publisher, ILongHasher hasher, [FromKeyedServices(SecondaryIndexingConstants.InjectionKey)] @@ -47,8 +51,7 @@ public DefaultIndexProcessor( ) { _connection = db.Open(); _log = loggerFactory.CreateLogger(); - _appender = new(_connection, "idx_all"u8); - _inFlightRecords = inFlightRecords; + _appender = new(_connection, "idx_all", "log_position", DefaultIndexViewName); var serviceName = metricsConfiguration?.ServiceName ?? "kurrentdb"; Tracker = new("default", serviceName, meter, clock ?? TimeProvider.System, loggerFactory.CreateLogger()); @@ -89,7 +92,7 @@ public bool TryIndex(ResolvedEvent resolvedEvent) { var created = new DateTimeOffset(resolvedEvent.Event.TimeStamp).ToUnixTimeMilliseconds(); using (var row = _appender.CreateRow()) { row.Add(logPosition); - if (commitPosition.HasValue && logPosition != commitPosition) + if (commitPosition.HasValue && logPosition != commitPosition.GetValueOrDefault()) row.Add(commitPosition.Value); else row.Add(DBNull.Value); @@ -110,8 +113,6 @@ public bool TryIndex(ResolvedEvent resolvedEvent) { row.Add(schemaFormat); } - _inFlightRecords.Append(logPosition, commitPosition ?? logPosition, category, schemaName, resolvedEvent.Event.EventStreamId, - eventNumber, created); LastIndexedPosition = resolvedEvent.EventPosition!.Value; _publisher.Publish(new StorageMessage.SecondaryIndexCommitted(SystemStreams.DefaultSecondaryIndex, resolvedEvent)); @@ -140,13 +141,10 @@ static string GetStreamCategory(string streamName) { private Atomic.Boolean _committing; - public void Commit() => Commit(true); - /// /// Commits all in-flight records to the index. /// - /// Tells you whether to clear the in-flight records after committing. It must be true and only set to false in tests. - internal void Commit(bool clearInflight) { + public void Commit() { if (IsDisposingOrDisposed || !_committing.FalseToTrue()) return; @@ -154,21 +152,20 @@ internal void Commit(bool clearInflight) { using var duration = Tracker.StartCommitDuration(); _appender.Flush(); } catch (Exception e) { - _log.LogError(e, "Failed to commit {Count} records to index at log position {LogPosition}", - _inFlightRecords.Count, LastIndexedPosition); + _log.LogError(e, "Failed to commit records to index at log position {LogPosition}", LastIndexedPosition); throw; } finally { _committing.TrueToFalse(); } - - if (clearInflight) { - _inFlightRecords.Clear(); - } } + public BufferedView.Snapshot CaptureSnapshot(DuckDBConnection connection) + => _appender.TakeSnapshot(connection, ExpandRecordFunction.UnnestExpression); + protected override void Dispose(bool disposing) { if (disposing) { Commit(); + _appender.Unregister(_connection); _appender.Dispose(); _connection.Dispose(); } diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs index 2e7fca3d6d9..4751c589027 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexReader.cs @@ -1,6 +1,7 @@ // Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). +using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; using KurrentDB.Core.Data; using KurrentDB.Core.Services; @@ -13,26 +14,57 @@ namespace KurrentDB.SecondaryIndexing.Indexes.Default; internal class DefaultIndexReader( DuckDBConnectionPool sharedPool, DefaultIndexProcessor processor, - DefaultIndexInFlightRecords inFlightRecords, IReadIndex index ) : SecondaryIndexReaderBase(sharedPool, index) { protected override string GetId(string indexName) => string.Empty; - protected override (List Records, bool IsFinal) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst) - => inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst); + protected override List GetDbRecordsForwards(DuckDBConnectionPool db, + string? id, + long startPosition, + int maxCount, + bool excludeFirst) { + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + if (excludeFirst) { + connection.ExecuteQuery(new( + startPosition, + maxCount)) + .CopyTo(records); + } else { + connection.ExecuteQuery(new( + startPosition, + maxCount)) + .CopyTo(records); + } + } + } - protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst) - => excludeFirst - ? db.QueryToList(new(startPosition, endPosition, maxCount)) - : db.QueryToList(new(startPosition, endPosition, maxCount)); + return records; + } - protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst) - => inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst); + protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, + string? id, + long startPosition, + int maxCount, + bool excludeFirst) { + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + if (excludeFirst) { + connection.ExecuteQuery( + new(startPosition, maxCount)) + .CopyTo(records); + } else { + connection.ExecuteQuery( + new(startPosition, maxCount)) + .CopyTo(records); + } + } + } - protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst) - => excludeFirst - ? db.QueryToList(new(startPosition, 0, maxCount)) - : db.QueryToList(new(startPosition, 0, maxCount)); + return records; + } public override TFPos GetLastIndexedPosition(string indexName) => processor.LastIndexedPosition; diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs index 87b1fd5c0b6..96033fc4b27 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs @@ -7,17 +7,24 @@ namespace KurrentDB.SecondaryIndexing.Indexes.Default; internal static class DefaultSql { - public record struct ReadDefaultIndexQueryArgs(long StartPosition, long EndPosition, int Count); + // We should query on this view rather than on idx_all table. + // The view combines in-flight cache and idx_all rows. + public const string DefaultIndexViewName = "idx_all_snapshot"; + + public record struct ReadDefaultIndexQueryArgs(long StartPosition, int Count); /// /// Get index records for the default index with a log position greater than the start position /// public struct ReadDefaultIndexQueryExcl : IQuery { public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement) - => new(statement) { args.StartPosition, args.EndPosition, args.Count }; + => new(statement) { args.StartPosition, args.Count }; + // We cannot sort by 'rowid' because this hidden column exists only for rows in the physical table. + // Moreover, DuckDB cannot incrementally return query result by pages (data chunks), because ordering must be applied + // first on all rows, which leads to full materialization of the query result in the memory. public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where log_position>$1 and log_position<$2 and is_deleted=false order by rowid limit $3"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where log_position>$1 and is_deleted=false order by coalesce(commit_position, log_position) limit $2"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -27,10 +34,10 @@ public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, Pre /// public struct ReadDefaultIndexQueryIncl : IQuery { public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement) - => new(statement) { args.StartPosition, args.EndPosition, args.Count }; + => new(statement) { args.StartPosition, args.Count }; public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where log_position>=$1 and log_position<$2 and is_deleted=false order by rowid limit $3"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where log_position>=$1 and is_deleted=false order by coalesce(commit_position, log_position) limit $2"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -43,7 +50,7 @@ public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, Pre => new(statement) { args.StartPosition, args.Count }; public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where log_position<$1 and is_deleted=false order by rowid desc limit $2"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where log_position<$1 and is_deleted=false order by coalesce(commit_position, log_position) desc, log_position desc limit $2"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -56,7 +63,7 @@ public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, Pre => new(statement) { args.StartPosition, args.Count }; public static ReadOnlySpan CommandText => - "select log_position, commit_position, event_number from idx_all where log_position<=$1 and is_deleted=false order by rowid desc limit $2"u8; + "select log_position, commit_position, event_number from idx_all_snapshot where log_position<=$1 and is_deleted=false order by coalesce(commit_position, log_position) desc, log_position desc limit $2"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/Default/ExpandRecordFunction.cs b/src/KurrentDB.SecondaryIndexing/Indexes/Default/ExpandRecordFunction.cs new file mode 100644 index 00000000000..8e5bddf69da --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Indexes/Default/ExpandRecordFunction.cs @@ -0,0 +1,68 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Security.Claims; +using DuckDB.NET.Native; +using Kurrent.Quack; +using Kurrent.Quack.Functions; +using KurrentDB.Core.Data; +using KurrentDB.Core.Services.Transport.Enumerators; + +namespace KurrentDB.SecondaryIndexing.Indexes.Default; + +internal sealed class ExpandRecordFunction(Func> eventsProvider) + : GetDatabaseEventsFunction(Name, eventsProvider) { + + private new const string Name = "get_kdb_def"; + public static ReadOnlySpan UnnestExpression => "unnest(get_kdb_def(log_position))"u8; + + protected override void Bind(BindingContext context) { + // nothing to initialize here + } + + protected override void FillRow(EventRecord ev, ref TBuilder builder, int rowIndex) { + // Data column + var column = builder[0]; + if (ev.IsJson) { + // JSON can be copied to DuckDB directly because it's encoded as UTF-8 + column.SetValue(rowIndex, ev.Data.Span); + } else { + WriteBase64(column, rowIndex, ev.Data.Span); + } + + // Metadata column + column = builder[1]; + column.SetValue(rowIndex, ev.Metadata.Span is { Length: > 0 } metadata ? metadata : "{}"u8); + + // StreamId column + column = builder[2]; + column.SetValue(rowIndex, ev.EventStreamId); + } + + protected override void FillRowWithEmptyData(ref TBuilder builder, int rowIndex) { + // Data column + var column = builder[0]; + column.SetValue(rowIndex, "{}"u8); + + // Metadata column + column = builder[1]; + column.SetValue(rowIndex, "{}"u8); + + // StreamId column + column = builder[2]; + column.SetValue(rowIndex, string.Empty); + } +} + + +internal readonly ref struct EventColumns : ICompositeReturnType { + private const DuckDBType Data = DuckDBType.Varchar; + private const DuckDBType Metadata = DuckDBType.Varchar; + private const DuckDBType StreamId = DuckDBType.Varchar; + + static IReadOnlyList> ICompositeReturnType.ReturnType => new ICompositeReturnType.Builder { + { Data, "data" }, + { Metadata, "metadata" }, + { StreamId, "stream_id" }, + }; +} diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs index c10d130ed5e..0dc58df9bda 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeIndexReader.cs @@ -1,6 +1,7 @@ // Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). +using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; using KurrentDB.Core.Data; using KurrentDB.Core.Services.Storage.ReaderIndex; @@ -13,27 +14,58 @@ namespace KurrentDB.SecondaryIndexing.Indexes.EventType; internal class EventTypeIndexReader( DuckDBConnectionPool sharedPool, DefaultIndexProcessor processor, - IReadIndex index, - DefaultIndexInFlightRecords inFlightRecords) + IReadIndex index) : SecondaryIndexReaderBase(sharedPool, index) { protected override string GetId(string streamName) => EventTypeIndex.TryParseEventType(streamName, out var eventTypeName) ? eventTypeName : string.Empty; - protected override (List, bool) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst) - => inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst, r => r.EventType == id); + protected override List GetDbRecordsForwards(DuckDBConnectionPool db, + string? id, + long startPosition, + int maxCount, + bool excludeFirst) { + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + if (excludeFirst) { + connection + .ExecuteQuery(new(id!, startPosition, maxCount)) + .CopyTo(records); + } else { + connection + .ExecuteQuery(new(id!, startPosition, + maxCount)) + .CopyTo(records); + } + } + } - protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst) - => excludeFirst - ? db.QueryToList(new(id!, startPosition, endPosition, maxCount)) - : db.QueryToList(new(id!, startPosition, endPosition, maxCount)); + return records; + } - protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst) - => inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst, r => r.EventType == id); + protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, + string? id, + long startPosition, + int maxCount, + bool excludeFirst) { + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + if (excludeFirst) { + connection + .ExecuteQuery(new(id!, startPosition, maxCount)) + .CopyTo(records); + } else { + connection + .ExecuteQuery(new(id!, + startPosition, maxCount)) + .CopyTo(records); + } + } + } - protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst) - => excludeFirst - ? db.QueryToList(new(id!, startPosition, 0, maxCount)) - : db.QueryToList(new(id!, startPosition, 0, maxCount)); + return records; + } public override TFPos GetLastIndexedPosition(string indexName) => processor.LastIndexedPosition; diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs index 0740c70bc69..fa5df1f722b 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs @@ -7,7 +7,7 @@ namespace KurrentDB.SecondaryIndexing.Indexes.EventType; internal static class EventTypeSql { - public record struct ReadEventTypeIndexQueryArgs(string EventType, long StartPosition, long EndPosition, int Count); + public record struct ReadEventTypeIndexQueryArgs(string EventType, long StartPosition, int Count); /// /// Get index records for a given event type where the log position is greater than the start position @@ -17,12 +17,11 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P => new(statement) { args.EventType, args.StartPosition, - args.EndPosition, args.Count }; public static ReadOnlySpan CommandText - => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position>$2 and log_position<$3 order by rowid limit $4"u8; + => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position>$2 order by coalesce(commit_position, log_position) limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -35,12 +34,11 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P => new(statement) { args.EventType, args.StartPosition, - args.EndPosition, args.Count }; public static ReadOnlySpan CommandText - => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position>=$2 and log_position<$3 order by rowid limit $4"u8; + => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position>=$2 order by coalesce(commit_position, log_position) limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -57,7 +55,7 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P }; public static ReadOnlySpan CommandText - => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position<$2 order by rowid desc limit $3"u8; + => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position<$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } @@ -74,7 +72,7 @@ public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, P }; public static ReadOnlySpan CommandText - => "select log_position, commit_position, event_number from idx_all where event_type=$1 and log_position<=$2 order by rowid limit $3"u8; + => "select log_position, commit_position, event_number from idx_all_snapshot where event_type=$1 and log_position<=$2 order by coalesce(commit_position, log_position) desc, log_position desc limit $3"u8; public static IndexQueryRecord Parse(ref DataChunk.Row row) => new(row.ReadInt64(), row.TryReadInt64(), row.ReadInt64()); } diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/GetDatabaseEventsFunction.cs b/src/KurrentDB.SecondaryIndexing/Indexes/GetDatabaseEventsFunction.cs new file mode 100644 index 00000000000..072f6be619c --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Indexes/GetDatabaseEventsFunction.cs @@ -0,0 +1,59 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Runtime.CompilerServices; +using System.Security.Claims; +using DotNext.Buffers; +using DotNext.Buffers.Text; +using DuckDB.NET.Native; +using Kurrent.Quack; +using Kurrent.Quack.Functions; +using KurrentDB.Core.Data; +using KurrentDB.Core.Services.Transport.Enumerators; +using KurrentDB.Core.Services.UserManagement; + +namespace KurrentDB.SecondaryIndexing.Indexes; + +internal abstract class GetDatabaseEventsFunction(string functionName, Func> eventsProvider) : ScalarFunction(functionName) + where TReturnType : ICompositeReturnType, allows ref struct +{ + // Accepts log_position + protected sealed override IReadOnlyList Parameters => [new(DuckDBType.BigInt)]; + + protected sealed override void Execute(ExecutionContext context, in DataChunk input, ref TBuilder builder) { + var logPositions = input[0].Int64Rows.ToArray(); // TODO: Remove array allocation + + using var enumerator = eventsProvider.Invoke(logPositions, SystemAccounts.System); + + for (var rowIndex = 0; enumerator.MoveNext(); rowIndex++) { + if (enumerator.Current is ReadResponse.EventReceived eventReceived) { + FillRow(eventReceived.Event.Event, ref builder, rowIndex); + } else { + // We should not leave the builder with uninitialized rows to avoid memory garbage to leak into DuckDB internals + FillRowWithEmptyData(ref builder, rowIndex); + } + } + } + + protected abstract void FillRow(EventRecord ev, ref TBuilder builder, int rowIndex) + where TBuilder : struct, DataChunk.IBuilder; + + protected abstract void FillRowWithEmptyData(ref TBuilder builder, int rowIndex) + where TBuilder : struct, DataChunk.IBuilder; + + [MethodImpl(MethodImplOptions.NoInlining)] + protected static void WriteBase64(DataChunk.ColumnBuilder column, int rowIndex, ReadOnlySpan data) { + const byte quote = (byte)'"'; + + var writer = new BufferWriterSlim(4096); + writer.Add(quote); + var encoder = new Base64Encoder(); + try { + encoder.EncodeToUtf8(data, ref writer, flush: true); + writer.Add(quote); + column.SetValue(rowIndex, writer.WrittenSpan); + } finally { + writer.Dispose(); + } + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs b/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs index 67305971be6..0a92762af00 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/SecondaryIndexReaderBase.cs @@ -13,11 +13,7 @@ namespace KurrentDB.SecondaryIndexing.Indexes; public abstract class SecondaryIndexReaderBase(DuckDBConnectionPool sharedPool, IReadIndex index) : ISecondaryIndexReader { protected abstract string? GetId(string indexName); - protected abstract (List Records, bool IsFinal) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst); - - protected abstract List GetDbRecordsForwards(DuckDBConnectionPool pool, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst); - - protected abstract IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst); + protected abstract List GetDbRecordsForwards(DuckDBConnectionPool pool, string? id, long startPosition, int maxCount, bool excludeFirst); protected abstract List GetDbRecordsBackwards(DuckDBConnectionPool pool, string? id, long startPosition, int maxCount, bool excludeFirst); @@ -60,28 +56,14 @@ CancellationToken token ReadIndexEventsForwardCompleted NoData(ReadIndexResult result, bool endOfStream, string? error = null) => new(result, ResolvedEvent.EmptyArray, pos, lastIndexedPosition, endOfStream, error); - IReadOnlyList GetIndexRecordsForwards(long startPosition) { - var maxCount = msg.MaxCount; - var (inFlight, isComplete) = GetInflightForwards(id, startPosition, maxCount, msg.ExcludeStart); - if (isComplete) { - return inFlight; - } - - var end = inFlight.Count > 0 ? inFlight[0].LogPosition : lastIndexedPosition + 1; - var range = GetDbRecordsForwards(GetPool(msg.Pool), id, startPosition, end, maxCount, msg.ExcludeStart); - if (range.Count == 0) { - return inFlight; - } - - if (inFlight.Count > 0) { - range.AddRange(inFlight); - } - - return range; - } - async ValueTask<(long, IReadOnlyList)> GetEventsForwards(long startPosition) { - var indexPrepares = GetIndexRecordsForwards(startPosition); + var indexPrepares = GetDbRecordsForwards( + GetPool(msg.Pool), + id, + startPosition, + msg.MaxCount, + msg.ExcludeStart); + var events = await reader.ReadRecords(indexPrepares, true, token); return (indexPrepares.Count, events); } @@ -118,45 +100,19 @@ CancellationToken token ReadIndexEventsBackwardCompleted NoData(ReadIndexResult result, string? error = null) => new(result, ResolvedEvent.EmptyArray, pos, lastIndexedPosition, false, error); - IReadOnlyList GetIndexRecordsBackwards(TFPos startPosition) { - var maxCount = msg.MaxCount; - var inFlight = GetInflightBackwards(id, startPosition.PreparePosition, maxCount, msg.ExcludeStart).ToArray(); - if (inFlight.Length == maxCount) { - return inFlight; - } - - int count; - long start; - bool excl; - if (inFlight.Length > 0) { - count = maxCount - inFlight.Length; - start = inFlight[^1].LogPosition; - excl = true; - } else { - count = maxCount; - start = startPosition.PreparePosition; - excl = msg.ExcludeStart; - } - var range = GetDbRecordsBackwards(GetPool(msg.Pool), id, start, count, excl); - if (range.Count == 0) { - return inFlight; - } - - if (inFlight.Length > 0) { - range.AddRange(inFlight); - } - - return range; - } - async ValueTask<(long, IReadOnlyList)> GetEventsBackwards(TFPos startPosition) { - var indexPrepares = GetIndexRecordsBackwards(startPosition); + var indexPrepares = GetDbRecordsBackwards(GetPool(msg.Pool), + id, + startPosition.PreparePosition, + msg.MaxCount, + msg.ExcludeStart); + var events = await reader.ReadRecords(indexPrepares, false, token); return (indexPrepares.Count, events); } } private DuckDBConnectionPool GetPool(DuckDBConnectionPool? pool) { - return pool is not null ? pool : sharedPool; + return pool ?? sharedPool; } } diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/ExpandRecordFunction.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/ExpandRecordFunction.cs new file mode 100644 index 00000000000..e4087326600 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/ExpandRecordFunction.cs @@ -0,0 +1,77 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Security.Claims; +using DuckDB.NET.Native; +using Kurrent.Quack; +using Kurrent.Quack.Functions; +using KurrentDB.Core.Data; +using KurrentDB.Core.Services.Transport.Enumerators; + +namespace KurrentDB.SecondaryIndexing.Indexes.User; + +internal sealed class ExpandRecordFunction(Func> eventsProvider) + : GetDatabaseEventsFunction(Name, eventsProvider) { + + private new const string Name = "get_kdb_usr"; + public static ReadOnlySpan UnnestExpression => "unnest(get_kdb_usr(log_position))"u8; + + protected override void Bind(BindingContext context) { + // nothing to initialize here + } + + protected override void FillRow(EventRecord ev, ref TBuilder builder, int rowIndex) { + // Data column + var column = builder[0]; + if (ev.IsJson) { + // JSON can be copied to DuckDB directly because it's encoded as UTF-8 + column.SetValue(rowIndex, ev.Data.Span); + } else { + WriteBase64(column, rowIndex, ev.Data.Span); + } + + // Metadata column + column = builder[1]; + column.SetValue(rowIndex, ev.Metadata.Span is { Length: > 0 } metadata ? metadata : "{}"u8); + + // StreamId column + column = builder[2]; + column.SetValue(rowIndex, ev.EventStreamId); + + // EventType column + column = builder[3]; + column.SetValue(rowIndex, ev.EventType); + } + + protected override void FillRowWithEmptyData(ref TBuilder builder, int rowIndex) { + // Data column + var column = builder[0]; + column.SetValue(rowIndex, "{}"u8); + + // Metadata column + column = builder[1]; + column.SetValue(rowIndex, "{}"u8); + + // StreamId column + column = builder[2]; + column.SetValue(rowIndex, string.Empty); + + // EventType column + column = builder[3]; + column.SetValue(rowIndex, string.Empty); + } +} + +internal readonly ref struct EventColumns : ICompositeReturnType { + private const DuckDBType Data = DuckDBType.Varchar; + private const DuckDBType Metadata = DuckDBType.Varchar; + private const DuckDBType StreamId = DuckDBType.Varchar; + private const DuckDBType EventType = DuckDBType.Varchar; + + static IReadOnlyList> ICompositeReturnType.ReturnType => new ICompositeReturnType.Builder { + { Data, "data" }, + { Metadata, "metadata" }, + { StreamId, "stream_id" }, + { EventType, "event_type" }, + }; +} diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs index aad1dfa822b..03e58d68e12 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs @@ -2,118 +2,116 @@ // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). using System.Globalization; -using DuckDB.NET.Data.DataChunk.Writer; +using System.Numerics; +using DotNext.Patterns; using Jint; using Jint.Native; using Kurrent.Quack; +using Kurrent.Quack.Threading; namespace KurrentDB.SecondaryIndexing.Indexes.User; public interface IField { - static abstract IField ParseFrom(JsValue value); - static abstract IField ParseFrom(string value); static abstract string GetCreateStatement(string field); - static abstract Type? Type { get; } string GetQueryStatement(string field); void BindTo(PreparedStatement statement, ref int index); - void AppendTo(Appender.Row row); - void WriteTo(IDuckDBDataWriter writer, ulong rowIndex); + void AppendTo(BufferedAppender.Row row); } -internal readonly record struct Int16Field(short Key) : IField { - public static Type Type { get; } = typeof(short); - public static IField ParseFrom(JsValue value) => new Int16Field(Convert.ToInt16(value.AsNumber())); - public static IField ParseFrom(string value) => new Int16Field(Convert.ToInt16(value)); +public interface IField : IField + where TSelf : IField +{ + static abstract TSelf ParseFrom(JsValue value); + static abstract TSelf ParseFrom(string value); +} + +file interface INumericField : IField + where TSelf : struct, INumericField + where TNumber : struct, INumber { + static abstract TSelf Create(TNumber value); + + static TSelf IField.ParseFrom(JsValue value) => TSelf.Create(TNumber.CreateChecked(value.AsNumber())); + + static TSelf IField.ParseFrom(string value) => TSelf.Create(TNumber.Parse(value, provider: null)); +} + +internal readonly record struct Int16Field(short Key) : INumericField { + public static Int16Field Create(short value) => new(value); public static string GetCreateStatement(string field) => $", \"{field}\" SMALLINT not null"; public string GetQueryStatement(string field) => $"and \"{field}\" = ?"; public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key); - public void AppendTo(Appender.Row row) => row.Add(Key); - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex); + public void AppendTo(BufferedAppender.Row row) => row.Add(Key); public override string ToString() => Key.ToString(); } -internal readonly record struct Int32Field(int Key) : IField { - public static Type Type { get; } = typeof(int); - public static IField ParseFrom(JsValue value) => new Int32Field(Convert.ToInt32(value.AsNumber())); - public static IField ParseFrom(string value) => new Int32Field(Convert.ToInt32(value)); +internal readonly record struct Int32Field(int Key) : INumericField { + public static Int32Field Create(int value) => new(value); public static string GetCreateStatement(string field) => $", \"{field}\" INTEGER not null"; public string GetQueryStatement(string field) => $"and \"{field}\" = ?"; public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key); - public void AppendTo(Appender.Row row) => row.Add(Key); - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex); + public void AppendTo(BufferedAppender.Row row) => row.Add(Key); public override string ToString() => Key.ToString(); } -internal readonly record struct Int64Field(long Key) : IField { - public static Type Type { get; } = typeof(long); - public static IField ParseFrom(JsValue value) => new Int64Field(Convert.ToInt64(value.AsNumber())); - public static IField ParseFrom(string value) => new Int64Field(Convert.ToInt64(value)); +internal readonly record struct Int64Field(long Key) : INumericField { + public static Int64Field Create(long value) => new(value); public static string GetCreateStatement(string field) => $", \"{field}\" BIGINT not null"; public string GetQueryStatement(string field) => $"and \"{field}\" = ?"; public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key); - public void AppendTo(Appender.Row row) => row.Add(Key); - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex); + public void AppendTo(BufferedAppender.Row row) => row.Add(Key); public override string ToString() => Key.ToString(); } -internal readonly record struct UInt32Field(uint Key) : IField { - public static Type Type { get; } = typeof(uint); - public static IField ParseFrom(JsValue value) => new UInt32Field(Convert.ToUInt32(value.AsNumber())); - public static IField ParseFrom(string value) => new UInt32Field(Convert.ToUInt32(value)); +internal readonly record struct UInt32Field(uint Key) : INumericField { + public static UInt32Field Create(uint value) => new(value); public static string GetCreateStatement(string field) => $", \"{field}\" UINTEGER not null"; public string GetQueryStatement(string field) => $"and \"{field}\" = ?"; public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key); - public void AppendTo(Appender.Row row) => row.Add(Key); - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex); + public void AppendTo(BufferedAppender.Row row) => row.Add(Key); public override string ToString() => Key.ToString(); } -internal readonly record struct UInt64Field(ulong Key) : IField { - public static Type Type { get; } = typeof(ulong); - public static IField ParseFrom(JsValue value) => new UInt64Field(Convert.ToUInt64(value.AsNumber())); - public static IField ParseFrom(string value) => new UInt64Field(Convert.ToUInt64(value)); +internal readonly record struct UInt64Field(ulong Key) : INumericField { + public static UInt64Field Create(ulong value) => new(value); public static string GetCreateStatement(string field) => $", \"{field}\" UBIGINT not null"; public string GetQueryStatement(string field) => $"and \"{field}\" = ?"; public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key); - public void AppendTo(Appender.Row row) => row.Add(Key); - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex); + public void AppendTo(BufferedAppender.Row row) => row.Add(Key); public override string ToString() => Key.ToString(); } -internal readonly record struct DoubleField(double Key) : IField { - public static Type Type { get; } = typeof(double); - public static IField ParseFrom(JsValue value) => new DoubleField(value.AsNumber()); - public static IField ParseFrom(string value) => new DoubleField(Convert.ToDouble(value)); +internal readonly record struct DoubleField(double Key) : INumericField { + public static DoubleField Create(double value) => new(value); public static string GetCreateStatement(string field) => $", \"{field}\" DOUBLE not null"; public string GetQueryStatement(string field) => $"and \"{field}\" = ?"; public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key); - public void AppendTo(Appender.Row row) => row.Add(Key); - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex); + public void AppendTo(BufferedAppender.Row row) => row.Add(Key); public override string ToString() => Key.ToString(CultureInfo.InvariantCulture); } -internal readonly record struct StringField(string Key) : IField { - public static Type Type { get; } = typeof(string); - public static IField ParseFrom(JsValue value) => new StringField(value.AsString()); - public static IField ParseFrom(string value) => new StringField(value); +internal readonly record struct StringField(string Key) : IField { + public static StringField ParseFrom(JsValue value) => ParseFrom(value.AsString()); + public static StringField ParseFrom(string value) => new(value); public static string GetCreateStatement(string field) => $", \"{field}\" VARCHAR not null"; public string GetQueryStatement(string field) => $"and \"{field}\" = ?"; public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key); - public void AppendTo(Appender.Row row) => row.Add(Key); - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex); + public void AppendTo(BufferedAppender.Row row) => row.Add(Key); public override string ToString() => Key; } -internal readonly record struct NullField : IField { - public static Type? Type { get => null; } - public static IField ParseFrom(JsValue value) { +internal sealed class NullField : IField, ISingleton { + public static NullField Instance { get; } = new(); + + private NullField() { + } + + public static NullField ParseFrom(JsValue value) { return !value.IsNull() ? throw new ArgumentException(null, nameof(value)) : new NullField(); } - public static IField ParseFrom(string value) => throw new NotSupportedException(); + public static NullField ParseFrom(string value) => throw new NotSupportedException(); public static string GetCreateStatement(string field) => string.Empty; public string GetQueryStatement(string field) => string.Empty; public void BindTo(PreparedStatement statement, ref int index) { } - public void AppendTo(Appender.Row row) { } - public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) { } + public void AppendTo(BufferedAppender.Row row) { } } diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs index 34746b194a0..8957d2a9365 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngine.cs @@ -2,7 +2,9 @@ // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). using System.Diagnostics.Metrics; +using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack.Threading; using Kurrent.Surge.Schema.Serializers; using KurrentDB.Core; using KurrentDB.Core.Bus; @@ -58,6 +60,12 @@ public UserIndexEngine( subscriber.Subscribe(this); } + public bool TryCaptureSnapshot(ReadOnlySpan viewNameUtf8, + DuckDBAdvancedConnection connection, + out UserIndexEngineSubscription.ReadLock readLock, + out BufferedView.Snapshot snapshot) + => _subscription.TryCaptureSnapshot(viewNameUtf8, connection, out readLock, out snapshot); + public void EnsureLive() { if (!_subscription.CaughtUp) { throw new UserIndexesNotReadyException(_subscription.Checkpoint, _writerCheckpoint.Read()); @@ -136,8 +144,8 @@ private class Filter : IEventFilter { CancellationToken token) => _subscription.ReadBackwards(msg, token); - public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string inFlightTableName, out string? fieldName) => - _subscription.TryGetUserIndexTableDetails(indexName, out tableName, out inFlightTableName, out fieldName); + public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string? fieldName) => + _subscription.TryGetUserIndexTableDetails(indexName, out tableName, out fieldName); } static partial class UserIndexEngineLogMessages { diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.ViewName.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.ViewName.cs new file mode 100644 index 00000000000..c1d1990a7a7 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.ViewName.cs @@ -0,0 +1,23 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Diagnostics.CodeAnalysis; +using System.Text; + +namespace KurrentDB.SecondaryIndexing.Indexes.User; + +partial class UserIndexEngineSubscription { + private sealed class ViewNameEqualityComparer : IEqualityComparer, IAlternateEqualityComparer, ViewName> { + bool IEqualityComparer.Equals(ViewName x, ViewName y) => x.Equals(y); + + int IEqualityComparer.GetHashCode(ViewName obj) => obj.GetHashCode(); + + bool IAlternateEqualityComparer, ViewName>.Equals(ReadOnlySpan alternate, ViewName other) + => other.Value.SequenceEqual(alternate); + + int IAlternateEqualityComparer, ViewName>.GetHashCode(ReadOnlySpan alternate) + => ViewName.GetHashCode(alternate); + + ViewName IAlternateEqualityComparer, ViewName>.Create(ReadOnlySpan alternate) => new(alternate); + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs index 21a902dc37e..c851e4db14b 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexEngineSubscription.cs @@ -6,6 +6,7 @@ using System.Threading.Channels; using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack.Threading; using Kurrent.Surge.Schema; using Kurrent.Surge.Schema.Serializers; using KurrentDB.Core; @@ -23,7 +24,7 @@ namespace KurrentDB.SecondaryIndexing.Indexes.User; -public class UserIndexEngineSubscription( +public partial class UserIndexEngineSubscription( ISystemClient client, IPublisher publisher, ISchemaSerializer serializer, @@ -37,13 +38,15 @@ public class UserIndexEngineSubscription( : ISecondaryIndexReader { private readonly ConcurrentDictionary _userIndexes = new(); + private readonly ConcurrentDictionary _viewNameToIndexNameMapping = new(new ViewNameEqualityComparer()); private readonly CancellationTokenSource _cts = CancellationTokenSource.CreateLinkedTokenSource(token); private readonly ILogger _log = logFactory.CreateLogger(); record struct UserIndexData( ReaderWriterLockSlim RWLock, UserIndexSubscription Subscription, - SecondaryIndexReaderBase Reader); + UserIndexReader Reader, + string ViewName); public bool CaughtUp { get; private set; } public long Checkpoint { get; private set; } @@ -186,7 +189,7 @@ await client.Subscriptions.SubscribeToStream( } private ValueTask StartUserIndex(string indexName, IndexCreated createdEvent) { - if (createdEvent.Fields.Count is 0) + if (createdEvent.Fields is []) return StartUserIndex(indexName, createdEvent); return createdEvent.Fields[0].Type switch { @@ -203,11 +206,9 @@ private ValueTask StartUserIndex(string indexName, IndexCreated createdEvent) { private async ValueTask StartUserIndex( string indexName, - IndexCreated createdEvent) where TField : IField { + IndexCreated createdEvent) where TField : IField { _log.LogStartingUserIndex(indexName); - var inFlightRecords = new UserIndexInFlightRecords(options); - var sql = new UserIndexSql( indexName, createdEvent.Fields.Count is 0 @@ -222,14 +223,13 @@ createdEvent.Fields.Count is 0 : createdEvent.Fields[0].Selector, db: db, sql: sql, - inFlightRecords: inFlightRecords, publisher: publisher, meter: meter, getLastAppendedRecord: getLastAppendedRecord, loggerFactory: logFactory ); - var reader = new UserIndexReader(sharedPool: db, sql, inFlightRecords, readIndex); + var reader = new UserIndexReader(sharedPool: db, processor, readIndex); UserIndexSubscription subscription = new UserIndexSubscription( publisher: publisher, @@ -238,18 +238,22 @@ createdEvent.Fields.Count is 0 log: logFactory.CreateLogger(), token: _cts.Token); - _userIndexes.TryAdd(indexName, new(new(), subscription, reader)); + _userIndexes.TryAdd(indexName, new(new(), subscription, reader, sql.ViewName)); + var viewName = new ViewName(sql.ViewName); + _viewNameToIndexNameMapping.TryAdd(viewName, indexName); await subscription.Start(); } private async Task StopUserIndex(string indexName) { _log.LogStoppingUserIndex(LogLevel.Debug, indexName); - var writeLock = AcquireWriteLockForIndex(indexName, out var index); + var writeLock = AcquireWriteLockForIndex(indexName, out var viewName, out var index); using (writeLock) { // we have the write lock, there are no readers. after we release the lock the index is no longer // in the dictionary so there can be no new readers. _userIndexes.TryRemove(indexName, out _); + var name = new ViewName(viewName); + _viewNameToIndexNameMapping.TryRemove(name, out _); } // guaranteed no readers, we can stop. @@ -322,39 +326,55 @@ public TFPos GetLastIndexedPosition(string indexStream) { return data.Reader.ReadBackwards(msg, token); } - public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string inFlightTableName, out string? fieldName) { + public bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string? fieldName) { if (!TryAcquireReadLockForIndex(indexName, out var readLock, out var data)) { tableName = null!; - inFlightTableName = null!; fieldName = null; return false; } using (readLock) { - data.Subscription.GetUserIndexTableDetails(out tableName, out inFlightTableName, out fieldName); + data.Subscription.GetUserIndexTableDetails(out tableName, out fieldName); + return true; + } + } + + public bool TryCaptureSnapshot(ReadOnlySpan viewNameUtf8, + DuckDBAdvancedConnection connection, + out ReadLock readLock, + out BufferedView.Snapshot snapshot) { + var view = _viewNameToIndexNameMapping.GetAlternateLookup>(); + if (view.TryGetValue(viewNameUtf8, out var indexName) + && TryAcquireReadLockForIndex(indexName, out readLock, out var data)) { + snapshot = data.Reader.CaptureSnapshot(connection); return true; } + + snapshot = default; + readLock = default; + return false; } - private bool TryAcquireReadLockForIndex(string index, out ReadLock? readLock, out UserIndexData data) { + private bool TryAcquireReadLockForIndex(ReadOnlySpan index, out ReadLock readLock, out UserIndexData data) { // note: a write lock is acquired only when deleting the index. so, if we cannot acquire a read lock, // it means that the user index is being/has been deleted. - readLock = null; + readLock = default; data = default; - if (!_userIndexes.TryGetValue(index, out data)) { + var view = _userIndexes.GetAlternateLookup>(); + if (!view.TryGetValue(index, out data)) { return false; } if (!data.RWLock.TryEnterReadLock(TimeSpan.Zero)) return false; - readLock = new ReadLock(data.RWLock); + readLock = new(data.RWLock); return true; } - private WriteLock AcquireWriteLockForIndex(string index, out UserIndexSubscription subscription) { + private WriteLock AcquireWriteLockForIndex(string index, out string viewName, out UserIndexSubscription subscription) { if (!_userIndexes.TryGetValue(index, out var data)) throw new Exception($"Failed to acquire write lock for index: {index}"); @@ -362,11 +382,12 @@ private WriteLock AcquireWriteLockForIndex(string index, out UserIndexSubscripti throw new Exception($"Timed out when acquiring write lock for index: {index}"); subscription = data.Subscription; + viewName = data.ViewName; return new(data.RWLock); } - private readonly record struct ReadLock(ReaderWriterLockSlim Lock) : IDisposable { - public void Dispose() => Lock.ExitReadLock(); + public readonly record struct ReadLock(ReaderWriterLockSlim Lock) : IDisposable { + public void Dispose() => Lock?.ExitReadLock(); } private readonly record struct WriteLock(ReaderWriterLockSlim Lock) : IDisposable { diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexInFlightRecords.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexInFlightRecords.cs deleted file mode 100644 index 06eceaf6b06..00000000000 --- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexInFlightRecords.cs +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. -// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). - -using KurrentDB.SecondaryIndexing.Storage; - -namespace KurrentDB.SecondaryIndexing.Indexes.User; - -internal record struct UserIndexInFlightRecord( - long LogPosition, - long CommitPosition, - long EventNumber, - long Created, - TField? Field -); - -internal class UserIndexInFlightRecords(SecondaryIndexingPluginOptions options) { - private readonly UserIndexInFlightRecord[] _records = new UserIndexInFlightRecord[options.CommitBatchSize]; - - private uint _version; // used for optimistic lock - private int _count; - - public int Count => _count; - - public void Append(long logPosition, long commitPosition, long eventNumber, TField? field, long created) { - var count = _count; - _records[count] = new() { - LogPosition = logPosition, - CommitPosition = commitPosition, - EventNumber = eventNumber, - Field = field, - Created = created - }; - - // Fence: make sure that the array modification cannot be done after the increment - Volatile.Write(ref _count, count + 1); - } - - public void Clear() { - Interlocked.Increment(ref _version); // full fence - - // Fence: make sure that the count is modified after the version - _count = 0; - } - - // read is protected by optimistic lock - private bool TryRead(uint currentVer, int index, out UserIndexInFlightRecord record) { - record = _records[index]; - - // ensure that the record is copied before the comparison - Interlocked.MemoryBarrier(); - return currentVer == _version; - } - - public (List, bool) GetInFlightRecordsForwards( - long startPosition, - int maxCount, - bool excludeFirst, - Func, bool>? query = null) { - query ??= True; // to avoid branching in the loop - - var isComplete = false; - bool first = true; - - var currentVer = _version; - var result = new List(); - for (int i = 0, count = Volatile.Read(in _count), remaining = maxCount; - i < count && remaining > 0 && TryRead(currentVer, i, out var current); - i++) { - if (current.LogPosition >= startPosition) { - if (i == 0 && current.LogPosition == startPosition) { - isComplete = true; - } - if (query(current)) { - if (first && excludeFirst && current.LogPosition == startPosition) { - first = false; - continue; - } - - remaining--; - result.Add(new(current.LogPosition, current.CommitPosition, current.EventNumber)); - } - } else { - if (i == 0) { - isComplete = true; - } - } - } - - return (result, isComplete); - } - - public IEnumerable GetInFlightRecordsBackwards( - long startPosition, - int maxCount, - bool excludeFirst, - Func, bool>? query = null) { - query ??= True; // to avoid branching in the loop - - var count = Volatile.Read(in _count); - var currentVer = _version; - bool first = true; - - if (count > 0 - && TryRead(currentVer, 0, out var current) - && current.LogPosition <= startPosition) { - for (int i = count - 1, remaining = maxCount; - i >= 0 && remaining > 0 && TryRead(currentVer, i, out current); - i--) { - if (current.LogPosition <= startPosition && query(current)) { - if (first && excludeFirst && current.LogPosition == startPosition) { - first = false; - continue; - } - - remaining--; - yield return new(current.LogPosition, current.CommitPosition, current.EventNumber); - } - } - } - } - - private static bool True(UserIndexInFlightRecord record) => true; - - public IEnumerable> GetInFlightRecords() { - var currentVer = _version; - for (int i = 0, count = Volatile.Read(in _count); - i < count && TryRead(currentVer, i, out var current); - i++) { - yield return current; - } - } -} diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs index cb3b5c90969..2e200df047a 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs @@ -4,12 +4,11 @@ using System.Diagnostics.Metrics; using DotNext; using DotNext.Threading; -using DuckDB.NET.Data; -using DuckDB.NET.Data.DataChunk.Writer; using Jint; using Jint.Native.Function; using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack.Threading; using Kurrent.Surge.Schema.Serializers.Json; using KurrentDB.Common.Configuration; using KurrentDB.Core.Bus; @@ -28,15 +27,16 @@ internal abstract class UserIndexProcessor : Disposable, ISecondaryIndexProcesso public abstract bool TryIndex(ResolvedEvent evt); public abstract TFPos GetLastPosition(); public abstract SecondaryIndexProgressTracker Tracker { get; } + public abstract UserIndexSql Sql { get; } + public abstract BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection); } -internal class UserIndexProcessor : UserIndexProcessor where TField : IField { +internal class UserIndexProcessor : UserIndexProcessor + where TField : IField { private readonly Engine _engine = JintEngineFactory.CreateEngine(executionTimeout: TimeSpan.FromSeconds(30)); private readonly JsRecordEvaluator _evaluator; private readonly Function? _filter; private readonly Function? _fieldSelector; - private readonly UserIndexInFlightRecords _inFlightRecords; - private readonly string _inFlightTableName; private readonly string _queryStreamName; private readonly IPublisher _publisher; private readonly DuckDBAdvancedConnection _connection; @@ -45,13 +45,13 @@ internal class UserIndexProcessor : UserIndexProcessor where TField : IF private readonly ILogger _log; private ulong _sequenceId; - private TFPos _lastPosition; - private Appender _appender; + private Atomic _lastPosition; + private readonly BufferedView _appender; private Atomic.Boolean _committing; public string IndexName { get; } - public override TFPos GetLastPosition() => _lastPosition; + public override TFPos GetLastPosition() => _lastPosition.Value; public override SecondaryIndexProgressTracker Tracker { get; } public UserIndexProcessor( @@ -60,7 +60,6 @@ public UserIndexProcessor( string jsFieldSelector, DuckDBConnectionPool db, UserIndexSql sql, - UserIndexInFlightRecords inFlightRecords, IPublisher publisher, [FromKeyedServices(SecondaryIndexingConstants.InjectionKey)] Meter meter, @@ -72,8 +71,6 @@ public UserIndexProcessor( _sql = sql; _log = loggerFactory.CreateLogger(); - _inFlightRecords = inFlightRecords; - _inFlightTableName = UserIndexSql.GenerateInFlightTableNameFor(IndexName); _queryStreamName = UserIndexHelpers.GetQueryStreamName(IndexName); _publisher = publisher; @@ -90,29 +87,34 @@ public UserIndexProcessor( _connection = db.Open(); _sql.CreateUserIndex(_connection); - RegisterTableFunction(); - - _appender = new(_connection, _sql.TableNameUtf8.Span); + _appender = new(_connection, _sql.TableName, "log_position", _sql.ViewName); var serviceName = metricsConfiguration?.ServiceName ?? "kurrentdb"; var tracker = new SecondaryIndexProgressTracker(indexName, serviceName, meter, clock ?? TimeProvider.System, loggerFactory.CreateLogger(), getLastAppendedRecord); - (_lastPosition, var lastTimestamp) = GetLastKnownRecord(); - _log.LogUserIndexLoadedLastKnownLogPosition(IndexName, _lastPosition, lastTimestamp); - tracker.InitLastIndexed(_lastPosition.CommitPosition, lastTimestamp); + var (lastPosition, lastTimestamp) = GetLastKnownRecord(); + _log.LogUserIndexLoadedLastKnownLogPosition(IndexName, lastPosition, lastTimestamp); + tracker.InitLastIndexed(lastPosition.CommitPosition, lastTimestamp); + _lastPosition.Write(in lastPosition); Tracker = tracker; } + public override UserIndexSql Sql => _sql; + + public override BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection) + => _appender.TakeSnapshot(connection, ExpandRecordFunction.UnnestExpression); + public override bool TryIndex(ResolvedEvent resolvedEvent) { if (IsDisposingOrDisposed) return false; var canHandle = CanHandleEvent(resolvedEvent, out var field); - _lastPosition = resolvedEvent.OriginalPosition!.Value; + var eventPosition = resolvedEvent.OriginalPosition!.Value; if (!canHandle) { + _lastPosition.Write(in eventPosition); Tracker.RecordIndexed(resolvedEvent); return false; } @@ -139,7 +141,7 @@ public override bool TryIndex(ResolvedEvent resolvedEvent) { field?.AppendTo(row); } - _inFlightRecords.Append(preparePosition, commitPosition ?? preparePosition, eventNumber, field, created); + _lastPosition.Write(in eventPosition); _publisher.Publish(new StorageMessage.SecondaryIndexCommitted(_queryStreamName, resolvedEvent)); if (field is not null) @@ -167,7 +169,7 @@ bool CanHandleEvent(ResolvedEvent resolvedEvent, out TField? field) { if (_skip.Equals(fieldValue)) return false; - field = (TField)TField.ParseFrom(fieldValue); + field = TField.ParseFrom(fieldValue); return true; } catch (Exception ex) { @@ -213,67 +215,30 @@ public void Checkpoint(TFPos position, DateTime timestamp) { Created = new DateTimeOffset(timestamp).ToUnixTimeMilliseconds() }; - UserIndexSql.SetCheckpoint(_connection, checkpointArgs); + UserIndexSql.SetCheckpoint(_connection, checkpointArgs); } - public override void Commit() => Commit(true); - /// /// Commits all in-flight records to the index. /// - /// Tells you whether to clear the in-flight records after committing. It must be true and only set to false in tests. - private void Commit(bool clearInflight) { + public override void Commit() { if (IsDisposed || !_committing.FalseToTrue()) return; try { using var duration = Tracker.StartCommitDuration(); _appender.Flush(); - _log.LogUserIndexCommitted(IndexName, _inFlightRecords.Count); + _log.LogUserIndexCommitted(IndexName); } catch (Exception ex) { - _log.LogUserIndexFailedToCommit(ex, IndexName, _inFlightRecords.Count); + _log.LogUserIndexFailedToCommit(ex, IndexName); throw; } finally { _committing.TrueToFalse(); } - - if (clearInflight) { - _inFlightRecords.Clear(); - } - } - - private void RegisterTableFunction() { - _connection.RegisterTableFunction(_inFlightTableName, ResultCallback, MapperCallback); - - TableFunction ResultCallback() { - var records = _inFlightRecords.GetInFlightRecords(); - List columnInfos = [ - new("log_position", typeof(long)), - new("event_number", typeof(long)), - new("created", typeof(long)), - ]; - - if (TField.Type is { } type) - columnInfos.Add(new(_sql.FieldColumnName, type)); - - return new(columnInfos, records); - } - - void MapperCallback(object? item, IDuckDBDataWriter[] writers, ulong rowIndex) { - var record = (UserIndexInFlightRecord)item!; - writers[0].WriteValue(record.LogPosition, rowIndex); - writers[1].WriteValue(record.EventNumber, rowIndex); - writers[2].WriteValue(record.Created, rowIndex); - - if (TField.Type is not null) { - record.Field!.WriteTo(writers[3], rowIndex); - } - } } - public void GetUserIndexTableDetails(out string tableName, out string inFlightTableName, out string? fieldName) { + public void GetUserIndexTableDetails(out string tableName, out string? fieldName) { tableName = _sql.TableName; - inFlightTableName = _inFlightTableName; fieldName = _sql.FieldColumnName; } @@ -281,6 +246,7 @@ protected override void Dispose(bool disposing) { _log.LogStoppingUserIndexProcessor(IndexName); if (disposing) { Commit(); + _appender.Unregister(_connection); _appender.Dispose(); _connection.Dispose(); _engine.Dispose(); @@ -309,11 +275,11 @@ static partial class UserIndexProcessorLogMessages { [LoggerMessage(LogLevel.Trace, "User index: {index} is checkpointing at: {position} ({timestamp})")] internal static partial void LogUserIndexIsCheckpointing(this ILogger logger, string index, TFPos position, DateTime timestamp); - [LoggerMessage(LogLevel.Trace, "User index: {index} committed {count} records")] - internal static partial void LogUserIndexCommitted(this ILogger logger, string index, int count); + [LoggerMessage(LogLevel.Trace, "User index: {index} committed")] + internal static partial void LogUserIndexCommitted(this ILogger logger, string index); - [LoggerMessage(LogLevel.Error, "User index: {index} failed to commit {count} records")] - internal static partial void LogUserIndexFailedToCommit(this ILogger logger, Exception exception, string index, int count); + [LoggerMessage(LogLevel.Error, "User index: {index} failed to commit")] + internal static partial void LogUserIndexFailedToCommit(this ILogger logger, Exception exception, string index); [LoggerMessage(LogLevel.Trace, "Stopping user index processor for: {index}")] internal static partial void LogStoppingUserIndexProcessor(this ILogger logger, string index); diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs index 3d840eef331..3ec3d9c1e61 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexReader.cs @@ -1,19 +1,25 @@ // Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). +using Kurrent.Quack; using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack.Threading; using KurrentDB.Core.Data; using KurrentDB.Core.Services.Storage.ReaderIndex; using KurrentDB.SecondaryIndexing.Storage; namespace KurrentDB.SecondaryIndexing.Indexes.User; +internal abstract class UserIndexReader(DuckDBConnectionPool sharedPool, IReadIndex index) + : SecondaryIndexReaderBase(sharedPool, index) { + internal abstract BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection); +} + internal class UserIndexReader( DuckDBConnectionPool sharedPool, - UserIndexSql sql, - UserIndexInFlightRecords inFlightRecords, + UserIndexProcessor processor, IReadIndex index -) : SecondaryIndexReaderBase(sharedPool, index) where TField : IField { +) : UserIndexReader(sharedPool, index) where TField : IField { protected override string? GetId(string indexStream) { // the field is used as the ID. null when there is no field @@ -22,38 +28,32 @@ IReadIndex index return field; } - protected override (List Records, bool IsFinal) GetInflightForwards(string? id, long startPosition, int maxCount, bool excludeFirst) { - return TryGetField(id, out var field) - ? inFlightRecords.GetInFlightRecordsForwards(startPosition, maxCount, excludeFirst, Filter) - : ([], true); - - bool Filter(UserIndexInFlightRecord r) => id is null || EqualityComparer.Default.Equals(r.Field, field!); - } - - protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, long endPosition, int maxCount, bool excludeFirst) { + protected override List GetDbRecordsForwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst) { if (!TryGetField(id, out var field)) return []; var args = new ReadUserIndexQueryArgs { StartPosition = startPosition, - EndPosition = endPosition, ExcludeFirst = excludeFirst, Count = maxCount, - Field = id is null ? new NullField() : field! + Field = id is null ? NullField.Instance : field! }; - return sql.ReadUserIndexForwardsQuery(db, args); - } - - protected override IEnumerable GetInflightBackwards(string? id, long startPosition, int maxCount, bool excludeFirst) { - return TryGetField(id, out var field) - ? inFlightRecords.GetInFlightRecordsBackwards(startPosition, maxCount, excludeFirst, Filter) - : []; + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + processor.Sql.ReadUserIndexForwardsQuery(connection, args, records); + } + } - bool Filter(UserIndexInFlightRecord r) => id is null || EqualityComparer.Default.Equals(r.Field, field!); + return records; } - protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, string? id, long startPosition, int maxCount, bool excludeFirst) { + protected override List GetDbRecordsBackwards(DuckDBConnectionPool db, + string? id, + long startPosition, + int maxCount, + bool excludeFirst) { if (!TryGetField(id, out var field)) return []; @@ -61,15 +61,25 @@ protected override List GetDbRecordsBackwards(DuckDBConnection StartPosition = startPosition, Count = maxCount, ExcludeFirst = excludeFirst, - Field = id is null ? new NullField() : field! + Field = id is null ? NullField.Instance : field! }; - return sql.ReadUserIndexBackwardsQuery(db, args); + var records = new List(maxCount); + using (db.Rent(out var connection)) { + using (processor.CaptureSnapshot(connection)) { + processor.Sql.ReadUserIndexBackwardsQuery(connection, args, records); + } + } + + return records; } public override TFPos GetLastIndexedPosition(string _) => throw new InvalidOperationException(); // never called public override bool CanReadIndex(string _) => throw new InvalidOperationException(); // never called + internal override BufferedView.Snapshot CaptureSnapshot(DuckDBAdvancedConnection connection) + => processor.CaptureSnapshot(connection); + private static bool TryGetField(string? id, out TField? field) { field = default; @@ -77,7 +87,7 @@ private static bool TryGetField(string? id, out TField? field) { return true; try { - field = (TField) TField.ParseFrom(id); + field = TField.ParseFrom(id); return true; } catch { // invalid field diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs index 2a112f5eaac..96e95e4791d 100644 --- a/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexSql.cs @@ -2,18 +2,52 @@ // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). using System.Text; +using System.Text.Json; using System.Text.RegularExpressions; +using DotNext.Buffers; using Kurrent.Quack; -using Kurrent.Quack.ConnectionPool; using KurrentDB.SecondaryIndexing.Storage; namespace KurrentDB.SecondaryIndexing.Indexes.User; -internal static partial class UserIndexSql { +internal abstract partial class UserIndexSql(string indexName, string fieldName) { // we validate the table/column names for safety reasons, although DuckDB allows a large set of characters when using quoted identifiers private static readonly Regex IdentifierRegex = ValidationRegex(); - public static string GetTableNameFor(string indexName) { + public string TableName { get; } = GetTableNameFor(indexName); + public string FieldColumnName { get; } = GetColumnNameFor(fieldName); + public string ViewName { get; } = GetViewNameFor(indexName); + + public GetCheckpointResult? GetCheckpoint(DuckDBAdvancedConnection connection, in GetCheckpointQueryArgs args) + => connection.QueryFirstOrDefault(in args).ValueOrDefault; + + public static void SetCheckpoint(DuckDBAdvancedConnection connection, in SetCheckpointQueryArgs args) + => connection.ExecuteNonQuery(in args); + + public GetLastIndexedRecordResult? GetLastIndexedRecord(DuckDBAdvancedConnection connection) { + var query = new GetLastIndexedRecordQuery(TableName); + return connection.ExecuteQuery(ref query).FirstOrDefault().ValueOrDefault; + } + + public void ReadUserIndexForwardsQuery(DuckDBAdvancedConnection connection, + in ReadUserIndexQueryArgs args, + ICollection records) { + var query = new ReadUserIndexForwardsQuery(ViewName, args.ExcludeFirst, args.Field.GetQueryStatement(FieldColumnName)); + connection + .ExecuteQuery(ref query, in args) + .CopyTo(records); + } + + public void ReadUserIndexBackwardsQuery(DuckDBAdvancedConnection connection, + in ReadUserIndexQueryArgs args, + ICollection records) { + var query = new ReadUserIndexBackwardsQuery(ViewName, args.ExcludeFirst, args.Field.GetQueryStatement(FieldColumnName)); + connection + .ExecuteQuery(ref query, in args) + .CopyTo(records); + } + + private static string GetTableNameFor(string indexName) { var tableName = $"idx_user__{indexName}"; return IdentifierRegex.IsMatch(tableName) @@ -21,7 +55,16 @@ public static string GetTableNameFor(string indexName) { : throw new($"Invalid table name: {tableName}"); } - public static string GetColumnNameFor(string fieldName) { + internal static string GetViewNameFor(string indexName) { + // Keep aligned with TransformViewName + var viewName = $"idx_user__{indexName}_view"; + + return IdentifierRegex.IsMatch(viewName) + ? viewName + : throw new($"Invalid view name: {viewName}"); + } + + private static string GetColumnNameFor(string fieldName) { if (fieldName is "") return ""; @@ -32,10 +75,6 @@ public static string GetColumnNameFor(string fieldName) { : throw new($"Invalid column name: {columnName}"); } - public static string GenerateInFlightTableNameFor(string indexName) { - return $"inflight_idx_user__{indexName}_{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"; - } - public static void DeleteUserIndex(DuckDBAdvancedConnection connection, string indexName) { connection.ExecuteNonQuery(new(indexName)); @@ -48,41 +87,13 @@ public static void DeleteUserIndex(DuckDBAdvancedConnection connection, string i private static partial Regex ValidationRegex(); } -internal class UserIndexSql(string indexName, string fieldName) where TField : IField { - public string TableName { get; } = UserIndexSql.GetTableNameFor(indexName); - public string FieldColumnName { get; } = UserIndexSql.GetColumnNameFor(fieldName); - - public ReadOnlyMemory TableNameUtf8 { get; } = Encoding.UTF8.GetBytes(UserIndexSql.GetTableNameFor(indexName)); +internal sealed class UserIndexSql(string indexName, string fieldName) : UserIndexSql(indexName, fieldName) + where TField : IField { public void CreateUserIndex(DuckDBAdvancedConnection connection) { var query = new CreateUserIndexNonQuery(TableName, TField.GetCreateStatement(FieldColumnName)); connection.ExecuteNonQuery(ref query); } - - public List ReadUserIndexForwardsQuery(DuckDBConnectionPool db, ReadUserIndexQueryArgs args) { - var query = new ReadUserIndexForwardsQuery(TableName, args.ExcludeFirst, args.Field.GetQueryStatement(FieldColumnName)); - using (db.Rent(out var connection)) - return connection.ExecuteQuery(ref query, args).ToList(); - } - - public List ReadUserIndexBackwardsQuery(DuckDBConnectionPool db, ReadUserIndexQueryArgs args) { - var query = new ReadUserIndexBackwardsQuery(TableName, args.ExcludeFirst, args.Field.GetQueryStatement(FieldColumnName)); - using (db.Rent(out var connection)) - return connection.ExecuteQuery(ref query, args).ToList(); - } - - public GetCheckpointResult? GetCheckpoint(DuckDBAdvancedConnection connection, GetCheckpointQueryArgs args) { - return connection.QueryFirstOrDefault(args).ValueOrDefault; - } - - public static void SetCheckpoint(DuckDBAdvancedConnection connection, SetCheckpointQueryArgs args) { - connection.ExecuteNonQuery(args); - } - - public GetLastIndexedRecordResult? GetLastIndexedRecord(DuckDBAdvancedConnection connection) { - var query = new GetLastIndexedRecordQuery(TableName); - return connection.ExecuteQuery(ref query).FirstOrDefault().ValueOrDefault; - } } file readonly record struct CreateUserIndexNonQuery(string TableName, string CreateFieldStatement) : IDynamicParameterlessStatement { @@ -115,7 +126,7 @@ public void FormatCommandTemplate(Span args) => args[0] = TableName; } -internal record struct ReadUserIndexQueryArgs(long StartPosition, long EndPosition, bool ExcludeFirst, int Count, IField Field); +internal record struct ReadUserIndexQueryArgs(long StartPosition, bool ExcludeFirst, int Count, IField Field); file readonly record struct ReadUserIndexForwardsQuery(string TableName, bool ExcludeFirst, string FieldQuery) : IDynamicQuery { @@ -123,8 +134,8 @@ file readonly record struct ReadUserIndexForwardsQuery(string TableName, bool Ex """ select log_position, commit_position, event_number from "{0}" - where log_position >{1} ? and log_position < ? {2} - order by rowid limit ? + where log_position >{1} ? {2} + order by coalesce(commit_position, log_position) limit ? """); public void FormatCommandTemplate(Span args) { @@ -136,7 +147,6 @@ public void FormatCommandTemplate(Span args) { public static StatementBindingResult Bind(in ReadUserIndexQueryArgs args, PreparedStatement statement) { var index = 1; statement.Bind(index++, args.StartPosition); - statement.Bind(index++, args.EndPosition); args.Field.BindTo(statement, ref index); statement.Bind(index, args.Count); @@ -156,7 +166,7 @@ file readonly record struct ReadUserIndexBackwardsQuery(string TableName, bool E select log_position, commit_position, event_number from "{0}" where log_position <{1} ? {2} - order by rowid desc + order by coalesce(commit_position, log_position) desc, log_position desc limit ? """); diff --git a/src/KurrentDB.SecondaryIndexing/Indexes/User/ViewName.cs b/src/KurrentDB.SecondaryIndexing/Indexes/User/ViewName.cs new file mode 100644 index 00000000000..cbd596acaf2 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Indexes/User/ViewName.cs @@ -0,0 +1,45 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Diagnostics.CodeAnalysis; +using System.Runtime.InteropServices; +using System.Text; + +namespace KurrentDB.SecondaryIndexing.Indexes.User; + +[StructLayout(LayoutKind.Auto)] +internal readonly struct ViewName : IEquatable { + private readonly byte[] _encodedValue; + private readonly int _hash; // cached for better performance + + public ViewName(ReadOnlySpan value) { + var encoding = Encoding; + var length = encoding.GetByteCount(value); + encoding.GetBytes(value, _encodedValue = new byte[length]); + _hash = GetHashCode(_encodedValue); + } + + private static Encoding Encoding => Encoding.UTF8; + + public ViewName(ReadOnlySpan encodedValue) { + _encodedValue = encodedValue.ToArray(); + _hash = GetHashCode(_encodedValue); + } + + public static int GetHashCode(ReadOnlySpan value) { + var hash = new HashCode(); + hash.AddBytes(value); + return hash.ToHashCode(); + } + + public ReadOnlySpan Value => _encodedValue; + + public bool Equals(ViewName other) => _encodedValue.SequenceEqual(other._encodedValue); + + public override bool Equals([NotNullWhen(true)] object? other) + => other is ViewName name && Equals(name); + + public override int GetHashCode() => _hash; + + public override string ToString() => Encoding.GetString(_encodedValue); +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/IPreparedQueryBinder.cs b/src/KurrentDB.SecondaryIndexing/Query/IPreparedQueryBinder.cs new file mode 100644 index 00000000000..844d23da1b2 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/IPreparedQueryBinder.cs @@ -0,0 +1,8 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +namespace KurrentDB.SecondaryIndexing.Query; + +public interface IPreparedQueryBinder { + void Bind(int index, ReadOnlySpan value, ParameterType type); +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/IQueryEngine.cs b/src/KurrentDB.SecondaryIndexing/Query/IQueryEngine.cs new file mode 100644 index 00000000000..2f7228f1f91 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/IQueryEngine.cs @@ -0,0 +1,30 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using DotNext.Buffers; + +namespace KurrentDB.SecondaryIndexing.Query; + +public interface IQueryEngine { + /// + /// Prepares the query for execution. + /// + /// The query to prepare. + /// to sign the result; otherwise, . + /// The prepared query. + /// The input query has incorrect syntax. + MemoryOwner PrepareQuery(ReadOnlySpan sqlQuery, bool digitallySign); + + /// + /// Executes the prepared query. + /// + /// The prepared query returned by method. + /// The query result consumer. + /// To check the integrity of the prepared query. + /// The token that can be used to cancel the operation. + /// The type of the consumer. + /// The task representing asynchronous state of the operation. + /// The prepared query is invalid. + ValueTask ExecuteAsync(ReadOnlyMemory preparedQuery, TConsumer consumer, bool checkIntegrity, CancellationToken token = default) + where TConsumer : IQueryResultConsumer; +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/IQueryResultConsumer.cs b/src/KurrentDB.SecondaryIndexing/Query/IQueryResultConsumer.cs new file mode 100644 index 00000000000..de82f85f7fc --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/IQueryResultConsumer.cs @@ -0,0 +1,15 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +namespace KurrentDB.SecondaryIndexing.Query; + +/// +/// Represents query result consumer. +/// +public interface IQueryResultConsumer { + ValueTask ConsumeAsync(IQueryResultReader resultReader, CancellationToken token); + + void Bind(scoped TBinder binder) where TBinder : IPreparedQueryBinder, allows ref struct; + + bool UseStreaming => true; +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/IQueryResultReader.cs b/src/KurrentDB.SecondaryIndexing/Query/IQueryResultReader.cs new file mode 100644 index 00000000000..752c77cd13e --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/IQueryResultReader.cs @@ -0,0 +1,22 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using Kurrent.Quack; + +namespace KurrentDB.SecondaryIndexing.Query; + +public interface IQueryResultReader { + /// + /// Advances to the next chunk. + /// + /// if the chunk is available for consumption; if no more chunks available. + bool TryRead(); + + /// + /// Gets the current chunk. + /// + /// + /// The chunk becomes invalid after call. + /// + ref readonly DataChunk Chunk { get; } +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/ParameterType.cs b/src/KurrentDB.SecondaryIndexing/Query/ParameterType.cs new file mode 100644 index 00000000000..f79925e444c --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/ParameterType.cs @@ -0,0 +1,17 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +namespace KurrentDB.SecondaryIndexing.Query; + +public enum ParameterType { + Null = 0, + Utf8String, + Blob, + Boolean, + Int32, + UInt32, + Int64, + UInt64, + Int128, + UInt128, +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/PreparedQueryIntegrityException.cs b/src/KurrentDB.SecondaryIndexing/Query/PreparedQueryIntegrityException.cs new file mode 100644 index 00000000000..7acfec85f10 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/PreparedQueryIntegrityException.cs @@ -0,0 +1,13 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +namespace KurrentDB.SecondaryIndexing.Query; + +/// +/// Indicates that the prepared query is modified. +/// +public class PreparedQueryIntegrityException : Exception { + internal PreparedQueryIntegrityException() + : base("Prepared query is invalid") { + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.Binder.cs b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.Binder.cs new file mode 100644 index 00000000000..033a0a73b5e --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.Binder.cs @@ -0,0 +1,51 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Buffers.Binary; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using Kurrent.Quack; + +namespace KurrentDB.SecondaryIndexing.Query; + +partial class QueryEngine { + [StructLayout(LayoutKind.Auto)] + private ref struct QueryBinder(ref readonly PreparedStatement statement) : IPreparedQueryBinder { + private readonly ref readonly PreparedStatement _statement = ref statement; + + public void Bind(int index, ReadOnlySpan value, ParameterType type) { + switch (type) { + case ParameterType.Blob: + _statement.Bind(index, value, BlobType.Raw); + break; + case ParameterType.Boolean: + _statement.Bind(index, Unsafe.BitCast(value[0])); + break; + case ParameterType.Utf8String: + _statement.Bind(index, value, BlobType.Utf8); + break; + case ParameterType.Int32: + _statement.Bind(index, BinaryPrimitives.ReadInt32LittleEndian(value)); + break; + case ParameterType.UInt32: + _statement.Bind(index, BinaryPrimitives.ReadUInt32LittleEndian(value)); + break; + case ParameterType.Int64: + _statement.Bind(index, BinaryPrimitives.ReadInt64LittleEndian(value)); + break; + case ParameterType.UInt64: + _statement.Bind(index, BinaryPrimitives.ReadUInt64LittleEndian(value)); + break; + case ParameterType.Int128: + _statement.Bind(index, BinaryPrimitives.ReadInt128LittleEndian(value)); + break; + case ParameterType.UInt128: + _statement.Bind(index, BinaryPrimitives.ReadUInt128LittleEndian(value)); + break; + default: + _statement.Bind(index, DBNull.Value); + break; + } + } + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.PreparedQuery.cs b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.PreparedQuery.cs new file mode 100644 index 00000000000..598c5cd334b --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.PreparedQuery.cs @@ -0,0 +1,157 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Buffers; +using System.Runtime.InteropServices; +using System.Security.Cryptography; +using System.Text; +using DotNext.Buffers; +using DotNext.IO; +using DotNext.Text; + +namespace KurrentDB.SecondaryIndexing.Query; + +partial class QueryEngine { + private static readonly MemoryAllocator ArrayPoolAllocator = ArrayPool.Shared.ToAllocator(); + + // Binary format: + // 1 byte - flags + // 4 bytes (Q) - rewritten query length + // Q bytes - rewritten query UTF-8 encoded + // 4 bytes (U) - the number of user-defined views: + // 4 bytes (L) - view name length + // L bytes - view name UTF-8 encoded + // N bytes (optional) - digital signature + [StructLayout(LayoutKind.Auto)] + private ref struct PreparedQueryBuilder() { + private readonly List _userIndices = new(); + + public bool HasDefaultIndex; + + public readonly void AddUserIndexViewName(string viewName) => _userIndices.Add(viewName); + + public MemoryOwner Build(ReadOnlySpan queryUtf8, ReadOnlySpan signatureKey) { + var writer = new BufferWriterSlim(1024, ArrayPoolAllocator); + try { + // Header + writer.Add(GetFlags(HasDefaultIndex)); + + // rewritten query + writer.WriteLittleEndian(queryUtf8.Length); + writer.Write(queryUtf8); + + // Views + writer.WriteLittleEndian(_userIndices.Count); + + var encodingContext = new EncodingContext(Encoding.UTF8, reuseEncoder: true); + foreach (var viewName in _userIndices) { + writer.Encode(viewName, in encodingContext, LengthFormat.LittleEndian); + } + + // Digital Signature + if (!signatureKey.IsEmpty) { + Sign(ref writer, signatureKey); + } + + return writer.DetachOrCopyBuffer(); + } finally { + writer.Dispose(); + } + + static byte GetFlags(bool hasDefaultIndex) { + var result = PreparedQuery.EmptyFlags; + + if (hasDefaultIndex) { + result |= PreparedQuery.HasDefaultIndexFlag; + } + + return result; + } + + static void Sign(ref BufferWriterSlim writer, ReadOnlySpan signatureKey) { + var payload = writer.WrittenSpan; + var hash = writer.GetSpan(HMACSHA256.HashSizeInBytes); + writer.Advance(HMACSHA256.HashData(signatureKey, payload, hash)); + } + } + } + + [StructLayout(LayoutKind.Auto)] + private readonly ref struct PreparedQuery { + public const byte EmptyFlags = 0; + public const byte HasDefaultIndexFlag = 1; + + public readonly bool HasDefaultIndex; + public readonly ReadOnlySpan Query; + private readonly int _viewNameCount; + private readonly ReadOnlySpan _viewNames; + private readonly ReadOnlySpan _signature; + private readonly ReadOnlySpan _payload; + + public PreparedQuery(ReadOnlySpan preparedQuery) { + var reader = new SpanReader(preparedQuery); + var flags = reader.Read(); + HasDefaultIndex = (flags & HasDefaultIndexFlag) is not 0; + + // Query + var length = reader.ReadLittleEndian(); + Query = reader.Read(length); + + // View names + _viewNameCount = reader.ReadLittleEndian(); + _viewNames = reader.RemainingSpan; + + // Signature (last 32 bytes). If prepared query is not signed, it could be just last 32 bytes, which is fine + // because the caller is knows for sure is the query should be signed or not + var offset = preparedQuery.Length - HMACSHA256.HashSizeInBytes; + if (offset >= 0) { + _payload = preparedQuery.Slice(0, offset); + _signature = preparedQuery.Slice(offset); + } else { + _payload = preparedQuery; + _signature = ReadOnlySpan.Empty; + } + } + + public int ViewCount => _viewNameCount; + + public ViewNameEnumerator ViewNames => new(_viewNameCount, _viewNames); + + public bool CheckIntegrity(ReadOnlySpan signatureKey) { + + Span actual = stackalloc byte[HMACSHA256.HashSizeInBytes]; + HMACSHA256.HashData(signatureKey, _payload, actual); + + // compare hashes + return actual.SequenceEqual(_signature); + } + } + + [StructLayout(LayoutKind.Auto)] + private ref struct ViewNameEnumerator { + private SpanReader _reader; + private ReadOnlySpan _current; + private int _entryCount; + + public ViewNameEnumerator(int entryCount, ReadOnlySpan reader) { + _reader = new(reader); + _entryCount = entryCount; + } + + public readonly ReadOnlySpan Current => _current; + + public static bool MoveNext(scoped ref SpanReader reader, scoped ref int remainingCount, out ReadOnlySpan current) { + if (remainingCount <= 0) { + current = default; + return false; + } + + remainingCount--; + var length = reader.ReadLittleEndian(); + current = reader.Read(length); + return true; + } + + public bool MoveNext() => MoveNext(ref _reader, ref _entryCount, out _current); + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.QueryResult.cs b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.QueryResult.cs new file mode 100644 index 00000000000..96056c75880 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.QueryResult.cs @@ -0,0 +1,39 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using DotNext; +using DuckDB.NET.Data; +using Kurrent.Quack; +using QuackQueryResult = Kurrent.Quack.QueryResult; + +namespace KurrentDB.SecondaryIndexing.Query; + +partial class QueryEngine { + private sealed class QueryResultReader : Disposable, IQueryResultReader { + private QuackQueryResult _result; + private DataChunk _chunk; + + public QueryResultReader(in PreparedStatement statement, bool useStreaming) { + _result = statement.ExecuteQuery(useStreaming); + } + + public bool TryRead() { + _chunk.Dispose(); + return _result.TryFetch(out _chunk); + } + + public ref readonly DataChunk Chunk => ref _chunk; + + private void FinalizeEnumeration() { + while (_result.TryFetch(out _chunk)) { + _chunk.Dispose(); + } + } + + protected override void Dispose(bool disposing) { + FinalizeEnumeration(); + _result.Dispose(); + base.Dispose(disposing); + } + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.Rewriter.cs b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.Rewriter.cs new file mode 100644 index 00000000000..e8ddca1db65 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.Rewriter.cs @@ -0,0 +1,136 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Text.Json; +using System.Text.Json.Nodes; +using DotNext.Buffers; +using Kurrent.Quack; +using Kurrent.Quack.Parser; +using KurrentDB.SecondaryIndexing.Indexes.Default; +using KurrentDB.SecondaryIndexing.Indexes.User; + +namespace KurrentDB.SecondaryIndexing.Query; + +partial class QueryEngine { + private MemoryOwner RewriteQuery(ReadOnlySpan queryUtf8, ref PreparedQueryBuilder builder) { + JsonNode? tree; + + // Obtain AST + using (sharedPool.Rent(out var connection)) { + tree = connection.ParseSyntaxTree(queryUtf8); + } + + // Transform AST + switch (tree?["error"]?.GetValueKind()) { + case JsonValueKind.False: + RewriteNode(tree, ref builder); + break; + case JsonValueKind.True: + throw new QuerySyntaxException(tree["error_message"]?.ToString() ?? string.Empty) { + Type = tree["error_type"]?.ToString() ?? string.Empty, + SubType = tree["error_subtype"]?.ToString() ?? string.Empty, + Position = tree["position"]?.ToString() ?? string.Empty, + }; + default: + throw QuerySyntaxException.InvalidAst(); + } + + // Convert AST back to the query + using (sharedPool.Rent(out var connection)) { + return connection.FromSyntaxTree(tree); + } + } + + private void RewriteNode(JsonNode ast, ref PreparedQueryBuilder builder) { + switch (ast) { + case JsonObject obj: + RewriteNode(obj, ref builder); + break; + case JsonArray array: + RewriteNode(array, ref builder); + break; + } + } + + private void RewriteNode(JsonObject node, ref PreparedQueryBuilder builder) { + foreach (var (propertyName, propertyValue) in node) { + if (propertyValue is null) { + // nothing to do + } else if (propertyName is "from_table") { + RewriteFromClause(propertyValue, ref builder); + } else { + RewriteNode(propertyValue, ref builder); + } + } + } + + private void RewriteNode(JsonArray array, ref PreparedQueryBuilder builder) { + foreach (var element in array) { + if (element is not null) + RewriteNode(element, ref builder); + } + } + + private void RewriteFromClause(JsonNode fromClause, ref PreparedQueryBuilder builder) { + // https://duckdb.org/docs/stable/sql/query_syntax/from + // The following cases are possible: + // TABLE_FUNCTION - not allowed + // BASE_TABLE - allowed, the only allowed schemas are 'kdb' and 'usr' + // JOIN - contains 'left' and 'right' sub-objects, apply rewrite recursively + // SUBQUERY - allowed + + switch (fromClause["type"]?.ToString()) { + case "BASE_TABLE": + RewriteTableReference(fromClause, ref builder); + break; + case "JOIN": + var left = fromClause["left"] ?? throw QuerySyntaxException.InvalidAst(); + var right = fromClause["right"] ?? throw QuerySyntaxException.InvalidAst(); + RewriteNode(left, ref builder); + RewriteNode(right, ref builder); + break; + case "SUBQUERY": + var subQuery = fromClause["subquery"] ?? throw QuerySyntaxException.InvalidAst(); + RewriteNode(subQuery, ref builder); + break; + case var sourceType: + throw new UnsupportedQueryDataSourceTypeException(sourceType); + } + } + + private void RewriteTableReference(JsonNode tableReference, ref PreparedQueryBuilder builder) { + const string schemaNameProperty = "schema_name"; + const string tableNameProperty = "table_name"; + + // validate schema name + switch (tableReference[schemaNameProperty]?.ToString()) { + case "kdb": + // rewrite system table name + var tableName = tableReference[tableNameProperty]?.ToString() ?? string.Empty; + tableName = RewriteSystemTableName(tableName, ref builder); + tableReference[tableNameProperty] = tableName; + break; + case "usr": + // rewrite user index + tableName = tableReference[tableNameProperty]?.ToString() ?? string.Empty; + tableName = UserIndexSql.GetViewNameFor(tableName); + builder.AddUserIndexViewName(tableName); + tableReference[tableNameProperty] = tableName; + break; + case var schemaName: + throw new UnsupportedSchemaException(schemaName); + } + + tableReference[schemaNameProperty] = string.Empty; + } + + private string RewriteSystemTableName(string tableName, ref PreparedQueryBuilder builder) { + switch (tableName) { + case "records": + builder.HasDefaultIndex = true; + return DefaultSql.DefaultIndexViewName; + default: + throw new UnsupportedSystemTableException(tableName); + } + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.cs b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.cs new file mode 100644 index 00000000000..689fe2a1890 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/QueryEngine.cs @@ -0,0 +1,94 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Runtime.InteropServices; +using System.Security.Cryptography; +using DotNext; +using DotNext.Buffers; +using DuckDB.NET.Data; +using Kurrent.Quack; +using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack.Threading; +using KurrentDB.SecondaryIndexing.Indexes.Default; +using KurrentDB.SecondaryIndexing.Indexes.User; + +namespace KurrentDB.SecondaryIndexing.Query; + +/// +/// Represents a single entry point to execute SQL queries over KurrentDB indices. +/// +/// +/// +/// +internal sealed partial class QueryEngine(DefaultIndexProcessor defaultIndex, + UserIndexEngine userIndex, + DuckDBConnectionPool sharedPool) : IQueryEngine { + // 32 bytes key is aligned with HMAC SHA-3 256 hash length + private readonly ReadOnlyMemory _signatureKey = RandomNumberGenerator.GetBytes(32); + + public MemoryOwner PrepareQuery(ReadOnlySpan queryUtf8, bool digitallySign) { + var builder = new PreparedQueryBuilder(); + using var rewrittenQuery = RewriteQuery(queryUtf8, ref builder); + + return builder.Build(rewrittenQuery.Span, digitallySign ? _signatureKey.Span : ReadOnlySpan.Empty); + } + + public async ValueTask ExecuteAsync(ReadOnlyMemory preparedQuery, + TConsumer consumer, + bool checkIntegrity, + CancellationToken token) + where TConsumer : IQueryResultConsumer { + var parsedQuery = new PreparedQuery(preparedQuery.Span); + if (checkIntegrity) { + CheckIntegrity(in parsedQuery); + } + + var snapshots = new PoolingBufferWriter { Capacity = parsedQuery.ViewCount + 1 }; // + default index + var rental = sharedPool.Rent(out var connection); + try { + CaptureSnapshots(in parsedQuery, connection, snapshots, token); + using var statement = new PreparedStatement(connection, parsedQuery.Query); + consumer.Bind(new QueryBinder(in statement)); + + using var reader = new QueryResultReader(in statement, consumer.UseStreaming); + await consumer.ConsumeAsync(reader, token); + } finally { + Disposable.Dispose(snapshots.WrittenMemory.Span); // release all captured snapshot + ((IDisposable)rental).Dispose(); + snapshots.Dispose(); + } + } + + private void CheckIntegrity(ref readonly PreparedQuery parsedQuery) { + if (!parsedQuery.CheckIntegrity(_signatureKey.Span)) + throw new PreparedQueryIntegrityException(); + } + + private void CaptureSnapshots(ref readonly PreparedQuery preparedQuery, + DuckDBAdvancedConnection connection, + PoolingBufferWriter snapshots, + CancellationToken token) { + if (preparedQuery.HasDefaultIndex) { + // default index detected + snapshots.Add(new() { Snapshot = defaultIndex.CaptureSnapshot(connection) }); + } + + for (var viewNames = preparedQuery.ViewNames; viewNames.MoveNext(); token.ThrowIfCancellationRequested()) { + if (userIndex.TryCaptureSnapshot(viewNames.Current, connection, out var readLock, out var snapshot)) { + // user-defined index detected + snapshots.Add(new() { Snapshot = snapshot, ReadLock = readLock }); + } + } + } + + [StructLayout(LayoutKind.Auto)] + private struct SnapshotInfo : IDisposable { + public BufferedView.Snapshot Snapshot; + public UserIndexEngineSubscription.ReadLock ReadLock; + + public void Dispose() { + Snapshot.Dispose(); + ReadLock.Dispose(); + } + } +} diff --git a/src/KurrentDB.SecondaryIndexing/Query/QueryPreparationException.cs b/src/KurrentDB.SecondaryIndexing/Query/QueryPreparationException.cs new file mode 100644 index 00000000000..95687dbe4a5 --- /dev/null +++ b/src/KurrentDB.SecondaryIndexing/Query/QueryPreparationException.cs @@ -0,0 +1,67 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +namespace KurrentDB.SecondaryIndexing.Query; + +/// +/// Represents a root class for all query preparation exceptions. +/// +public class QueryPreparationException : Exception { + private protected QueryPreparationException(string message) : base(message) { + + } +} + +/// +/// Indicates incorrect query syntax. +/// +public sealed class QuerySyntaxException : QueryPreparationException { + internal QuerySyntaxException(string message) + : base(message) { + } + + /// + /// Gets the type of the error. + /// + public required string Type { get; init; } + + /// + /// Gets the type of the sub-error. + /// + public required string SubType { get; init; } + + /// + /// Get the position within the query. + /// + public required string Position { get; init; } + + internal static QuerySyntaxException InvalidAst() => new("DuckDB doesn't produce a valid AST JSON tree") { + Type = string.Empty, + SubType = string.Empty, + Position = "0", + }; +} + +/// +/// Indicates that the specified data source in FROM clause is not supported, e.g. table function. +/// +public sealed class UnsupportedQueryDataSourceTypeException : QueryPreparationException { + internal UnsupportedQueryDataSourceTypeException(string? sourceType) + : base($"Unsupported query data source '{sourceType}'") => SourceType = sourceType; + + public string? SourceType { get; } +} + +public sealed class UnsupportedSchemaException : QueryPreparationException { + internal UnsupportedSchemaException(string? schemaName) + : base($"Schema '{schemaName}' is not supported") => SchemaName = schemaName; + + public string? SchemaName { get; } +} + +public sealed class UnsupportedSystemTableException : QueryPreparationException { + internal UnsupportedSystemTableException(string tableName) + : base($"System table '{tableName}' is not supported") => TableName = tableName; + + public string TableName { get; } +} diff --git a/src/KurrentDB.SecondaryIndexing/SecondaryIndexingPlugin.cs b/src/KurrentDB.SecondaryIndexing/SecondaryIndexingPlugin.cs index c03ece7116e..5bcdd1acfa0 100644 --- a/src/KurrentDB.SecondaryIndexing/SecondaryIndexingPlugin.cs +++ b/src/KurrentDB.SecondaryIndexing/SecondaryIndexingPlugin.cs @@ -20,6 +20,7 @@ using KurrentDB.SecondaryIndexing.Indexes.EventType; using KurrentDB.SecondaryIndexing.Indexes.User; using KurrentDB.SecondaryIndexing.Indexes.User.Management; +using KurrentDB.SecondaryIndexing.Query; using KurrentDB.SecondaryIndexing.Stats; using KurrentDB.SecondaryIndexing.Storage; using KurrentDB.SecondaryIndexing.Telemetry; @@ -52,15 +53,14 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(static sp => sp.GetRequiredService()); services.AddSingleton(); services.AddDuckDBSetup(); - services.AddDuckDBSetup(); services.AddHostedService(); services.AddHostedService(sp => sp.GetRequiredService()); - services.AddSingleton(); - var meter = new Meter(SecondaryIndexingConstants.MeterName, "1.0.0"); services.AddKeyedSingleton(SecondaryIndexingConstants.InjectionKey, meter); diff --git a/src/KurrentDB.SecondaryIndexing/Storage/DuckDbExtensions.cs b/src/KurrentDB.SecondaryIndexing/Storage/DuckDbExtensions.cs index 21ae03dae9a..f92cbf00003 100644 --- a/src/KurrentDB.SecondaryIndexing/Storage/DuckDbExtensions.cs +++ b/src/KurrentDB.SecondaryIndexing/Storage/DuckDbExtensions.cs @@ -19,4 +19,12 @@ public List QueryToList() where TQuery : IQuery => db.QueryAsCollection>(); } + + public static void CopyTo(this QueryResult result, ICollection output) + where TArgs : struct + where TQuery : IQuery, IBinder, IDataRowParser, allows ref struct { + foreach (ref readonly var row in result) { + output.Add(row); + } + } } diff --git a/src/KurrentDB.SecondaryIndexing/Storage/InFlightSetup.cs b/src/KurrentDB.SecondaryIndexing/Storage/InFlightSetup.cs deleted file mode 100644 index fd6ea994c05..00000000000 --- a/src/KurrentDB.SecondaryIndexing/Storage/InFlightSetup.cs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. -// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). - -using System.Diagnostics.CodeAnalysis; -using DuckDB.NET.Data; -using DuckDB.NET.Data.DataChunk.Writer; -using KurrentDB.DuckDB; -using KurrentDB.SecondaryIndexing.Indexes.Default; - -namespace KurrentDB.SecondaryIndexing.Storage; - -[UsedImplicitly] -internal class InFlightSetup(DefaultIndexInFlightRecords inFlightRecords) : IDuckDBSetup { - public void Execute(DuckDBConnection connection) { - connection.RegisterTableFunction("inflight", ReadBackwardsResultCallback, ReadInFlightMapperCallback); - } - - public bool OneTimeOnly => false; - - private static readonly IReadOnlyList ColumnInfos = [ - new("log_position", typeof(long)), - new("event_type", typeof(string)), - new("category", typeof(string)), - new("stream", typeof(string)), - new("event_number", typeof(long)), - new("created", typeof(long)), - ]; - - private TableFunction ReadBackwardsResultCallback() { - var records = inFlightRecords.GetInFlightRecords(); - - return new(ColumnInfos, records); - } - - private static void ReadInFlightMapperCallback(object? item, IDuckDBDataWriter[] writers, ulong rowIndex) { - var record = (InFlightRecord)item!; - - writers[0].WriteValue(record.LogPosition, rowIndex); - writers[1].WriteValue(record.EventType, rowIndex); - writers[2].WriteValue(record.Category, rowIndex); - writers[3].WriteValue(record.StreamName, rowIndex); - writers[4].WriteValue(record.EventNumber, rowIndex); - writers[5].WriteValue(record.Created, rowIndex); - } -} diff --git a/src/KurrentDB.SecondaryIndexing/Storage/IndexingDbSchema.cs b/src/KurrentDB.SecondaryIndexing/Storage/IndexingDbSchema.cs index 726088e742a..f0501e3af28 100644 --- a/src/KurrentDB.SecondaryIndexing/Storage/IndexingDbSchema.cs +++ b/src/KurrentDB.SecondaryIndexing/Storage/IndexingDbSchema.cs @@ -2,16 +2,31 @@ // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). using System.Reflection; +using System.Security.Claims; using DuckDB.NET.Data; -using Kurrent.Quack.ConnectionPool; +using Kurrent.Quack; +using Kurrent.Quack.Threading; +using KurrentDB.Core.Bus; +using KurrentDB.Core.Services.Transport.Enumerators; using KurrentDB.DuckDB; namespace KurrentDB.SecondaryIndexing.Storage; -public class IndexingDbSchema : DuckDBOneTimeSetup { +public class IndexingDbSchema(Func> eventsProvider) : DuckDBOneTimeSetup { private static readonly Assembly Assembly = typeof(IndexingDbSchema).Assembly; - protected override void ExecuteCore(DuckDBConnection connection) { + public IndexingDbSchema(IPublisher publisher) + : this(publisher.GetEnumerator) { + } + + protected override void ExecuteCore(DuckDBAdvancedConnection connection) { + BufferedView.EnableSupport(connection); + new Indexes.User.ExpandRecordFunction(eventsProvider).Register(connection); + new Indexes.Default.ExpandRecordFunction(eventsProvider).Register(connection); + CreateSchema(connection); + } + + private void CreateSchema(DuckDBConnection connection) { var names = Assembly.GetManifestResourceNames().Where(x => x.EndsWith(".sql")).OrderBy(x => x); using var transaction = connection.BeginTransaction(); var cmd = connection.CreateCommand(); @@ -35,9 +50,12 @@ protected override void ExecuteCore(DuckDBConnection connection) { transaction.Commit(); } +} - public void CreateSchema(DuckDBConnectionPool pool) { - using var connection = pool.Open(); - Execute(connection); - } +file static class EventReader { + public static IEnumerator GetEnumerator(this IPublisher publisher, long[] logPositions, ClaimsPrincipal user) + => new Enumerator.ReadLogEventsSync( + bus: publisher, + logPositions, + user); } diff --git a/src/KurrentDB.SecondaryIndexing/Subscriptions/UserIndexSubscription.cs b/src/KurrentDB.SecondaryIndexing/Subscriptions/UserIndexSubscription.cs index dc55bb91d2e..ebbffaa1dcb 100644 --- a/src/KurrentDB.SecondaryIndexing/Subscriptions/UserIndexSubscription.cs +++ b/src/KurrentDB.SecondaryIndexing/Subscriptions/UserIndexSubscription.cs @@ -19,7 +19,7 @@ internal abstract class UserIndexSubscription { public abstract ValueTask Start(); public abstract ValueTask Stop(); public abstract TFPos GetLastIndexedPosition(); - public abstract void GetUserIndexTableDetails(out string tableName, out string inFlightTableName, out string? fieldName); + public abstract void GetUserIndexTableDetails(out string tableName, out string? fieldName); } internal sealed class UserIndexSubscription( @@ -27,7 +27,7 @@ internal sealed class UserIndexSubscription( UserIndexProcessor indexProcessor, SecondaryIndexingPluginOptions options, ILogger log, - CancellationToken token) : UserIndexSubscription, IAsyncDisposable where TField : IField { + CancellationToken token) : UserIndexSubscription, IAsyncDisposable where TField : IField { private readonly int _commitBatchSize = options.CommitBatchSize; private CancellationTokenSource? _cts = CancellationTokenSource.CreateLinkedTokenSource(token); private Enumerator.AllSubscription? _subscription; @@ -158,8 +158,8 @@ public override async ValueTask Stop() { public override TFPos GetLastIndexedPosition() => indexProcessor.GetLastPosition(); - public override void GetUserIndexTableDetails(out string tableName, out string inFlightTableName, out string? fieldName) => - indexProcessor.GetUserIndexTableDetails(out tableName, out inFlightTableName, out fieldName); + public override void GetUserIndexTableDetails(out string tableName, out string? fieldName) => + indexProcessor.GetUserIndexTableDetails(out tableName, out fieldName); } static partial class UserIndexSubscriptionLogMessages { diff --git a/src/KurrentDB/Components/Query/Query.razor b/src/KurrentDB/Components/Query/Query.razor index a428dfe0412..c4c11dc78b3 100644 --- a/src/KurrentDB/Components/Query/Query.razor +++ b/src/KurrentDB/Components/Query/Query.razor @@ -6,21 +6,19 @@ @attribute [ExcludeFromInteractiveRouting] @rendermode InteractiveServer @using System.Diagnostics +@using System.Linq @using System.Text.Json -@using BlazorMonaco +@using System.Threading @using BlazorMonaco.Editor -@using Dapper -@using Kurrent.Quack.ConnectionPool @using KurrentDB.Common.Configuration @using KurrentDB.Components.Plugins @using KurrentDB.Components.Tools -@using KurrentDB.SecondaryIndexing.Indexes.User +@using KurrentDB.SecondaryIndexing.Query @using KurrentDB.Services @using KurrentDB.UI.Services @using Microsoft.AspNetCore.Authorization -@using Serilog -@inject DuckDBConnectionPool Db -@inject UserIndexEngine UserIndexEngine +@implements IDisposable +@inject IQueryEngine QueryEngine @inject Preferences Preferences @inject IDialogService DialogService @inject PluginsService Plugins @@ -62,29 +60,34 @@ -@if (_items != null) { - +@if (_items is { RootElement: { ValueKind: JsonValueKind.Array } array }) +{ + var columns = array.GetArrayLength() > 0 + ? array[0].EnumerateObject().Select(static p => p.Name) + : []; + + - - @if (!_items.Any()) { - } else { - foreach (var item in _items.First().Keys) { - - } + + @foreach (var col in columns) { + var colName = col; + } - @foreach (var item in context.Item.Keys) { - if (!IsJsonString(context.Item[item])) continue; - - - @{ var json = context.Item[item]?.ToString(); } - - + @foreach (var prop in context.Item.EnumerateObject()) { + if (prop.Value.ValueKind is not (JsonValueKind.Object or JsonValueKind.Array)) continue; + var propName = prop.Name; + var propJson = prop.Value.GetRawText(); + + + Copy JSON - + Copy indented JSON @@ -92,14 +95,14 @@ - + } @code { - static readonly ILogger Log = Serilog.Log.ForContext(); - IEnumerable> _items; + readonly CancellationTokenSource _cts = new(); + JsonDocument _items; StandaloneCodeEditor _editor = null!; string _message = ""; string _error = ""; @@ -119,15 +122,17 @@ } async Task OnEditorInit() { - await _editor.AddCommand((int)KeyMod.CtrlCmd | (int)KeyCode.Enter, _ => InvokeAsync(Execute)); - await _editor.Focus(); + // await _editor.AddCommand((int)KeyMod.CtrlCmd | (int)KeyCode.Enter, _ => InvokeAsync(Execute)); + // await _editor.Focus(); } - async Task Execute() { + async Task Execute() + { var sql = await _editor.GetValue(); if (string.IsNullOrWhiteSpace(sql)) return; + _items?.Dispose(); _items = null; _showProgress = true; _message = ""; @@ -137,8 +142,9 @@ var watch = new Stopwatch(); watch.Start(); - try { - _items = Db.ExecuteAdHocUserQuery(UserIndexEngine.TryGetUserIndexTableDetails, sql); + try + { + _items = await QueryEngine.ExecuteAdHocUserQuery(sql, _cts.Token); watch.Stop(); _message = $"Query executed in {watch}"; } @@ -160,11 +166,18 @@ void PreferencesOnThemeChanged(object sender, EventArgs e) => _editor.UpdateOptions(new() { Theme = EditorTheme() }); } - static object Shorten(object value) => value is not string s ? value : s[..Math.Min(s.Length, 50)]; - - static bool HasNoJson(IDictionary arg) => arg.Keys.Select(key => arg[key]).All(item => !IsJsonString(item)); + public void Dispose() + { + _items?.Dispose(); + _cts.Cancel(); + } - static bool IsJsonString(object value) => value is string s && (s.StartsWith('{') && s.EndsWith('}') || s.StartsWith('[') && s.EndsWith(']')); + static string GetPropertyValue(JsonElement element, string name) + { + return element.TryGetProperty(name, out var property) + ? property.ToString() + : string.Empty; + } string EditorTheme() => Preferences.DarkMode ? "vs-dark" : "vs-light"; diff --git a/src/KurrentDB/Components/Query/QueryHelpDialog.razor b/src/KurrentDB/Components/Query/QueryHelpDialog.razor index 075a56ed513..8a845b99b36 100644 --- a/src/KurrentDB/Components/Query/QueryHelpDialog.razor +++ b/src/KurrentDB/Components/Query/QueryHelpDialog.razor @@ -26,20 +26,18 @@ const string HelpText = @" ##### Available sources -Query one or more available sources like streams, categories, or everything. +Query log records from default or user-defined index. **Beware that querying the whole log will trigger a full log scan, which impact performance of the database.** | Source | Description | Example | |----------|------------|-------------| -| `stream:` | Query a stream | `select * from stream:Order-123` | -| `category:` | Query a category | `select * from category:Order where data->>'customerId'='bob'` | -| `index:` | Query a user index | `select * from index:orders-by-country where field_country='Mauritius'` | -| `all_events` | Query all log records | `select * from all_events limit 10` | +| `kdb.records` | Query default index | `select * from kdb.records where stream='Order-123'` | +| `usr.` | Query user-defined index | `select * from usr.MyIndex where field_country='Mauritius'` | **Consider adding `limit` to the query to constrain the scan surface.** -##### Available predicates +##### Available columns You can use top-level predicates as well as JSON predicates on payload and metadata. @@ -47,9 +45,9 @@ Top-level predicates are available on: * `log_position`: commit position (bigint) * `stream`: stream name (varchar) * `category`: stream category (varchar) -* `event_type`: event type (varchar) -* `event_number`: event number in a stream (int) -* `created_at`: record append time (timestamp, for example `1992-09-20 11:30:00.123`) +* `schema_name`: schema name (varchar) +* `stream_revision`: event number in a stream (int) +* `created_at`: record append time (Unix timestamp, in millis) JSON predicates are available on: * `data`: event payload @@ -63,8 +61,8 @@ streams per found version value. select version, count(*) as count from ( select distinct version, stream from ( select data->>'Version' as version, stream - from category:Telemetry - where version is not null + from kdb.records + where version is not null and category='Telemetry' ) ) group by version ``` diff --git a/src/KurrentDB/Components/Query/QueryService.cs b/src/KurrentDB/Components/Query/QueryService.cs index eab9b149b39..d770966073a 100644 --- a/src/KurrentDB/Components/Query/QueryService.cs +++ b/src/KurrentDB/Components/Query/QueryService.cs @@ -1,127 +1,83 @@ // Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. // Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). -using System.Collections.Generic; -using System.Linq; +using System; +using System.Buffers; +using System.IO; +using System.Text; using System.Text.Json; -using System.Text.Json.Serialization; -using System.Text.RegularExpressions; -using Dapper; -using Kurrent.Quack.ConnectionPool; +using System.Threading; +using System.Threading.Tasks; +using DotNext; +using DotNext.Buffers; +using Kurrent.Quack; +using KurrentDB.SecondaryIndexing.Query; namespace KurrentDB.Components.Query; -public static partial class QueryService { - internal delegate bool TryGetUserIndexTableDetails(string indexName, out string tableName, out string inFlightTableName, out string fieldName); - - private static string AmendQuery(DuckDBConnectionPool pool, TryGetUserIndexTableDetails tryGetUserIndexTableDetails, string query) { - var matches = ExtractionRegex().Matches(query); - List ctes = [AllCte]; - foreach (Match match in matches) { - if (!match.Success) - continue; - var tokens = match.Value.Split(':'); - var cteName = ReplaceSpecialCharsWithUnderscore($"{tokens[0]}_{tokens[1]}"); - - string cte; - switch (tokens[0]) { - case "stream": - cte = string.Format(AllCteTemplate, cteName, $"where stream = '{tokens[1]}'"); - break; - case "category": - cte = string.Format(AllCteTemplate, cteName, $"where category = '{tokens[1]}'"); - break; - case "index": - var indexName = tokens[1]; - var exists = tryGetUserIndexTableDetails(indexName, out var tableName, out var tableFunctionName, out var fieldName); - if (!exists) - throw new("Index does not exist or is not started"); - - cte = string.Format( - UserIndexCteTemplate, - cteName, - $"\"{tableName}\"", - $"\"{tableFunctionName}\"", - fieldName is "" ? "" : $", \"{fieldName}\""); - break; - default: - throw new("Invalid token"); +public static class QueryService { + internal static async ValueTask ExecuteAdHocUserQuery(this IQueryEngine engine, string sql, CancellationToken token) { + // Convert query result to JSON + sql = $"SELECT to_json(sub_query) FROM ({sql}) sub_query LIMIT 100"; + + + var preparedQuery = default(MemoryOwner); + var reader = new JsonReader(); + try { + preparedQuery = engine.PrepareQuery(Encoding.UTF8.GetBytes(sql), digitallySign: false); + + await engine.ExecuteAsync(preparedQuery.Memory, reader, checkIntegrity: false, token); + + return reader.ToJson(); + } finally { + preparedQuery.Dispose(); + reader.Dispose(); + } + } + + private sealed class JsonReader : Disposable, IQueryResultConsumer { + private readonly PoolingBufferWriter _writer = new() { Capacity = 4096 }; + + public ValueTask ConsumeAsync(IQueryResultReader resultReader, CancellationToken token) { + var task = ValueTask.CompletedTask; + try { + Consume(resultReader, token); + } catch (OperationCanceledException e) when (e.CancellationToken == token) { + task = ValueTask.FromCanceled(token); + } catch (Exception e) { + task = ValueTask.FromException(e); } - ctes.Add(cte); - query = query.Replace(match.Value, cteName); + return task; } - ValidateQuery(pool, query); - return $"with\r\n{string.Join(",\r\n", ctes)}\r\n{query}"; - } + private void Consume(IQueryResultReader resultReader, CancellationToken token) { + _writer.Add((byte)'['); + while (resultReader.TryRead()) { + foreach (ref readonly var row in resultReader.Chunk[0].BlobRows) { + _writer.Write(row.AsSpan()); + _writer.Add((byte)','); + token.ThrowIfCancellationRequested(); + } + } - private static void ValidateQuery(DuckDBConnectionPool pool, string query) { - // var result = pool.QueryFirstOrDefault(new(query)); - using var _ = pool.Rent(out var connection); - var result = connection.QueryFirstOrDefault("select json_serialize_sql($query::varchar)", new { query }); - if (result == null) { - throw new("Error parsing query"); + // Remove trailing comma + _writer.WrittenCount--; + _writer.Add((byte)']'); } - var conversionResponse = JsonSerializer.Deserialize(result); - if (conversionResponse.Error) { - var error = conversionResponse.ErrorMessage.StartsWith("Only SELECT") - ? "Only SELECT statements are allowed" - : $"Error parsing query: {conversionResponse.ErrorMessage}"; - throw new(error); + + public void Bind(scoped TBinder binder) where TBinder : IPreparedQueryBinder, allows ref struct { + // nothing to bind } - } - internal static List> ExecuteAdHocUserQuery(this DuckDBConnectionPool pool, TryGetUserIndexTableDetails tryGetUserIndexTableDetails, string sql) { - var query = AmendQuery(pool, tryGetUserIndexTableDetails, sql); - using var scope = pool.Rent(out var connection); - var items = (IEnumerable>)connection.Query(query); - return items.Select(x => x.ToDictionary(y => y.Key, y => y.Value)).ToList(); - } + public JsonDocument ToJson() => JsonDocument.Parse(_writer.WrittenMemory); - private static string ReplaceSpecialCharsWithUnderscore(string input) - => string.IsNullOrEmpty(input) ? string.Empty : SpecialCharsRegex().Replace(input, "_"); - - [GeneratedRegex(@"\b(?:stream|category|index):([A-Za-z0-9_-]+)\b", RegexOptions.IgnoreCase | RegexOptions.CultureInvariant)] - private static partial Regex ExtractionRegex(); - - [GeneratedRegex("[^A-Za-z0-9_]", RegexOptions.CultureInvariant)] - private static partial Regex SpecialCharsRegex(); - - private static readonly string AllCte = string.Format(AllCteTemplate, "all_events", ""); - - private const string AllCteTemplate = """ - {0} AS ( - select log_position, stream, event_number, event_type, epoch_ms(created) as created_at, event->>'data' as data, event->>'metadata' as metadata - from ( - select *, kdb_get(log_position)::JSON as event - from ( - select stream, event_number, event_type, log_position, created from idx_all {1} - union all - select stream, event_number, event_type, log_position, created from inflight() {1} - ) - ) - ) - """; - - private const string UserIndexCteTemplate = """ - {0} AS ( - select log_position, event->>'stream_id' as stream, event_number, event->>'event_type' as event_type, epoch_ms(created) as created_at, event->>'data' as data, event->>'metadata' as metadata{3} - from ( - select *, kdb_get(log_position)::JSON as event - from ( - select log_position, event_number, created{3} from {1} - union all - select log_position, event_number, created{3} from {2}() - ) - ) - ) - """; - - private record SqlJsonResponse { - [JsonPropertyName("error")] public bool Error { get; init; } - [JsonPropertyName("error_message")] public string ErrorMessage { get; init; } = ""; - [JsonPropertyName("error_subtype")] public string ErrorSubtype { get; init; } = ""; - [JsonPropertyName("position")] public string Position { get; init; } + protected override void Dispose(bool disposing) { + if (disposing) { + _writer.Dispose(); + } + + base.Dispose(disposing); + } } } diff --git a/src/SchemaRegistry/KurrentDB.SchemaRegistry/Modules/Schemas/Data/SchemaDbSchema.cs b/src/SchemaRegistry/KurrentDB.SchemaRegistry/Modules/Schemas/Data/SchemaDbSchema.cs index 740390432ce..a6da0e0a77c 100644 --- a/src/SchemaRegistry/KurrentDB.SchemaRegistry/Modules/Schemas/Data/SchemaDbSchema.cs +++ b/src/SchemaRegistry/KurrentDB.SchemaRegistry/Modules/Schemas/Data/SchemaDbSchema.cs @@ -3,13 +3,14 @@ using Dapper; using DuckDB.NET.Data; +using Kurrent.Quack; using KurrentDB.DuckDB; namespace KurrentDB.SchemaRegistry.Data; [UsedImplicitly] public class SchemaDbSchema : DuckDBOneTimeSetup { - protected override void ExecuteCore(DuckDBConnection connection) { + protected override void ExecuteCore(DuckDBAdvancedConnection connection) { const string createTablesAndIndexesSql = """ CREATE TABLE IF NOT EXISTS schema_versions (