Skip to content

Commit ec10951

Browse files
committed
完成全部查询的链接模式支持,并且针对所有代码进行了优化和单元测试发布x.3.1.67
1 parent 9ec692e commit ec10951

18 files changed

+189
-159
lines changed

nuget-publish.bat

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
:start
22
::定义版本
3-
set EFCORE2=2.3.1.66
4-
set EFCORE3=3.3.1.66
5-
set EFCORE5=5.3.1.66
6-
set EFCORE6=6.3.1.66
3+
set EFCORE2=2.3.1.67
4+
set EFCORE3=3.3.1.67
5+
set EFCORE5=5.3.1.67
6+
set EFCORE6=6.3.1.67
77

88
::删除所有bin与obj下的文件
99
@echo off

src/ShardingCore/Core/ConnectionModeEnum.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@ namespace ShardingCore.Core
88
{
99
/// <summary>
1010
/// 链接模式,可以由用户自行指定,使用内存限制,和连接数限制或者系统自行选择最优
11-
/// STREAM_MERGE的意思是最小化内存使用率,就是非一次性获取所有数据然后采用流式聚合
12-
/// IN_MEMORY_MERGE的意思是最小化连接并发数,就是单次查询并发连接数为设置的连接数<see cref="IShardingConfigOption.MaxQueryConnectionsLimit"/>。因为有限制,所以无法一直挂起连接,必须全部获取到内存后进行内存聚合
11+
/// MEMORY_STRICTLY的意思是最小化内存使用率,就是非一次性获取所有数据然后采用流式聚合
12+
/// CONNECTION_STRICTLY的意思是最小化连接并发数,就是单次查询并发连接数为设置的连接数<see cref="IShardingConfigOption.MaxQueryConnectionsLimit"/>。因为有限制,所以无法一直挂起连接,必须全部获取到内存后进行内存聚合
1313
/// 系统自行选择会根据用户的配置采取最小化连接数,但是如果遇到分页则会根据分页策略采取内存限制,因为skip过大会导致内存爆炸
1414
/// </summary>
1515
public enum ConnectionModeEnum
1616
{
17-
//流式聚合 同时会有多个链接
18-
STREAM_MERGE,
19-
//内存聚合 连接数会有限制
20-
IN_MEMORY_MERGE,
17+
//内存限制模式最小化内存聚合 流式聚合 同时会有多个链接
18+
MEMORY_STRICTLY,
19+
//连接数限制模式最小化并发连接数 内存聚合 连接数会有限制
20+
CONNECTION_STRICTLY,
2121
//系统自动选择内存还是流式聚合
2222
SYSTEM_AUTO
2323
}

src/ShardingCore/Extensions/ConnectionModeExtension.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static async Task<T> ReleaseConnectionAsync<T>(this Task<T> executeTask,
1919
}
2020
finally
2121
{
22-
if (connectionMode == ConnectionModeEnum.IN_MEMORY_MERGE)
22+
if (connectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
2323
{
2424
#if !EFCORE2
2525
await dbContext.DisposeAsync();

src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs

Lines changed: 70 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
using System.Threading;
1010
using System.Threading.Tasks;
1111
using ShardingCore.Core;
12+
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
1213
using ShardingCore.Sharding.MergeEngines.Common;
14+
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
1315
using ShardingCore.Sharding.StreamMergeEngines;
1416

1517
namespace ShardingCore.Sharding.MergeEngines.Abstractions
@@ -66,62 +68,16 @@ internal abstract class AbstractBaseMergeEngine<TEntity>
6668

6769
//}
6870

69-
public Task<LinkedList<TResult2>>[] GetDataSourceGroupAndExecutorGroup<TResult,TResult2>(Func<SqlExecutorUnit,Task<TResult2>> sqlExecutorUnitExecuteAsync,CancellationToken cancellationToken=new CancellationToken())
71+
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(IEnumerable<ISqlRouteUnit> sqlRouteUnits,Func<SqlExecutorUnit,Task<TResult>> sqlExecutorUnitExecuteAsync,CancellationToken cancellationToken=new CancellationToken())
7072
{
71-
var streamMergeContext = GetStreamMergeContext();
72-
var maxQueryConnectionsLimit = streamMergeContext.GetMaxQueryConnectionsLimit();
73-
74-
var waitTaskQueue = streamMergeContext.DataSourceRouteResult.IntersectDataSources.SelectMany(
75-
dataSourceName =>
76-
{
77-
return streamMergeContext.TableRouteResults.Select(routeResult =>
78-
new SqlRouteUnit(dataSourceName, routeResult));
79-
}).GroupBy(o => o.DataSourceName).Select(sqlGroups =>
80-
{
81-
var sqlCount = sqlGroups.Count();
82-
//根据用户配置单次查询期望并发数
83-
int exceptCount =
84-
Math.Max(
85-
0 == sqlCount % maxQueryConnectionsLimit
86-
? sqlCount / maxQueryConnectionsLimit
87-
: sqlCount / maxQueryConnectionsLimit + 1, 1);
88-
//计算应该使用那种链接模式
89-
ConnectionModeEnum connectionMode = CalcConnectionMode(streamMergeContext.GetConnectionMode(),
90-
streamMergeContext.GetUseMemoryLimitWhileSkip(), maxQueryConnectionsLimit, sqlCount,
91-
streamMergeContext.Skip);
92-
var sqlExecutorUnitPartitions = sqlGroups
93-
.Select((o, i) => new { Obj = o, index = i % exceptCount }).GroupBy(o => o.index)
94-
.Select(o => o.Select(g => new SqlExecutorUnit(connectionMode, g.Obj)).ToList()).ToList();
95-
return sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(o)).ToList();
96-
}).Select(executorGroups =>
73+
var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits).Select(GetSqlExecutorGroups).Select(executorGroups =>
9774
{
9875
return Task.Run(async () =>
9976
{
100-
LinkedList<TResult2> result = new LinkedList<TResult2>();
77+
LinkedList<TResult> result = new LinkedList<TResult>();
10178
foreach (var executorGroup in executorGroups)
10279
{
103-
var executorGroupParallelExecuteTasks = executorGroup.Groups.Select(executor =>
104-
{
105-
return Task.Run(async () =>
106-
{
107-
return await sqlExecutorUnitExecuteAsync(executor);
108-
//var dataSourceName = executor.RouteUnit.DataSourceName;
109-
//var routeResult = executor.RouteUnit.TableRouteResult;
110-
111-
//var asyncExecuteQueryable =
112-
// CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
113-
114-
115-
//var queryResult = await efQuery(asyncExecuteQueryable);
116-
117-
//return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
118-
//return await AsyncParallelResultExecute(asyncExecuteQueryable, dataSourceName,
119-
// routeResult, efQuery,
120-
// cancellationToken);
121-
122-
}, cancellationToken);
123-
}).ToArray();
124-
var routeQueryResults = (await Task.WhenAll(executorGroupParallelExecuteTasks)).ToList();
80+
var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync,cancellationToken);
12581
foreach (var routeQueryResult in routeQueryResults)
12682
{
12783
result.AddLast(routeQueryResult);
@@ -134,25 +90,78 @@ public Task<LinkedList<TResult2>>[] GetDataSourceGroupAndExecutorGroup<TResult,T
13490
return waitTaskQueue;
13591
}
13692

93+
protected virtual IEnumerable<ISqlRouteUnit> GetDefaultSqlRouteUnits()
94+
{
95+
96+
var streamMergeContext = GetStreamMergeContext();
97+
98+
return streamMergeContext.DataSourceRouteResult.IntersectDataSources.SelectMany(
99+
dataSourceName =>
100+
{
101+
return streamMergeContext.TableRouteResults.Select(routeResult =>
102+
new SqlRouteUnit(dataSourceName, routeResult));
103+
});
104+
}
105+
protected virtual IEnumerable<IGrouping<string, ISqlRouteUnit>> AggregateQueryByDataSourceName(IEnumerable<ISqlRouteUnit> sqlRouteUnits)
106+
{
107+
return sqlRouteUnits.GroupBy(o => o.DataSourceName);
108+
}
137109

110+
protected List<SqlExecutorGroup<SqlExecutorUnit>> GetSqlExecutorGroups(IGrouping<string, ISqlRouteUnit> sqlGroups)
111+
{
112+
var streamMergeContext = GetStreamMergeContext();
113+
var maxQueryConnectionsLimit = streamMergeContext.GetMaxQueryConnectionsLimit();
114+
var sqlCount = sqlGroups.Count();
115+
//根据用户配置单次查询期望并发数
116+
int exceptCount =
117+
Math.Max(
118+
0 == sqlCount % maxQueryConnectionsLimit
119+
? sqlCount / maxQueryConnectionsLimit
120+
: sqlCount / maxQueryConnectionsLimit + 1, 1);
121+
//计算应该使用那种链接模式
122+
ConnectionModeEnum connectionMode = streamMergeContext.GetConnectionMode(sqlCount);
123+
var sqlExecutorUnitPartitions = sqlGroups
124+
.Select((o, i) => new { Obj = o, index = i % exceptCount }).GroupBy(o => o.index)
125+
.Select(o => o.Select(g => new SqlExecutorUnit(connectionMode, g.Obj)).ToList()).ToList();
126+
return sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(o)).ToList();
127+
}
138128

139-
protected ConnectionModeEnum CalcConnectionMode(ConnectionModeEnum currentConnectionMode, int useMemoryLimitWhileSkip, int maxQueryConnectionsLimit, int sqlCount,int? skip)
129+
protected async Task<LinkedList<TResult>> ExecuteAsync<TResult>(List<SqlExecutorUnit> sqlExecutorUnits, Func<SqlExecutorUnit, Task<TResult>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
140130
{
141-
switch (currentConnectionMode)
131+
if (sqlExecutorUnits.Count <= 0)
132+
{
133+
return new LinkedList<TResult>();
134+
}
135+
else
142136
{
143-
case ConnectionModeEnum.STREAM_MERGE:
144-
case ConnectionModeEnum.IN_MEMORY_MERGE: return currentConnectionMode;
145-
default:
137+
var result=new LinkedList<TResult>();
138+
Task<TResult>[] tasks=null;
139+
if (sqlExecutorUnits.Count > 1)
146140
{
147-
if (skip.HasValue && skip.Value > useMemoryLimitWhileSkip)
141+
tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit =>
148142
{
149-
return ConnectionModeEnum.STREAM_MERGE;
150-
}
151-
return maxQueryConnectionsLimit < sqlCount
152-
? ConnectionModeEnum.IN_MEMORY_MERGE
153-
: ConnectionModeEnum.STREAM_MERGE; ;
143+
return Task.Run(async () =>
144+
{
145+
return await sqlExecutorUnitExecuteAsync(sqlExecutorUnit);
146+
147+
}, cancellationToken);
148+
}).ToArray();
149+
}
150+
else
151+
{
152+
tasks = Array.Empty<Task<TResult>>();
153+
}
154+
var firstResult = await sqlExecutorUnitExecuteAsync(sqlExecutorUnits[0]);
155+
result.AddLast(firstResult);
156+
var otherResults = await Task.WhenAll(tasks);
157+
foreach (var otherResult in otherResults)
158+
{
159+
result.AddLast(otherResult);
154160
}
161+
162+
return result;
155163
}
164+
156165
}
157166
}
158167
}

src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpressio
8585

8686
public async Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
8787
{
88-
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<TResult, RouteQueryResult<TResult>>(
88+
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
89+
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<RouteQueryResult<TResult>>(defaultSqlRouteUnits,
8990
async sqlExecutorUnit =>
9091
{
9192
var connectionMode = _mergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);

src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ public async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(
114114
if (async)
115115
{
116116
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
117-
return connectionMode==ConnectionModeEnum.STREAM_MERGE? new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
117+
return connectionMode==ConnectionModeEnum.MEMORY_STRICTLY? new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
118118
}
119119
else
120120
{
121121
var enumerator = GetEnumerator0(queryable);
122-
return connectionMode == ConnectionModeEnum.STREAM_MERGE ? new StreamMergeAsyncEnumerator<TEntity>(enumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(enumerator);
122+
return connectionMode == ConnectionModeEnum.MEMORY_STRICTLY ? new StreamMergeAsyncEnumerator<TEntity>(enumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(enumerator);
123123
}
124124
}
125125

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
7+
8+
namespace ShardingCore.Sharding.MergeEngines.Common.Abstractions
9+
{
10+
internal interface ISqlRouteUnit
11+
{
12+
string DataSourceName { get; }
13+
TableRouteResult TableRouteResult { get; }
14+
}
15+
}

src/ShardingCore/Sharding/MergeEngines/Common/SqlExecutorGroup.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace ShardingCore.Sharding.MergeEngines.Common
88
{
9-
public sealed class SqlExecutorGroup<T>
9+
internal sealed class SqlExecutorGroup<T>
1010
{
1111
public SqlExecutorGroup(List<T> groups)
1212
{

src/ShardingCore/Sharding/MergeEngines/Common/SqlExecutorUnit.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@
44
using System.Text;
55
using System.Threading.Tasks;
66
using ShardingCore.Core;
7+
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
78

89
namespace ShardingCore.Sharding.MergeEngines.Common
910
{
10-
public class SqlExecutorUnit
11+
internal class SqlExecutorUnit
1112
{
1213

13-
public SqlExecutorUnit(ConnectionModeEnum connectionMode, SqlRouteUnit routeUnit)
14+
public SqlExecutorUnit(ConnectionModeEnum connectionMode, ISqlRouteUnit routeUnit)
1415
{
1516
ConnectionMode = connectionMode;
1617
RouteUnit = routeUnit;
1718
}
1819

19-
public SqlRouteUnit RouteUnit { get; }
20+
public ISqlRouteUnit RouteUnit { get; }
2021
public ConnectionModeEnum ConnectionMode { get; }
2122
}
2223
}

src/ShardingCore/Sharding/MergeEngines/Common/SqlRouteUnit.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
using System.Text;
55
using System.Threading.Tasks;
66
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
7+
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
78

89
namespace ShardingCore.Sharding.MergeEngines.Common
910
{
10-
public sealed class SqlRouteUnit
11+
internal class SqlRouteUnit: ISqlRouteUnit
1112
{
13+
1214
public SqlRouteUnit(string dataSourceName, TableRouteResult tableRouteResult)
1315
{
1416
DataSourceName = dataSourceName;

0 commit comments

Comments
 (0)