77using System . Threading . Tasks ;
88using Microsoft . EntityFrameworkCore ;
99using Microsoft . EntityFrameworkCore . Infrastructure ;
10+ using Microsoft . Extensions . Logging . Abstractions ;
1011using ShardingCore . Exceptions ;
1112using ShardingCore . Extensions ;
1213using ShardingCore . Helpers ;
1314using ShardingCore . Sharding . Abstractions ;
1415using ShardingCore . Sharding . MergeEngines . Abstractions . InMemoryMerge . AbstractEnsureMergeEngines ;
16+ using ShardingCore . Sharding . MergeEngines . AggregateMergeEngines ;
1517
1618namespace ShardingCore . Sharding . StreamMergeEngines . AggregateMergeEngines
1719{
@@ -22,8 +24,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
2224 * @Ver: 1.0
23252426 */
25- internal class AverageAsyncInMemoryMergeEngine < TEntity , TEnsureResult > :
26- AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine < TEntity , TEnsureResult , TEnsureResult >
27+ internal class AverageAsyncInMemoryMergeEngine < TEntity , TEnsureResult , TSelect > :
28+ AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine < TEntity , TEnsureResult , TSelect >
2729 {
2830 public AverageAsyncInMemoryMergeEngine ( MethodCallExpression methodCallExpression ,
2931 IShardingDbContext shardingDbContext ) : base ( methodCallExpression , shardingDbContext )
@@ -35,120 +37,192 @@ public override TEnsureResult MergeResult()
3537 return AsyncHelper . RunSync ( ( ) => MergeResultAsync ( ) ) ;
3638 }
3739
40+ private async Task < List < RouteQueryResult < AverageResult < T > > > > AggregateAverageResultAsync < T > ( CancellationToken cancellationToken = new CancellationToken ( ) )
41+ {
42+ return ( await base . ExecuteAsync (
43+ async queryable =>
44+ {
45+ var count = await ( ( IQueryable < T > ) queryable ) . LongCountAsync ( cancellationToken ) ;
46+ if ( count <= 0 )
47+ {
48+ return default ;
49+ }
50+
51+ var sum = await GetSumAsync < T > ( queryable , cancellationToken ) ;
52+ return new AverageResult < T > ( sum , count ) ;
53+
54+ } ,
55+ cancellationToken ) ) . Where ( o => o . QueryResult != null ) . ToList ( ) ;
56+ }
57+
58+ private async Task < T > GetSumAsync < T > ( IQueryable queryable ,
59+ CancellationToken cancellationToken = new CancellationToken ( ) )
60+ {
61+
62+ if ( typeof ( decimal ) == typeof ( T ) )
63+ {
64+ var sum = await ( ( IQueryable < decimal > ) queryable ) . SumAsync ( cancellationToken ) ;
65+ return ConvertSum < T , decimal > ( sum ) ;
66+ }
67+
68+ if ( typeof ( decimal ? ) == typeof ( T ) )
69+ {
70+ var sum = await ( ( IQueryable < decimal ? > ) queryable ) . SumAsync ( cancellationToken ) ;
71+ return ConvertSum < T , decimal ? > ( sum ) ;
72+ }
73+
74+ if ( typeof ( int ) == typeof ( T ) )
75+ {
76+ var sum = await ( ( IQueryable < int > ) queryable ) . SumAsync ( cancellationToken ) ;
77+ return ConvertSum < T , int > ( sum ) ;
78+ }
79+
80+ if ( typeof ( int ? ) == typeof ( T ) )
81+ {
82+ var sum = await ( ( IQueryable < int ? > ) queryable ) . SumAsync ( cancellationToken ) ;
83+ return ConvertSum < T , int ? > ( sum ) ;
84+ }
85+
86+ if ( typeof ( long ) == typeof ( T ) )
87+ {
88+ var sum = await ( ( IQueryable < long > ) queryable ) . SumAsync ( cancellationToken ) ;
89+ return ConvertSum < T , long > ( sum ) ;
90+ }
91+
92+ if ( typeof ( long ? ) == typeof ( T ) )
93+ {
94+ var sum = await ( ( IQueryable < long ? > ) queryable ) . SumAsync ( cancellationToken ) ;
95+ return ConvertSum < T , long ? > ( sum ) ;
96+ }
97+
98+ if ( typeof ( double ) == typeof ( T ) )
99+ {
100+ var sum = await ( ( IQueryable < double > ) queryable ) . SumAsync ( cancellationToken ) ;
101+ return ConvertSum < T , double > ( sum ) ;
102+ }
103+
104+ if ( typeof ( double ? ) == typeof ( T ) )
105+ {
106+ var sum = await ( ( IQueryable < double ? > ) queryable ) . SumAsync ( cancellationToken ) ;
107+ return ConvertSum < T , double ? > ( sum ) ;
108+ }
109+
110+ if ( typeof ( float ) == typeof ( T ) )
111+ {
112+ var sum = await ( ( IQueryable < float > ) queryable ) . SumAsync ( cancellationToken ) ;
113+ return ConvertSum < T , float > ( sum ) ;
114+ }
115+
116+ if ( typeof ( float ? ) == typeof ( T ) )
117+ {
118+ var sum = await ( ( IQueryable < float ? > ) queryable ) . SumAsync ( cancellationToken ) ;
119+ return ConvertSum < T , float ? > ( sum ) ;
120+ }
121+
122+ throw new ShardingCoreException (
123+ $ "not support { GetMethodCallExpression ( ) . ShardingPrint ( ) } result { typeof ( T ) } cant call sum method") ;
124+ }
38125 public override async Task < TEnsureResult > MergeResultAsync (
39126 CancellationToken cancellationToken = new CancellationToken ( ) )
40127 {
41- if ( typeof ( decimal ) == typeof ( TEnsureResult ) )
128+ if ( typeof ( decimal ) == typeof ( TSelect ) )
42129 {
43- var result = await base . ExecuteAsync (
44- queryable => ( ( IQueryable < decimal > ) queryable ) . AverageAsync ( cancellationToken ) ,
45- cancellationToken ) ;
130+ var result = await AggregateAverageResultAsync < decimal > ( cancellationToken ) ;
46131 if ( result . IsEmpty ( ) )
47- return default ;
48- var average = result . Sum ( o=> o . QueryResult ) / result . Count ;
49- return ConvertSum ( average ) ;
132+ throw new InvalidOperationException ( "Sequence contains no elements." ) ;
133+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
134+ var count = result . Sum ( o => o . QueryResult . Count ) ;
135+ return ConvertSum ( sum / count ) ;
50136 }
51137
52- if ( typeof ( decimal ? ) == typeof ( TEnsureResult ) )
138+ if ( typeof ( decimal ? ) == typeof ( TSelect ) )
53139 {
54- var result = await base . ExecuteAsync (
55- queryable => ( ( IQueryable < decimal ? > ) queryable ) . AverageAsync ( cancellationToken ) ,
56- cancellationToken ) ;
140+ var result = await AggregateAverageResultAsync < decimal ? > ( cancellationToken ) ;
57141 if ( result . IsEmpty ( ) )
58142 return default ;
59- var sum = result . Sum ( o => o . QueryResult ) ;
60- var average = sum . HasValue ? sum / result . Count : default ;
61- return ConvertSum ( average ) ;
143+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
144+ var count = result . Sum ( o => o . QueryResult . Count ) ;
145+ return ConvertSum ( sum / count ) ;
62146 }
63147
64- if ( typeof ( int ) == typeof ( TEnsureResult ) )
148+ if ( typeof ( int ) == typeof ( TSelect ) )
65149 {
66- var result = await base . ExecuteAsync (
67- queryable => ( ( IQueryable < int > ) queryable ) . AverageAsync ( cancellationToken ) ,
68- cancellationToken ) ;
150+ var result = await AggregateAverageResultAsync < int > ( cancellationToken ) ;
69151 if ( result . IsEmpty ( ) )
70- return default ;
71- var average = result . Sum ( o => o . QueryResult ) / result . Count ;
72- return ConvertSum ( average ) ;
152+ throw new InvalidOperationException ( "Sequence contains no elements." ) ;
153+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
154+ var count = result . Sum ( o => o . QueryResult . Count ) ;
155+ return ConvertSum ( sum / count ) ;
73156 }
74157
75- if ( typeof ( int ? ) == typeof ( TEnsureResult ) )
158+ if ( typeof ( int ? ) == typeof ( TSelect ) )
76159 {
77- var result = await base . ExecuteAsync (
78- queryable => ( ( IQueryable < int ? > ) queryable ) . AverageAsync ( cancellationToken ) ,
79- cancellationToken ) ;
160+ var result = await AggregateAverageResultAsync < int ? > ( cancellationToken ) ;
80161 if ( result . IsEmpty ( ) )
81162 return default ;
82- var sum = result . Sum ( o => o . QueryResult ) ;
83- var average = sum . HasValue ? sum / result . Count : default ;
84- return ConvertSum ( average ) ;
163+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
164+ var count = result . Sum ( o => o . QueryResult . Count ) ;
165+ return ConvertSum ( sum / count ) ;
85166 }
86167
87- if ( typeof ( long ) == typeof ( TEnsureResult ) )
168+ if ( typeof ( long ) == typeof ( TSelect ) )
88169 {
89- var result = await base . ExecuteAsync (
90- queryable => ( ( IQueryable < long > ) queryable ) . AverageAsync ( cancellationToken ) ,
91- cancellationToken ) ;
170+ var result = await AggregateAverageResultAsync < long > ( cancellationToken ) ;
92171 if ( result . IsEmpty ( ) )
93- return default ;
94- var average = result . Sum ( o => o . QueryResult ) / result . Count ;
95- return ConvertSum ( average ) ;
172+ throw new InvalidOperationException ( "Sequence contains no elements." ) ;
173+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
174+ var count = result . Sum ( o => o . QueryResult . Count ) ;
175+ return ConvertSum ( sum / count ) ;
96176 }
97177
98- if ( typeof ( long ? ) == typeof ( TEnsureResult ) )
178+ if ( typeof ( long ? ) == typeof ( TSelect ) )
99179 {
100- var result = await base . ExecuteAsync (
101- queryable => ( ( IQueryable < long ? > ) queryable ) . AverageAsync ( cancellationToken ) ,
102- cancellationToken ) ;
180+ var result = await AggregateAverageResultAsync < long ? > ( cancellationToken ) ;
103181 if ( result . IsEmpty ( ) )
104182 return default ;
105- var sum = result . Sum ( o => o . QueryResult ) ;
106- var average = sum . HasValue ? sum / result . Count : default ;
107- return ConvertSum ( average ) ;
183+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
184+ var count = result . Sum ( o => o . QueryResult . Count ) ;
185+ return ConvertSum ( sum / count ) ;
108186 }
109187
110- if ( typeof ( double ) == typeof ( TEnsureResult ) )
188+ if ( typeof ( double ) == typeof ( TSelect ) )
111189 {
112- var result = await base . ExecuteAsync (
113- queryable => ( ( IQueryable < double > ) queryable ) . AverageAsync ( cancellationToken ) ,
114- cancellationToken ) ;
115- var average = result . Sum ( o => o . QueryResult ) / result . Count ;
116- return ConvertSum ( average ) ;
190+ var result = await AggregateAverageResultAsync < double > ( cancellationToken ) ;
191+ if ( result . IsEmpty ( ) )
192+ throw new InvalidOperationException ( "Sequence contains no elements." ) ;
193+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
194+ var count = result . Sum ( o => o . QueryResult . Count ) ;
195+ return ConvertSum ( sum / count ) ;
117196 }
118197
119- if ( typeof ( double ? ) == typeof ( TEnsureResult ) )
198+ if ( typeof ( double ? ) == typeof ( TSelect ) )
120199 {
121- var result = await base . ExecuteAsync (
122- queryable => ( ( IQueryable < double ? > ) queryable ) . AverageAsync ( cancellationToken ) ,
123- cancellationToken ) ;
200+ var result = await AggregateAverageResultAsync < double ? > ( cancellationToken ) ;
124201 if ( result . IsEmpty ( ) )
125202 return default ;
126- var sum = result . Sum ( o => o . QueryResult ) ;
127- var average = sum . HasValue ? sum / result . Count : default ;
128- return ConvertSum ( average ) ;
203+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
204+ var count = result . Sum ( o => o . QueryResult . Count ) ;
205+ return ConvertSum ( sum / count ) ;
129206 }
130207
131- if ( typeof ( float ) == typeof ( TEnsureResult ) )
208+ if ( typeof ( float ) == typeof ( TSelect ) )
132209 {
133- var result = await base . ExecuteAsync (
134- queryable => ( ( IQueryable < float > ) queryable ) . AverageAsync ( cancellationToken ) ,
135- cancellationToken ) ;
210+ var result = await AggregateAverageResultAsync < float > ( cancellationToken ) ;
136211 if ( result . IsEmpty ( ) )
137- return default ;
138- var average = result . Sum ( o => o . QueryResult ) / result . Count ;
139- return ConvertSum ( average ) ;
212+ throw new InvalidOperationException ( "Sequence contains no elements." ) ;
213+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
214+ var count = result . Sum ( o => o . QueryResult . Count ) ;
215+ return ConvertSum ( sum / count ) ;
140216 }
141217
142- if ( typeof ( float ? ) == typeof ( TEnsureResult ) )
218+ if ( typeof ( float ? ) == typeof ( TSelect ) )
143219 {
144- var result = await base . ExecuteAsync (
145- queryable => ( ( IQueryable < float ? > ) queryable ) . AverageAsync ( cancellationToken ) ,
146- cancellationToken ) ;
220+ var result = await AggregateAverageResultAsync < float ? > ( cancellationToken ) ;
147221 if ( result . IsEmpty ( ) )
148222 return default ;
149- var sum = result . Sum ( o => o . QueryResult ) ;
150- var average = sum . HasValue ? sum / result . Count : default ;
151- return ConvertSum ( average ) ;
223+ var sum = result . Sum ( o => o . QueryResult . Sum ) ;
224+ var count = result . Sum ( o => o . QueryResult . Count ) ;
225+ return ConvertSum ( sum / count ) ;
152226 }
153227
154228 throw new ShardingCoreException (
@@ -162,5 +236,13 @@ private TEnsureResult ConvertSum<TNumber>(TNumber number)
162236 var convertExpr = Expression . Convert ( Expression . Constant ( number ) , typeof ( TEnsureResult ) ) ;
163237 return Expression . Lambda < Func < TEnsureResult > > ( convertExpr ) . Compile ( ) ( ) ;
164238 }
239+
240+ private TSum ConvertSum < TSum , TNumber > ( TNumber number )
241+ {
242+ if ( number == null )
243+ return default ;
244+ var convertExpr = Expression . Convert ( Expression . Constant ( number ) , typeof ( TSum ) ) ;
245+ return Expression . Lambda < Func < TSum > > ( convertExpr ) . Compile ( ) ( ) ;
246+ }
165247 }
166248}
0 commit comments