Skip to content

Commit 5102c24

Browse files
committed
feat(dumper): dumper interface improvements and some TNS and OB refactoring
1 parent ba728fa commit 5102c24

File tree

11 files changed

+90
-69
lines changed

11 files changed

+90
-69
lines changed

dbcon/joblist/disk-based-topnorderby.h

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,26 @@
2121
#include <string>
2222
#include <vector>
2323

24+
#include "dumper.h"
2425
#include "elementtype.h"
26+
#include "resourcemanager.h"
2527
namespace joblist
2628
{
2729

28-
class DiskBasedTopNOrderBy
30+
class DiskBasedTopNOrderBy : public rowgroup::RGDumper
2931
{
32+
// std::string fTmpDir =
33+
// config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
34+
// std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression");
3035
public:
31-
DiskBasedTopNOrderBy() {}
32-
~DiskBasedTopNOrderBy()
36+
// TODO Parametrize compression, tmpdir and memory manager (can be temp)
37+
DiskBasedTopNOrderBy(ResourceManager* rm)
38+
: RGDumper(compress::getCompressInterfaceByName("LZ4"), std::make_unique<rowgroup::MemManager>(),
39+
config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Sorting),
40+
"Sorting", reinterpret_cast<std::uintptr_t>(this))
3341
{
34-
// clean up left over files
3542
}
36-
43+
~DiskBasedTopNOrderBy() = default;
3744

3845
void incrementGenerationCounter()
3946
{
@@ -44,16 +51,30 @@ class DiskBasedTopNOrderBy
4451
return fGenerationCounter;
4552
}
4653

47-
bool isDiskBased() const { return fGenerationCounter > 0; }
54+
bool isDiskBased() const
55+
{
56+
return fGenerationCounter > 0;
57+
}
4858

49-
size_t getGenerationFilesNumber() const { return 0; }
50-
std::vector<std::string> getGenerationFileNamesNextBatch(const size_t batchSize) { return {}; }
59+
size_t getGenerationFilesNumber() const
60+
{
61+
return 0;
62+
}
63+
std::vector<std::string> getGenerationFileNamesNextBatch(const size_t batchSize)
64+
{
65+
return {};
66+
}
5167

52-
void flushCurrentToDisk(const bool firstFlush) { incrementGenerationCounter(); }
53-
void diskBasedMergePhaseIfNeeded(std::vector<RowGroupDLSPtr>& dataLists) {}
68+
void flushCurrentToDisk(const bool firstFlush)
69+
{
70+
incrementGenerationCounter();
71+
}
72+
void diskBasedMergePhaseIfNeeded(std::vector<RowGroupDLSPtr>& dataLists)
73+
{
74+
}
5475

55-
// private:
56-
uint64_t fGenerationCounter {0};
76+
// private:
77+
uint64_t fGenerationCounter{0};
5778
};
5879

59-
}
80+
} // namespace joblist

dbcon/joblist/jlf_tuplejoblist.cpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -499,12 +499,6 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
499499
deliverySteps[CNX_VTABLE_ID] = ws;
500500
}
501501

502-
// TODO MCOL-894 we don't need to run sorting|distinct
503-
// every time
504-
// if ((jobInfo.limitCount != (uint64_t) - 1) ||
505-
// (jobInfo.constantCol == CONST_COL_EXIST) ||
506-
// (jobInfo.hasDistinct))
507-
// {
508502
if (jobInfo.annexStep.get() == NULL)
509503
jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo));
510504

@@ -513,20 +507,19 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
513507

514508
if (jobInfo.orderByColVec.size() > 0)
515509
{
516-
tas->addOrderBy(new LimitedOrderBy());
510+
tas->addOrderBy(jobInfo.rm);
517511
if (jobInfo.orderByThreads > 1)
518512
tas->setParallelOp();
519513
tas->setMaxThreads(jobInfo.orderByThreads);
520514
}
521515

516+
// TODO decouple TCS from TNS
522517
if (jobInfo.constantCol == CONST_COL_EXIST)
523518
tas->addConstant(new TupleConstantStep(jobInfo));
524519

525520
if (jobInfo.hasDistinct)
526521
tas->setDistinct();
527522

528-
// }
529-
530523
if (jobInfo.annexStep)
531524
{
532525
TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());

dbcon/joblist/limitedorderby.cpp

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ namespace joblist
4040
const uint64_t LimitedOrderBy::fMaxUncommited = 102400; // 100KiB - make it configurable?
4141

4242
// LimitedOrderBy class implementation
43-
LimitedOrderBy::LimitedOrderBy() : fStart(0), fCount(-1), fUncommitedMemory(0)
43+
LimitedOrderBy::LimitedOrderBy(ResourceManager* rm)
44+
: DiskBasedTopNOrderBy(rm), fStart(0), fCount(-1), fUncommitedMemory(0)
4445
{
4546
fRule.fIdbCompare = this;
4647
}
@@ -109,13 +110,12 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
109110
if (fCount == 0)
110111
return;
111112

112-
std::cout << "LimitedOrderBy::processRow row " << row.toString() << std::endl;
113-
std::cout << "LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl;
113+
std::cout << "LimitedOrderBy::processRow row " << row.toString() << std::endl;
114+
std::cout << "LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl;
114115
auto& orderedRowsQueue = getQueue();
115116
// if the row count is less than the limit
116117
if (orderedRowsQueue.size() < fStart + fCount)
117118
{
118-
119119
copyRow(row, &fRow0);
120120
OrderByRow newRow(fRow0, fRule);
121121
orderedRowsQueue.push(newRow);
@@ -163,7 +163,7 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
163163
OrderByRow swapRow = orderedRowsQueue.top();
164164
row1.setData(swapRow.fData);
165165
std::cout << "LimitedOrderBy::processRow row2swap " << row1.toString() << std::endl;
166-
std::cout <<"LimitedOrderBy::processRow new row 4 swaping " << row.toString() << std::endl;
166+
std::cout << "LimitedOrderBy::processRow new row 4 swaping " << row.toString() << std::endl;
167167

168168
copyRow(row, &row1);
169169

@@ -178,7 +178,6 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
178178
}
179179
}
180180

181-
182181
void LimitedOrderBy::processRow_(const rowgroup::Row& row)
183182
{
184183
// check if this is a distinct row
@@ -189,13 +188,12 @@ void LimitedOrderBy::processRow_(const rowgroup::Row& row)
189188
if (fCount == 0)
190189
return;
191190

192-
193191
// TODO copy rules or replace ptrs to real instances in CompareRules
194192
// auto invertedRule = fRule;
195193
// invertedRule.revertRules();
196194

197-
std::cout << "LimitedOrderBy::processRow row " << row.toString() << std::endl;
198-
std::cout << "LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl;
195+
std::cout << "LimitedOrderBy::processRow row " << row.toString() << std::endl;
196+
std::cout << "LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl;
199197
auto& orderedRowsQueue = getQueue();
200198
// if the row count is less than the limit
201199
if (orderedRowsQueue.size() < fStart + fCount)
@@ -247,7 +245,7 @@ void LimitedOrderBy::processRow_(const rowgroup::Row& row)
247245
OrderByRow swapRow = orderedRowsQueue.top();
248246
row1.setData(swapRow.fData);
249247
std::cout << "LimitedOrderBy::processRow row2swap " << row1.toString() << std::endl;
250-
std::cout <<"LimitedOrderBy::processRow new row 4 swaping " << row.toString() << std::endl;
248+
std::cout << "LimitedOrderBy::processRow new row 4 swaping " << row.toString() << std::endl;
251249

252250
copyRow(row, &row1);
253251

@@ -268,8 +266,8 @@ void LimitedOrderBy::processRow_(const rowgroup::Row& row)
268266

269267
// // Skip OFFSET
270268
// uint64_t sqlOffset = fStart;
271-
// std::cout << "brandNewFinalize offset " << sqlOffset << " orderedRowsQueue.size() " << orderedRowsQueue.size() << std::endl;
272-
// while (sqlOffset > 0 && !orderedRowsQueue.empty())
269+
// std::cout << "brandNewFinalize offset " << sqlOffset << " orderedRowsQueue.size() " <<
270+
// orderedRowsQueue.size() << std::endl; while (sqlOffset > 0 && !orderedRowsQueue.empty())
273271
// {
274272
// auto r = orderedRowsQueue.top();
275273
// row1.setData(r.fData);
@@ -283,7 +281,7 @@ void LimitedOrderBy::brandNewFinalize()
283281
{
284282
if (!isDiskBased())
285283
{
286-
return finalize();
284+
return finalize();
287285
}
288286

289287
// if disk-based
@@ -391,11 +389,9 @@ void LimitedOrderBy::brandNewFinalize()
391389

392390
fDataQueue = tempQueue;
393391
}
394-
395-
396392
}
397393

398-
/*
394+
/*
399395
* The f() copies top element from an ordered queue into a row group. It
400396
* does this backwards to syncronise sorting orientation with the server.
401397
* The top row from the queue goes last into the returned set.
@@ -525,7 +521,8 @@ bool LimitedOrderBy::getNextRGData(RGData& data)
525521
// and the current sorted queue size.
526522
uint64_t rowsToRetrieve = std::min(fCount - fRowsReturned, fRowsPerRG);
527523
uint64_t rowsToRetrieveFromQueue = std::min(rowsToRetrieve, orderedRowsQueue.size());
528-
std::cout << "getNextRGData rowsToRetrieve " << rowsToRetrieve << " orderedRowsQueue.size() " << orderedRowsQueue.size() << std::endl;
524+
std::cout << "getNextRGData rowsToRetrieve " << rowsToRetrieve << " orderedRowsQueue.size() "
525+
<< orderedRowsQueue.size() << std::endl;
529526
std::cout << "getNextRGData rowsToRetrieveFromQueue " << rowsToRetrieveFromQueue << std::endl;
530527
for (; rowsToRetrieveFromQueue > thisRGRowNumber; ++thisRGRowNumber)
531528
{

dbcon/joblist/limitedorderby.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include "disk-based-topnorderby.h"
2727
#include "../../utils/windowfunction/idborderby.h"
28+
#include "resourcemanager.h"
2829
#include "rowgroup.h"
2930

3031
namespace joblist
@@ -39,7 +40,7 @@ struct JobInfo;
3940
class LimitedOrderBy : public ordering::IdbOrderBy, public DiskBasedTopNOrderBy
4041
{
4142
public:
42-
LimitedOrderBy();
43+
LimitedOrderBy(ResourceManager* rm = nullptr); // TODO remove default
4344
~LimitedOrderBy() override;
4445
using ordering::IdbOrderBy::initialize;
4546
void initialize(const rowgroup::RowGroup&, const JobInfo&, bool invertRules = false,

dbcon/joblist/tupleannexstep.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo)
118118
, fEndOfResult(false)
119119
, fDistinct(false)
120120
, fParallelOp(false)
121-
, fOrderBy(NULL)
122-
, fConstant(NULL)
121+
, fConstant(nullptr)
123122
, fFeInstance(funcexp::FuncExp::instance())
124123
, fJobList(jobInfo.jobListPtr)
125124
, fFinishedThreads(0)
@@ -146,10 +145,10 @@ TupleAnnexStep::~TupleAnnexStep()
146145
fRunnersList.clear();
147146
}
148147

149-
if (fOrderBy)
150-
delete fOrderBy;
148+
// if (fOrderBy)
149+
// delete fOrderBy;
151150

152-
fOrderBy = NULL;
151+
// fOrderBy = NULL;
153152

154153
if (fConstant)
155154
delete fConstant;

dbcon/joblist/tupleannexstep.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep
7070

7171
void initialize(const rowgroup::RowGroup& rgIn, const JobInfo& jobInfo);
7272

73-
void addOrderBy(LimitedOrderBy* lob)
73+
void addOrderBy(ResourceManager* rm)
7474
{
75-
fOrderBy = lob;
75+
fOrderBy = std::make_unique<LimitedOrderBy>(rm);
7676
}
7777
void addConstant(TupleConstantStep* tcs)
7878
{
@@ -187,7 +187,7 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep
187187
bool fDistinct;
188188
bool fParallelOp;
189189

190-
LimitedOrderBy* fOrderBy;
190+
std::unique_ptr<LimitedOrderBy> fOrderBy{nullptr};
191191
TupleConstantStep* fConstant;
192192

193193
funcexp::FuncExp* fFeInstance;

utils/configcpp/configcpp.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ std::string Config::getTempFileDir(Config::TempDirPurpose what)
598598
{
599599
case TempDirPurpose::Joins: return prefix.append("joins/");
600600
case TempDirPurpose::Aggregates: return prefix.append("aggregates/");
601+
case TempDirPurpose::Sorting: return prefix.append("sorting/");
601602
}
602603
// NOTREACHED
603604
return {};

utils/configcpp/configcpp.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ class Config
206206
enum class TempDirPurpose
207207
{
208208
Joins, ///< disk joins
209-
Aggregates ///< disk-based aggregation
209+
Aggregates, ///< disk-based aggregation
210+
Sorting ///< disk-based sorting
210211
};
211212
/** @brief Return temporaru directory path for the specified purpose */
212213
std::string getTempFileDir(TempDirPurpose what);

utils/rowgroup/dumper.cpp

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,16 @@
2424

2525
namespace common
2626
{
27-
std::string errorString(int errNo)
28-
{
29-
char tmp[1024];
30-
auto* buf = strerror_r(errNo, tmp, sizeof(tmp));
31-
return {buf};
32-
}
27+
std::string errorString(int errNo)
28+
{
29+
char tmp[1024];
30+
auto* buf = strerror_r(errNo, tmp, sizeof(tmp));
31+
return {buf};
32+
}
3333
} // namespace common
3434

3535
namespace rowgroup
3636
{
37-
Dumper::Dumper(const compress::CompressInterface* comp, MemManager* mm) : fCompressor(comp), fMM(mm->clone())
38-
{
39-
}
40-
4137
int Dumper::write(const std::string& fname, const char* buf, size_t sz)
4238
{
4339
if (sz == 0)
@@ -161,8 +157,8 @@ void Dumper::checkBuffer(size_t len)
161157
std::string RGDumper::makeRGFilename(uint64_t rgid, const uint16_t generation) const
162158
{
163159
char buf[PATH_MAX];
164-
snprintf(buf, sizeof(buf), "%s/Sorting-p%u-t%ld-rg%lu-g%u", fTmpDir.c_str(), getpid(), fUniqId, rgid,
165-
generation);
160+
snprintf(buf, sizeof(buf), "%s/%s-p%u-t%ld-rg%lu-g%u", fOperationName.c_str(), fTmpDir.c_str(), getpid(),
161+
fUniqId, rgid, generation);
166162
return buf;
167163
}
168164

utils/rowgroup/dumper.h

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
namespace common
2727
{
28-
std::string errorString(int errNo);
28+
std::string errorString(int errNo);
2929
}
3030

3131
namespace rowgroup
@@ -34,8 +34,11 @@ namespace rowgroup
3434
class Dumper
3535
{
3636
public:
37-
Dumper(const compress::CompressInterface* comp, MemManager* mm);
38-
37+
Dumper(const compress::CompressInterface* comp, MemManager* mm)
38+
: fCompressor(comp), fMM(std::unique_ptr<MemManager>(mm)) {}
39+
Dumper(const compress::CompressInterface* comp, std::unique_ptr<MemManager>& mm)
40+
: fCompressor(comp), fMM(std::move(mm)) {}
41+
~Dumper() = default;
3942
int write(const std::string& fname, const char* buf, size_t sz);
4043
int read(const std::string& fname, std::vector<char>& buf);
4144
size_t size() const;
@@ -49,17 +52,25 @@ class Dumper
4952
std::vector<char> fTmpBuf;
5053
};
5154

52-
class RGDumper: protected Dumper
55+
class RGDumper : protected Dumper
5356
{
5457
public:
55-
RGDumper(const compress::CompressInterface* comp, MemManager* mm, const std::string& tmpDir, const uint64_t uniqId) : Dumper(comp, mm), fTmpDir(tmpDir) { }
56-
void loadRG(uint64_t rgid, const uint16_t generation, RowGroup& fRowGroupOut, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false);
58+
RGDumper(const compress::CompressInterface* comp, std::unique_ptr<MemManager> mm, const std::string& tmpDir,
59+
const std::string& operationName, const uint64_t uniqId)
60+
: Dumper(comp, mm), fTmpDir(tmpDir), fOperationName(operationName)
61+
{
62+
}
63+
~RGDumper() = default;
64+
void loadRG(uint64_t rgid, const uint16_t generation, RowGroup& fRowGroupOut,
65+
std::unique_ptr<RGData>& rgdata, bool unlinkDump = false);
5766
void saveRG(uint64_t rgid, const uint16_t generation, RowGroup& fRowGroupOut, RGData* rgdata);
5867

5968
std::string makeRGFilename(uint64_t rgid, const uint16_t generation) const;
60-
private:
69+
70+
private:
6171
std::string fTmpDir;
72+
std::string fOperationName;
6273
uint64_t fUniqId;
6374
};
6475

65-
}
76+
} // namespace rowgroup

0 commit comments

Comments
 (0)