Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<PackageVersion Include="Kurrent.Surge.Core" Version="1.1.1-alpha.1.60" />
<PackageVersion Include="Kurrent.Surge.DataProtection" Version="1.1.1-alpha.1.57" />
<PackageVersion Include="Kurrent.Surge.DuckDB" Version="1.1.1-alpha.1.57" />
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-alpha.127" />
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-alpha.150" />
<PackageVersion Include="KurrentDB.Client" Version="1.0.0" />
<PackageVersion Include="librdkafka.redist" Version="2.5.0" />
<PackageVersion Include="LruCacheNet" Version="1.2.0" />
Expand Down
10 changes: 5 additions & 5 deletions src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -49,9 +48,10 @@ public DuckDBConnectionPoolLifetime(
_repeated = repeated;

Shared = CreatePool(isReadOnly: false, log: true);
using var connection = Shared.Open();
foreach (var s in once)
s.Execute(connection);
using (Shared.Rent(out var connection)) {
foreach (var s in once)
s.Execute(connection);
}

return;

Expand Down Expand Up @@ -105,6 +105,7 @@ public Task StopAsync(CancellationToken cancellationToken) {

protected override void Dispose(bool disposing) {
if (disposing) {
Shared.Dispose();
if (_tempPath != null) {
try {
File.Delete(_tempPath);
Expand All @@ -118,7 +119,6 @@ protected override void Dispose(bool disposing) {
}

private class ConnectionPoolWithFunctions(string connectionString, IReadOnlyList<IDuckDBSetup> setup) : DuckDBConnectionPool(connectionString) {
[Experimental("DuckDBNET001")]
protected override void Initialize(DuckDBAdvancedConnection connection) {
base.Initialize(connection);
for (var i = 0; i < setup.Count; i++) {
Expand Down
1 change: 0 additions & 1 deletion src/KurrentDB.Core/DuckDB/InjectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public static class InjectionExtensions {
public static IServiceCollection AddDuckDb(this IServiceCollection services) {
services.AddSingleton<DuckDBConnectionPoolLifetime>();
services.AddHostedService(sp => sp.GetRequiredService<DuckDBConnectionPoolLifetime>());
services.AddDuckDBSetup<KdbGetEventSetup>();
services.AddSingleton<DuckDBConnectionPool>(sp => sp.GetRequiredService<DuckDBConnectionPoolLifetime>().Shared);
services.AddSingleton<DuckDbConnectionPoolMiddleware>();
return services;
Expand Down
73 changes: 0 additions & 73 deletions src/KurrentDB.Core/DuckDB/InlineFunctions.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ public void Handle(ClientMessage.TransactionCommit message) {


public void Handle(SystemMessage.StateChangeMessage message) {

// this is now a compile error. probably change 'or' to 'and not', but needs to be thought through.
// TODO: think it through (ticket DB-1957)
#pragma warning disable CS9336 // The pattern is redundant.
if (_nodeState == VNodeState.Leader && message.State is not VNodeState.Leader or VNodeState.ResigningLeader) {
#pragma warning restore CS9336 // The pattern is redundant.
var keys = _currentRequests.Keys;
foreach (var key in keys) {
if (_currentRequests.Remove(key, out var manager)) {
Expand Down
7 changes: 4 additions & 3 deletions src/KurrentDB.DuckDB/DuckDBSetup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@

using DotNext.Threading;
using DuckDB.NET.Data;
using Kurrent.Quack;

namespace KurrentDB.DuckDB;

public interface IDuckDBSetup {
void Execute(DuckDBConnection connection);
void Execute(DuckDBAdvancedConnection connection);
bool OneTimeOnly { get; }
}

public abstract class DuckDBOneTimeSetup : IDuckDBSetup {
private Atomic.Boolean _created;

public void Execute(DuckDBConnection connection) {
public void Execute(DuckDBAdvancedConnection connection) {
if (!_created.FalseToTrue()) {
return;
}
Expand All @@ -23,5 +24,5 @@ public void Execute(DuckDBConnection connection) {

public bool OneTimeOnly => true;

protected abstract void ExecuteCore(DuckDBConnection connection);
protected abstract void ExecuteCore(DuckDBAdvancedConnection connection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using Kurrent.Quack;
using Kurrent.Quack.ConnectionPool;
using KurrentDB.Core.Services.Transport.Enumerators;
using KurrentDB.SecondaryIndexing.LoadTesting.Appenders;
using KurrentDB.SecondaryIndexing.Storage;
using KurrentDB.SecondaryIndexing.Tests.Generators;
Expand All @@ -24,8 +25,11 @@ public class RawQuackMessageBatchAppender : IMessageBatchAppender {

public RawQuackMessageBatchAppender(DuckDBConnectionPool db, DuckDbTestEnvironmentOptions options) {
_commitSize = options.CommitSize;
var schema = new IndexingDbSchema();
schema.CreateSchema(db);
var schema = new IndexingDbSchema(static (_, _) => Enumerable.Empty<ReadResponse>().GetEnumerator());

using (db.Rent(out var rentedConn)) {
schema.Execute(rentedConn);
}

using var connection = db.Open();
_defaultIndexAppender = new(connection, "idx_all"u8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

using Kurrent.Quack.ConnectionPool;
using KurrentDB.Core.Index.Hashes;
using KurrentDB.Core.Services.Transport.Enumerators;
using KurrentDB.Core.Tests.Fakes;
using KurrentDB.SecondaryIndexing.Indexes;
using KurrentDB.SecondaryIndexing.Indexes.Default;
using KurrentDB.SecondaryIndexing.LoadTesting.Appenders;
using KurrentDB.SecondaryIndexing.Storage;
Expand All @@ -23,13 +23,14 @@ public IndexMessageBatchAppender(DuckDBConnectionPool db, int commitSize) {
_commitSize = commitSize;
ReadIndexStub.Build();
var hasher = new CompositeHasher<string>(new XXHashUnsafe(), new Murmur3AUnsafe());
var inflightRecordsCache = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitSize });

var publisher = new FakePublisher();
var schema = new IndexingDbSchema();
schema.CreateSchema(db);
var schema = new IndexingDbSchema(static (_, _) => Enumerable.Empty<ReadResponse>().GetEnumerator());
using (db.Rent(out var connection)) {
schema.Execute(connection);
}

_processor = new(db, inflightRecordsCache, publisher, hasher, new("test"), NullLoggerFactory.Instance);
_processor = new(db, publisher, hasher, new("test"), NullLoggerFactory.Instance);
}

public ValueTask Append(TestMessageBatch batch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ byte[] data
eventType
);

return ResolvedEvent.ForUnresolvedEvent(record, 0);
return ResolvedEvent.ForUnresolvedEvent(record, 0L);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System.Security.Claims;
using Kurrent.Quack.ConnectionPool;
using Kurrent.Quack.Threading;
using KurrentDB.Core.DuckDB;
using KurrentDB.Core.Services.Transport.Enumerators;
using KurrentDB.Core.XUnit.Tests;
using KurrentDB.SecondaryIndexing.Storage;

Expand All @@ -14,8 +18,19 @@ protected DuckDbIntegrationTest() {
var dbPath = Fixture.GetFilePathFor($"{GetType().Name}.db");

DuckDb = new($"Data Source={dbPath};");
var schema = new IndexingDbSchema();
schema.CreateSchema(DuckDb);
var schema = new IndexingDbSchema(GetEvents);
using (DuckDb.Rent(out var connection)) {
schema.Execute(connection);
}
}

// Stub "get events" callback handed to IndexingDbSchema for tests.
// Yields exactly one ReadResponse per requested log position; the
// ClaimsPrincipal argument is never inspected here.
private static IEnumerator<ReadResponse> GetEvents(long[] logPositions, ClaimsPrincipal user) {
	// This is a stub method for tests — it never resolves real events.
	for (var i = 0; i < logPositions.Length; i++) {
		// See GetDatabaseEventsFunction implementation,
		// for any unexpected response the function generates a row with empty values,
		// so StreamNotFound acts as a harmless placeholder per position.
		yield return new ReadResponse.StreamNotFound(string.Empty);
	}
}

public override async ValueTask DisposeAsync() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using Dapper;
using DotNext;
using Kurrent.Quack.ConnectionPool;
using Kurrent.Quack;
using KurrentDB.Core.Data;
using KurrentDB.Core.Index.Hashes;
using KurrentDB.Core.Tests.Fakes;
Expand Down Expand Up @@ -117,44 +116,88 @@ public void CommittedMultipleEventsToAStringWithoutCategory_AreIndexed() {
}

private void AssertDefaultIndexQueryReturns(List<long> expected) {
var records =
DuckDb.QueryToList<ReadDefaultIndexQueryArgs, IndexQueryRecord, ReadDefaultIndexQueryExcl>(new(-1, long.MaxValue,
int.MaxValue));
List<IndexQueryRecord> records;
using (DuckDb.Rent(out var connection)) {
using (_processor.CaptureSnapshot(connection)) {
records = connection
.ExecuteQuery<ReadDefaultIndexQueryArgs, IndexQueryRecord, ReadDefaultIndexQueryExcl>(new(-1, int.MaxValue))
.ToList();
}
}

Assert.Equal(expected, records.Select(x => x.LogPosition));
}

private void AssertLastLogPositionQueryReturns(long? expectedLogPosition) {
var actual = DuckDb.QueryFirstOrDefault<LastPositionResult, GetLastLogPositionQuery>().OrNull();
LastPositionResult? actual;
using (DuckDb.Rent(out var connection)) {
using (_processor.CaptureSnapshot(connection)) {
actual = connection.QueryFirstOrDefault<LastPositionResult, GetLastLogPositionQuery>().OrNull();
}
}

Assert.Equal(expectedLogPosition, actual?.PreparePosition);
}

private void AssertGetCategoriesQueryReturns(string[] expected) {
using var connection = DuckDb.Open();
var records = connection.Query<string>("select distinct category from idx_all order by log_position");
var records = new List<string>();
using (DuckDb.Rent(out var connection)) {
using (_processor.CaptureSnapshot(connection)) {
using var result = connection.ExecuteAdHocQuery("select distinct category from idx_all_snapshot order by log_position"u8);
while (result.TryFetch(out var chunk)) {
using (chunk) {
while (chunk.TryRead(out var row)) {
records.Add(row.ReadString());
}
}
}
}
}

Assert.Equal(expected, records);
}

private void AssertCategoryIndexQueryReturns(string category, List<long> expected) {
var records =
DuckDb.QueryToList<CategoryIndexQueryArgs, IndexQueryRecord, CategoryIndexQueryIncl>(new(category, 0, long.MaxValue, 32));
List<IndexQueryRecord> records;
using (DuckDb.Rent(out var connection)) {
using (_processor.CaptureSnapshot(connection)) {
records = connection
.ExecuteQuery<CategoryIndexQueryArgs, IndexQueryRecord, CategoryIndexQueryIncl>(new(category, 0, 32))
.ToList();
}
}

Assert.Equal(expected, records.Select(x => x.LogPosition));
}

private void AssertGetAllEventTypesQueryReturns(string[] expected) {
using var connection = DuckDb.Open();
var records = connection.Query<string>("select distinct event_type from idx_all order by log_position");
var records = new List<string>();
using (DuckDb.Rent(out var connection)) {
using (_processor.CaptureSnapshot(connection)) {
using var result = connection.ExecuteAdHocQuery("select distinct event_type from idx_all order by log_position"u8);
while (result.TryFetch(out var chunk)) {
using (chunk) {
while (chunk.TryRead(out var row)) {
records.Add(row.ReadString());
}
}
}
}
}

Assert.Equal(expected, records);
}

private void AssertReadEventTypeIndexQueryReturns(string eventType, List<long> expected) {
var records = DuckDb.QueryToList<ReadEventTypeIndexQueryArgs, IndexQueryRecord, ReadEventTypeIndexQueryIncl>(
new(eventType, 0, long.MaxValue, 32)
);
List<IndexQueryRecord> records;
using (DuckDb.Rent(out var connection)) {
using (_processor.CaptureSnapshot(connection)) {
records = connection.ExecuteQuery<ReadEventTypeIndexQueryArgs, IndexQueryRecord, ReadEventTypeIndexQueryIncl>(
new(eventType, 0, 32)
)
.ToList();
}
}

Assert.Equal(expected, records.Select(x => x.LogPosition));
}
Expand All @@ -164,13 +207,11 @@ private void AssertReadEventTypeIndexQueryReturns(string eventType, List<long> e
public DefaultIndexProcessorTests() {
ReadIndexStub.Build();

const int commitBatchSize = 9;
var hasher = new CompositeHasher<string>(new XXHashUnsafe(), new Murmur3AUnsafe());
var inflightRecordsCache = new DefaultIndexInFlightRecords(new() { CommitBatchSize = commitBatchSize });

var publisher = new FakePublisher();

_processor = new(DuckDb, inflightRecordsCache, publisher, hasher, new("test"), NullLoggerFactory.Instance);
_processor = new(DuckDb, publisher, hasher, new("test"), NullLoggerFactory.Instance);
}

public override ValueTask DisposeAsync() {
Expand Down
Loading
Loading