@@ -30,9 +30,9 @@ namespace ShardingCore.Sharding
3030 * @Date: Monday, 25 January 2021 11:38:27
31313232 */
33- public class StreamMergeContext < TEntity > : IDisposable
33+ public class StreamMergeContext < TEntity > : IDisposable
3434#if ! EFCORE2
35- , IAsyncDisposable
35+ , IAsyncDisposable
3636#endif
3737 {
3838 //private readonly IShardingScopeFactory _shardingScopeFactory;
@@ -89,15 +89,8 @@ public StreamMergeContext(IQueryable<TEntity> source, IShardingDbContext shardin
8989 _routeTailFactory = routeTailFactory ;
9090 DataSourceRouteResult = dataSourceRouteResult ;
9191 _parallelTableManager = ( IParallelTableManager ) ShardingContainer . GetService ( typeof ( IParallelTableManager < > ) . GetGenericType0 ( shardingDbContext . GetType ( ) ) ) ;
92- _entityMetadataManager = ( IEntityMetadataManager ) ShardingContainer . GetService ( typeof ( IEntityMetadataManager < > ) . GetGenericType0 ( shardingDbContext . GetType ( ) ) ) ;
93- if ( _parallelTableManager . IsParallelTableQuery ( QueryEntities . Where ( o=> _entityMetadataManager . IsShardingTable ( o ) ) ) )
94- {
95- TableRouteResults = tableRouteResults . Where ( o=> o . ReplaceTables . Select ( p => p . Tail ) . ToHashSet ( ) . Count == 1 ) ;
96- }
97- else
98- {
99- TableRouteResults = tableRouteResults ;
100- }
92+
93+ TableRouteResults = GetTableRouteResults ( tableRouteResults ) ;
10194 IsCrossDataSource = dataSourceRouteResult . IntersectDataSources . Count > 1 ;
10295 IsCrossTable = TableRouteResults . Count ( ) > 1 ;
10396 var reWriteResult = new ReWriteEngine < TEntity > ( source ) . ReWrite ( ) ;
@@ -118,6 +111,19 @@ public StreamMergeContext(IQueryable<TEntity> source, IShardingDbContext shardin
118111 _parallelDbContexts = new ConcurrentDictionary < DbContext , object > ( ) ;
119112 //RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source);
120113 }
114+
115+ private IEnumerable < TableRouteResult > GetTableRouteResults ( IEnumerable < TableRouteResult > tableRouteResults )
116+ {
117+ if ( QueryEntities . Count > 1 )
118+ {
119+ var queryShardingTables = QueryEntities . Where ( o => _entityMetadataManager . IsShardingTable ( o ) ) . ToArray ( ) ;
120+ if ( queryShardingTables . Length > 1 && _parallelTableManager . IsParallelTableQuery ( queryShardingTables ) )
121+ {
122+ return tableRouteResults . Where ( o => o . ReplaceTables . Select ( p => p . Tail ) . ToHashSet ( ) . Count == 1 ) ;
123+ }
124+ }
125+ return tableRouteResults ;
126+ }
121127 //public StreamMergeContext(IQueryable<T> source,IEnumerable<TableRouteResult> routeResults,
122128 // IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory)
123129 //{
@@ -241,11 +247,11 @@ private ConnectionModeEnum CalcConnectionMode(int sqlCount)
241247 case ConnectionModeEnum . MEMORY_STRICTLY :
242248 case ConnectionModeEnum . CONNECTION_STRICTLY : return _shardingConfigOption . ConnectionMode ;
243249 default :
244- {
245- return _shardingConfigOption . MaxQueryConnectionsLimit < sqlCount
246- ? ConnectionModeEnum . CONNECTION_STRICTLY
247- : ConnectionModeEnum . MEMORY_STRICTLY ; ;
248- }
250+ {
251+ return _shardingConfigOption . MaxQueryConnectionsLimit < sqlCount
252+ ? ConnectionModeEnum . CONNECTION_STRICTLY
253+ : ConnectionModeEnum . MEMORY_STRICTLY ; ;
254+ }
249255 }
250256 }
251257 /// <summary>
@@ -271,7 +277,7 @@ private bool IsUseReadWriteSeparation()
271277 /// <returns></returns>
272278 public bool IsParallelQuery ( )
273279 {
274- return ! _shardingConfigOption . AutoTrackEntity || IsCrossQuery ( ) || IsUseReadWriteSeparation ( ) ;
280+ return ! _shardingConfigOption . AutoTrackEntity || IsCrossQuery ( ) || IsUseReadWriteSeparation ( ) ;
275281 }
276282
277283 /// <summary>
0 commit comments