@@ -110,8 +110,8 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
110110 if (fCount == 0 )
111111 return ;
112112
113- std::cout << " LimitedOrderBy::processRow row " << row.toString () << std::endl;
114- std::cout << " LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl;
113+ // std::cout << "LimitedOrderBy::processRow row " << row.toString() << std::endl;
114+ // std::cout << "LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl;
115115 auto & orderedRowsQueue = getQueue ();
116116 // if the row count is less than the limit
117117 if (orderedRowsQueue.size () < fStart + fCount )
@@ -162,90 +162,8 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
162162 {
163163 OrderByRow swapRow = orderedRowsQueue.top ();
164164 row1.setData (swapRow.fData );
165- std::cout << " LimitedOrderBy::processRow row2swap " << row1.toString () << std::endl;
166- std::cout << " LimitedOrderBy::processRow new row 4 swaping " << row.toString () << std::endl;
167-
168- copyRow (row, &row1);
169-
170- if (fDistinct )
171- {
172- fDistinctMap ->erase (orderedRowsQueue.top ().fData );
173- fDistinctMap ->insert (row1.getPointer ());
174- }
175-
176- orderedRowsQueue.pop ();
177- orderedRowsQueue.push (swapRow);
178- }
179- }
180-
181- void LimitedOrderBy::processRow_ (const rowgroup::Row& row)
182- {
183- // check if this is a distinct row
184- if (fDistinct && fDistinctMap ->find (row.getPointer ()) != fDistinctMap ->end ())
185- return ;
186-
187- // @bug5312, limit count is 0, do nothing.
188- if (fCount == 0 )
189- return ;
190-
191- // TODO copy rules or replace ptrs to real instances in CompareRules
192- // auto invertedRule = fRule;
193- // invertedRule.revertRules();
194-
195- std::cout << " LimitedOrderBy::processRow row " << row.toString () << std::endl;
196- std::cout << " LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl;
197- auto & orderedRowsQueue = getQueue ();
198- // if the row count is less than the limit
199- if (orderedRowsQueue.size () < fStart + fCount )
200- {
201- copyRow (row, &fRow0 );
202- OrderByRow newRow (fRow0 , fRule );
203- orderedRowsQueue.push (newRow);
204-
205- uint64_t memSizeInc = sizeof (newRow);
206- fUncommitedMemory += memSizeInc;
207- if (fUncommitedMemory >= fMaxUncommited )
208- {
209- if (!fRm ->getMemory (fUncommitedMemory , fSessionMemLimit ))
210- {
211- cerr << IDBErrorInfo::instance ()->errorMsg (fErrorCode ) << " @" << __FILE__ << " :" << __LINE__;
212- throw IDBExcept (fErrorCode );
213- }
214- fMemSize += fUncommitedMemory ;
215- fUncommitedMemory = 0 ;
216- }
217-
218- // add to the distinct map
219- if (fDistinct )
220- fDistinctMap ->insert (fRow0 .getPointer ());
221-
222- fRowGroup .incRowCount ();
223- fRow0 .nextRow ();
224-
225- if (fRowGroup .getRowCount () >= fRowsPerRG )
226- {
227- fDataQueue .push (fData );
228- uint64_t newSize = fRowGroup .getSizeWithStrings () - fRowGroup .getHeaderSize ();
229-
230- if (!fRm ->getMemory (newSize, fSessionMemLimit ))
231- {
232- cerr << IDBErrorInfo::instance ()->errorMsg (fErrorCode ) << " @" << __FILE__ << " :" << __LINE__;
233- throw IDBExcept (fErrorCode );
234- }
235- fMemSize += newSize;
236-
237- fData .reinit (fRowGroup , fRowsPerRG );
238- fRowGroup .setData (&fData );
239- fRowGroup .resetRowGroup (0 );
240- fRowGroup .getRow (0 , &fRow0 );
241- }
242- }
243- else if (fOrderByCond .size () > 0 && fRule .less (row.getPointer (), orderedRowsQueue.top ().fData ))
244- {
245- OrderByRow swapRow = orderedRowsQueue.top ();
246- row1.setData (swapRow.fData );
247- std::cout << " LimitedOrderBy::processRow row2swap " << row1.toString () << std::endl;
248- std::cout << " LimitedOrderBy::processRow new row 4 swaping " << row.toString () << std::endl;
165+ // std::cout << "LimitedOrderBy::processRow row2swap " << row1.toString() << std::endl;
166+ // std::cout << "LimitedOrderBy::processRow new row 4 swaping " << row.toString() << std::endl;
249167
250168 copyRow (row, &row1);
251169
@@ -277,118 +195,93 @@ void LimitedOrderBy::processRow_(const rowgroup::Row& row)
277195// }
278196// }
279197
280- void LimitedOrderBy::brandNewFinalize ( )
198+ void LimitedOrderBy::flushCurrentToDisk_ ( const bool firstFlush )
281199{
282- if (!isDiskBased ())
283- {
284- return finalize ();
285- }
286-
287- // if disk-based
288- // here there are <= inputQueuesNumber files on disk
289- // and potentially some in-memory state
290- // need to merge this together to produce a result
200+ // make a queue with rgdatas and hand it to DiskBasedTopNOrderBy
201+ auto dl = RowGroupDL (1 , 1 );
202+ auto & orderedRowsQueue = getQueue ();
203+ size_t rowsOverRG = orderedRowsQueue.size () % fRowsPerRG ;
204+ size_t numberOfRGs = orderedRowsQueue.size () / fRowsPerRG + static_cast <size_t >(rowsOverRG > 0 );
205+ std::thread flushThread (&DiskBasedTopNOrderBy::flushCurrentToDisk, this , std::ref (dl), fRowGroup ,
206+ numberOfRGs, firstFlush);
291207
292- if (fUncommitedMemory > 0 )
208+ uint32_t rSize = fRow0 .getSize ();
209+ // process leftovers
210+ if (rowsOverRG)
293211 {
294- if (!fRm ->getMemory (fUncommitedMemory , fSessionMemLimit ))
295- {
296- cerr << IDBErrorInfo::instance ()->errorMsg (fErrorCode ) << " @" << __FILE__ << " :" << __LINE__;
297- throw logging::OutOfMemoryExcept (fErrorCode );
298- }
299- fMemSize += fUncommitedMemory ;
300- fUncommitedMemory = 0 ;
301- }
302-
303- queue<RGData> tempQueue;
304- if (fRowGroup .getRowCount () > 0 )
305- fDataQueue .push (fData );
212+ fData .reinit (fRowGroup , rowsOverRG);
213+ fRowGroup .setData (&fData );
214+ fRowGroup .resetRowGroup (0 );
215+ fRowGroup .getRow (rowsOverRG-1 , &fRow0 );
216+
217+ const OrderByRow& topRow = orderedRowsQueue.top ();
218+ row1.setData (topRow.fData );
219+ copyRow (row1, &fRow0 );
220+ fRowGroup .incRowCount ();
221+ fRow0 .prevRow (rSize);
222+ orderedRowsQueue.pop ();
306223
307- auto & orderedRowsQueue = getQueue ();
224+ dl.insert (fData );
225+ }
308226
309227 if (orderedRowsQueue.size () > 0 )
310228 {
311- // *DRRTUY Very memory intensive. CS needs to account active
312- // memory only and release memory if needed.
313- uint64_t memSizeInc = fRowGroup .getSizeWithStrings () - fRowGroup .getHeaderSize ();
314-
315- if (!fRm ->getMemory (memSizeInc, fSessionMemLimit ))
316- {
317- cerr << IDBErrorInfo::instance ()->errorMsg (fErrorCode ) << " @" << __FILE__ << " :" << __LINE__;
318- throw logging::OutOfMemoryExcept (fErrorCode );
319- }
320- fMemSize += memSizeInc;
321-
322- uint64_t offset = 0 ;
323- uint64_t i = 0 ;
324- // Reduce queue size by an offset value if it applicable.
325- uint64_t queueSizeWoOffset = orderedRowsQueue.size () > fStart ? orderedRowsQueue.size () - fStart : 0 ;
326- list<RGData> tempRGDataList;
327-
328- if (fCount <= queueSizeWoOffset)
329- {
330- offset = fCount % fRowsPerRG ;
331- if (!offset && fCount > 0 )
332- offset = fRowsPerRG ;
333- }
334- else
335- {
336- offset = queueSizeWoOffset % fRowsPerRG ;
337- if (!offset && queueSizeWoOffset > 0 )
338- offset = fRowsPerRG ;
339- }
340-
341- list<RGData>::iterator tempListIter = tempRGDataList.begin ();
342-
343- i = 0 ;
344- uint32_t rSize = fRow0 .getSize ();
345- uint64_t preLastRowNumb = fRowsPerRG - 1 ;
346229 fData .reinit (fRowGroup , fRowsPerRG );
347230 fRowGroup .setData (&fData );
348231 fRowGroup .resetRowGroup (0 );
349- // *DRRTUY This approach won't work with
350- // OFSET > fRowsPerRG
351- offset = offset != 0 ? offset - 1 : offset;
352- fRowGroup .getRow (offset, &fRow0 );
232+ fRowGroup .getRow (fRowsPerRG -1 , &fRow0 );
353233
354- while (( orderedRowsQueue.size () > fStart ) && (i++ < fCount ))
234+ while (! orderedRowsQueue.empty ( ))
355235 {
356236 const OrderByRow& topRow = orderedRowsQueue.top ();
357237 row1.setData (topRow.fData );
358238 copyRow (row1, &fRow0 );
359239 fRowGroup .incRowCount ();
360- offset--;
361240 fRow0 .prevRow (rSize);
362241 orderedRowsQueue.pop ();
363242
364- // if RG has fRowsPerRG rows
365- if (offset == (uint64_t )-1 )
243+ if (fRowGroup .getRowCount () == fRowsPerRG )
366244 {
367- tempRGDataList.push_front (fData );
368-
369- if (!fRm ->getMemory (memSizeInc, fSessionMemLimit ))
370- {
371- cerr << IDBErrorInfo::instance ()->errorMsg (fErrorCode ) << " @" << __FILE__ << " :" << __LINE__;
372- throw logging::OutOfMemoryExcept (fErrorCode );
373- }
374- fMemSize += memSizeInc;
245+ dl.insert (fData );
375246
376247 fData .reinit (fRowGroup , fRowsPerRG );
377248 fRowGroup .setData (&fData );
378- fRowGroup .resetRowGroup (0 ); // ?
379- fRowGroup .getRow (preLastRowNumb, &fRow0 );
380- offset = preLastRowNumb;
249+ fRowGroup .resetRowGroup (0 );
250+ fRowGroup .getRow (fRowsPerRG -1 , &fRow0 );
381251 }
382252 }
383- // Push the last/only group into the queue.
253+
384254 if (fRowGroup .getRowCount () > 0 )
385- tempRGDataList.push_front (fData );
255+ dl.insert (fData );
256+ }
386257
387- for (tempListIter = tempRGDataList.begin (); tempListIter != tempRGDataList.end (); tempListIter++)
388- tempQueue.push (*tempListIter);
258+ dl.endOfInput ();
389259
390- fDataQueue = tempQueue;
260+ // clean up the current queue/rgdatas to free mem
261+ // fDataQueue
262+ // fDistinctMap
263+ // orderedRowsQueue
264+ queue<RGData> tempQueue;
265+ fDataQueue .swap (tempQueue);
266+ if (fDistinctMap )
267+ {
268+ fDistinctMap ->clear ();
391269 }
270+
271+ flushThread.join ();
272+ }
273+
274+ void LimitedOrderBy::brandNewFinalize ()
275+ {
276+ if (!isDiskBased ())
277+ {
278+ return finalize ();
279+ }
280+
281+ // if disk-based
282+ // here there are <= inputQueuesNumber files on disk
283+ // and potentially some in-memory state
284+ // need to merge this together to produce a result
392285}
393286
394287/*
0 commit comments