diff --git a/dbcon/joblist/CMakeLists.txt b/dbcon/joblist/CMakeLists.txt index 3783ccf14e..f0f5449df3 100644 --- a/dbcon/joblist/CMakeLists.txt +++ b/dbcon/joblist/CMakeLists.txt @@ -27,6 +27,7 @@ set(joblist_LIB_SRCS jobstepassociation.cpp lbidlist.cpp limitedorderby.cpp + disk-based-topnorderby.cpp passthrucommand-jl.cpp passthrustep.cpp pcolscan.cpp diff --git a/dbcon/joblist/disk-based-topnorderby.cpp b/dbcon/joblist/disk-based-topnorderby.cpp new file mode 100644 index 0000000000..9a621eb239 --- /dev/null +++ b/dbcon/joblist/disk-based-topnorderby.cpp @@ -0,0 +1,72 @@ +/* Copyright (C) 2025 MariaDB Corp. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include + +#include "dumper.h" +#include "disk-based-topnorderby.h" +namespace joblist +{ + +// The caller ensures lifetime of dl and rg +void DiskBasedTopNOrderBy::flushCurrentToDisk(RowGroupDL& dl, rowgroup::RowGroup rg, const size_t numberOfRGs, const bool firstFlush) +{ + size_t rgid = (firstFlush) ? numberOfRGs : 0; + rowgroup::RGData rgData; + + size_t generation = (firstFlush) ? getGenerationCounter() : 0; // WIP + + bool more = dl.next(0, &rgData); + while (more) + { + saveRG(rgid, generation, rg, &rgData); + more = dl.next(0, &rgData); + rgid = (firstFlush) ? rgid - 1 : rgid + 1; + } + + if (firstFlush) + { + incrementGenerationCounter(); + } + else + { + + } +} +void DiskBasedTopNOrderBy::diskBasedMergePhaseIfNeeded(std::vector& /*dataLists*/) +{ +} + +std::vector DiskBasedTopNOrderBy::getGenerationFileNamesNextBatch(const size_t batchSize) +{ + // assert(getGenerationFilesNumber() > batchSize); + auto totalNumberOfFilesYetToMerge = getGenerationFilesNumber() - batchSize; + auto batchSizeOrFilesLeftNumber = std::max(getGenerationFilesNumber(), batchSize); + auto actualBatchSize = std::min(totalNumberOfFilesYetToMerge, batchSizeOrFilesLeftNumber); + // add state for the starting offset + wraparound + size_t startOffset = 0; + std::vector res; + res.reserve(actualBatchSize); + for (size_t i = 0; i < startOffset + actualBatchSize; ++i) + { + res.push_back(makeRGFilePrefix(i)); + } + + return res; + + +} // namespace joblist \ No newline at end of file diff --git a/dbcon/joblist/disk-based-topnorderby.h b/dbcon/joblist/disk-based-topnorderby.h new file mode 100644 index 0000000000..d490a94da8 --- /dev/null +++ b/dbcon/joblist/disk-based-topnorderby.h @@ -0,0 +1,77 @@ +/* Copyright (C) 2025 MariaDB Corp. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include +#include +#include + +#include "dumper.h" +#include "elementtype.h" +#include "resourcemanager.h" +namespace joblist +{ + +class DiskBasedTopNOrderBy : public rowgroup::RGDumper +{ + // std::string fTmpDir = + // config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates); + // std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression"); + public: + // TODO Parametrize compression, tmpdir and memory manager (can be temp) + DiskBasedTopNOrderBy(ResourceManager* /*rm*/) + : RGDumper(compress::getCompressInterfaceByName("LZ4"), std::make_unique(), + config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Sorting), + "Sorting", reinterpret_cast(this)) + { + } + ~DiskBasedTopNOrderBy() = default; + + void incrementGenerationCounter() + { + ++fGenerationCounter; + uint64_t newGeneration = (fGenerations.empty()) ? 1 : fGenerations.back() + 1; + fGenerations.push(newGeneration); + } + uint64_t getGenerationCounter() const + { + return (fGenerations.empty()) ? 0 : fGenerations.back(); + } + + bool isDiskBased() const + { + return fGenerationCounter > 0; + } + + size_t getGenerationFilesNumber() const + { + return 0; + } + std::vector getGenerationFileNamesNextBatch(const size_t batchSize); + + // The caller ensures lifetime of dl and rg + void flushCurrentToDisk(RowGroupDL& dl, rowgroup::RowGroup rg, const size_t numberOfRGs, const bool firstFlush); + void diskBasedMergePhaseIfNeeded(std::vector& dataLists); + + // private: + uint64_t fGenerationCounter{0}; + std::queue fGenerations; +}; + +} // namespace joblist \ No newline at end of file diff --git a/dbcon/joblist/elementtype.h b/dbcon/joblist/elementtype.h index 7987498a74..8b8db7b5f7 100644 --- a/dbcon/joblist/elementtype.h +++ b/dbcon/joblist/elementtype.h @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. 
+ Copyright (C) 2016-2025 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -238,16 +239,7 @@ extern std::istream& operator>>(std::istream& in, TupleType& rhs); extern std::ostream& operator<<(std::ostream& out, const TupleType& rhs); } // namespace joblist -#ifndef NO_DATALISTS - -// #include "bandeddl.h" -// #include "wsdl.h" #include "fifo.h" -// #include "bucketdl.h" -// #include "constantdatalist.h" -// #include "swsdl.h" -// #include "zdl.h" -// #include "deliverywsdl.h" namespace joblist { @@ -327,7 +319,8 @@ typedef DataList StrDataList; // */ // typedef BucketDL TupleBucketDataList; -typedef FIFO RowGroupDL; +using RowGroupDL = FIFO; +using RowGroupDLSPtr = std::shared_ptr; } // namespace joblist @@ -425,5 +418,3 @@ extern std::ostream& showOidInDL(std::ostream& strm); extern std::ostream& omitOidInDL(std::ostream& strm); } // namespace joblist - -#endif diff --git a/dbcon/joblist/jlf_tuplejoblist.cpp b/dbcon/joblist/jlf_tuplejoblist.cpp index 96add786bb..bfe36dac19 100644 --- a/dbcon/joblist/jlf_tuplejoblist.cpp +++ b/dbcon/joblist/jlf_tuplejoblist.cpp @@ -499,12 +499,6 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, deliverySteps[CNX_VTABLE_ID] = ws; } - // TODO MCOL-894 we don't need to run sorting|distinct - // every time - // if ((jobInfo.limitCount != (uint64_t) - 1) || - // (jobInfo.constantCol == CONST_COL_EXIST) || - // (jobInfo.hasDistinct)) - // { if (jobInfo.annexStep.get() == NULL) jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo)); @@ -513,20 +507,19 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, if (jobInfo.orderByColVec.size() > 0) { - tas->addOrderBy(new LimitedOrderBy()); + tas->addOrderBy(jobInfo.rm); if (jobInfo.orderByThreads > 1) tas->setParallelOp(); tas->setMaxThreads(jobInfo.orderByThreads); } + // TODO decouple TCS from TNS if (jobInfo.constantCol == CONST_COL_EXIST) tas->addConstant(new TupleConstantStep(jobInfo)); if (jobInfo.hasDistinct) tas->setDistinct(); - // } - if (jobInfo.annexStep) { TupleDeliveryStep* ds = dynamic_cast(deliverySteps[CNX_VTABLE_ID].get()); diff --git a/dbcon/joblist/limitedorderby.cpp b/dbcon/joblist/limitedorderby.cpp index f22b2adab3..a417570aad 100644 --- a/dbcon/joblist/limitedorderby.cpp +++ b/dbcon/joblist/limitedorderby.cpp @@ -40,7 +40,8 @@ namespace joblist const uint64_t LimitedOrderBy::fMaxUncommited = 102400; // 100KiB - make it configurable? 
// LimitedOrderBy class implementation -LimitedOrderBy::LimitedOrderBy() : fStart(0), fCount(-1), fUncommitedMemory(0) +LimitedOrderBy::LimitedOrderBy(ResourceManager* rm) + : DiskBasedTopNOrderBy(rm), fStart(0), fCount(-1), fUncommitedMemory(0) { fRule.fIdbCompare = this; } @@ -109,13 +110,15 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row) if (fCount == 0) return; - auto& orderByQueue = getQueue(); + // std::cout << "LimitedOrderBy::processRow row " << row.toString() << std::endl; + // std::cout << "LimitedOrderBy::processRow fStart " << fStart << " fCount " << fCount << std::endl; + auto& orderedRowsQueue = getQueue(); // if the row count is less than the limit - if (orderByQueue.size() < fStart + fCount) + if (orderedRowsQueue.size() < fStart + fCount) { copyRow(row, &fRow0); OrderByRow newRow(fRow0, fRule); - orderByQueue.push(newRow); + orderedRowsQueue.push(newRow); uint64_t memSizeInc = sizeof(newRow); fUncommitedMemory += memSizeInc; @@ -155,22 +158,142 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row) fRowGroup.getRow(0, &fRow0); } } - - else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData)) + else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderedRowsQueue.top().fData)) { - OrderByRow swapRow = orderByQueue.top(); + OrderByRow swapRow = orderedRowsQueue.top(); row1.setData(swapRow.fData); + // std::cout << "LimitedOrderBy::processRow row2swap " << row1.toString() << std::endl; + // std::cout << "LimitedOrderBy::processRow new row 4 swaping " << row.toString() << std::endl; + copyRow(row, &row1); if (fDistinct) { - fDistinctMap->erase(orderByQueue.top().fData); + fDistinctMap->erase(orderedRowsQueue.top().fData); fDistinctMap->insert(row1.getPointer()); } - orderByQueue.pop(); - orderByQueue.push(swapRow); + orderedRowsQueue.pop(); + orderedRowsQueue.push(swapRow); + } +} + +// void LimitedOrderBy::brandNewFinalize() +// { +// auto& orderedRowsQueue = getQueue(); + +// // Skip OFFSET +// uint64_t sqlOffset = fStart; +// std::cout << "brandNewFinalize offset " << sqlOffset << " orderedRowsQueue.size() " << +// orderedRowsQueue.size() << std::endl; while (sqlOffset > 0 && !orderedRowsQueue.empty()) +// { +// auto r = orderedRowsQueue.top(); +// row1.setData(r.fData); +// std::cout << "brandNewFinalize row " << row1.toString() << std::endl; +// orderedRowsQueue.pop(); +// --sqlOffset; +// } +// } + +void LimitedOrderBy::flushCurrentToDisk_(const bool firstFlush) +{ + // make a queue with rgdatas and hand it to DiskBasedTopNOrderBy + auto dl = RowGroupDL(1, 1); + auto& orderedRowsQueue = getQueue(); + size_t rowsOverRG = orderedRowsQueue.size() % fRowsPerRG; + size_t numberOfRGs = orderedRowsQueue.size() / fRowsPerRG + static_cast(rowsOverRG > 0); + std::thread flushThread(&DiskBasedTopNOrderBy::flushCurrentToDisk, this, std::ref(dl), fRowGroup, + numberOfRGs, firstFlush); + + uint32_t rSize = fRow0.getSize(); + // process leftovers + if (rowsOverRG) + { + fData.reinit(fRowGroup, rowsOverRG); + fRowGroup.setData(&fData); + fRowGroup.resetRowGroup(0); + fRowGroup.getRow(rowsOverRG-1, &fRow0); + + const OrderByRow& topRow = orderedRowsQueue.top(); + row1.setData(topRow.fData); + copyRow(row1, &fRow0); + fRowGroup.incRowCount(); + fRow0.prevRow(rSize); + orderedRowsQueue.pop(); + + dl.insert(fData); + } + + if (orderedRowsQueue.size() > 0) + { + fData.reinit(fRowGroup, fRowsPerRG); + fRowGroup.setData(&fData); + fRowGroup.resetRowGroup(0); + fRowGroup.getRow(fRowsPerRG-1, &fRow0); + + while 
(!orderedRowsQueue.empty()) + { + const OrderByRow& topRow = orderedRowsQueue.top(); + row1.setData(topRow.fData); + copyRow(row1, &fRow0); + fRowGroup.incRowCount(); + fRow0.prevRow(rSize); + orderedRowsQueue.pop(); + + if (fRowGroup.getRowCount() == fRowsPerRG) + { + dl.insert(fData); + + fData.reinit(fRowGroup, fRowsPerRG); + fRowGroup.setData(&fData); + fRowGroup.resetRowGroup(0); + fRowGroup.getRow(fRowsPerRG-1, &fRow0); + } + } + + if (fRowGroup.getRowCount() > 0) + dl.insert(fData); } + + dl.endOfInput(); + + // clean up the current queue/rgdatas to free mem + // fDataQueue + // fDistinctMap + // orderedRowsQueue + queue tempQueue; + // std::cout << "flush num of refs before deletion " << fDataQueue.front().rowData.use_count() << std::endl; + // { + // auto frontOfQueue = fDataQueue.front(); + // fDataQueue.pop(); + // std::cout << "flush num of refs w/o the queue " << frontOfQueue.rowData.use_count() << std::endl; + // std::cout << "flush num of refs swaping the queue with empty " << frontOfQueue.rowData.use_count() << " free " << fMemSize << std::endl; + // } + + fDataQueue.swap(tempQueue); + + if (fDistinctMap) + { + fDistinctMap->clear(); + } + + fRm->returnMemory(fMemSize); + fMemSize = 0; + + flushThread.join(); +} + +void LimitedOrderBy::brandNewFinalize() +{ + if (!isDiskBased()) + { + return finalize(); + } + + // if disk-based + // here there are <= inputQueuesNumber files on disk + // and potentially some in-memory state + // need to merge this together to produce a result } /* @@ -185,7 +308,7 @@ void LimitedOrderBy::finalize() if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; - throw IDBExcept(fErrorCode); + throw logging::OutOfMemoryExcept(fErrorCode); } fMemSize += fUncommitedMemory; fUncommitedMemory = 0; @@ -195,9 +318,9 @@ void LimitedOrderBy::finalize() if (fRowGroup.getRowCount() > 0) fDataQueue.push(fData); - auto& orderByQueue = getQueue(); + auto& orderedRowsQueue = getQueue(); - if (orderByQueue.size() > 0) + if (orderedRowsQueue.size() > 0) { // *DRRTUY Very memory intensive. CS needs to account active // memory only and release memory if needed. @@ -206,14 +329,14 @@ void LimitedOrderBy::finalize() if (!fRm->getMemory(memSizeInc, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; - throw IDBExcept(fErrorCode); + throw logging::OutOfMemoryExcept(fErrorCode); } fMemSize += memSizeInc; uint64_t offset = 0; uint64_t i = 0; // Reduce queue size by an offset value if it applicable. - uint64_t queueSizeWoOffset = orderByQueue.size() > fStart ? orderByQueue.size() - fStart : 0; + uint64_t queueSizeWoOffset = orderedRowsQueue.size() > fStart ? orderedRowsQueue.size() - fStart : 0; list tempRGDataList; if (fCount <= queueSizeWoOffset) @@ -242,15 +365,15 @@ void LimitedOrderBy::finalize() offset = offset != 0 ? 
offset - 1 : offset; fRowGroup.getRow(offset, &fRow0); - while ((orderByQueue.size() > fStart) && (i++ < fCount)) + while ((orderedRowsQueue.size() > fStart) && (i++ < fCount)) { - const OrderByRow& topRow = orderByQueue.top(); + const OrderByRow& topRow = orderedRowsQueue.top(); row1.setData(topRow.fData); copyRow(row1, &fRow0); fRowGroup.incRowCount(); offset--; fRow0.prevRow(rSize); - orderByQueue.pop(); + orderedRowsQueue.pop(); // if RG has fRowsPerRG rows if (offset == (uint64_t)-1) @@ -260,7 +383,7 @@ void LimitedOrderBy::finalize() if (!fRm->getMemory(memSizeInc, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; - throw IDBExcept(fErrorCode); + throw logging::OutOfMemoryExcept(fErrorCode); } fMemSize += memSizeInc; @@ -282,6 +405,46 @@ void LimitedOrderBy::finalize() } } +// WIP UNUSED +bool LimitedOrderBy::getNextRGData(RGData& data) +{ + auto& orderedRowsQueue = getQueue(); + + if (orderedRowsQueue.empty()) + { + return false; + } + + uint32_t rSize = fRow0.getSize(); + data.reinit(fRowGroup, fRowsPerRG); + fRowGroup.setData(&data); + fRowGroup.resetRowGroup(0); + fRowGroup.getRow(0, &fRow0); + + uint64_t thisRGRowNumber = 0; + // find number of rows to retrieve from the queue using SQL LIMIT + // and the current sorted queue size. + uint64_t rowsToRetrieve = std::min(fCount - fRowsReturned, fRowsPerRG); + uint64_t rowsToRetrieveFromQueue = std::min(rowsToRetrieve, orderedRowsQueue.size()); + std::cout << "getNextRGData rowsToRetrieve " << rowsToRetrieve << " orderedRowsQueue.size() " + << orderedRowsQueue.size() << std::endl; + std::cout << "getNextRGData rowsToRetrieveFromQueue " << rowsToRetrieveFromQueue << std::endl; + for (; rowsToRetrieveFromQueue > thisRGRowNumber; ++thisRGRowNumber) + { + const OrderByRow& topRow = orderedRowsQueue.top(); + row1.setData(topRow.fData); + std::cout << "getNextRGData row " << row1.toString() << std::endl; + copyRow(row1, &fRow0); + fRowGroup.incRowCount(); + fRow0.nextRow(rSize); + orderedRowsQueue.pop(); + } + + fRowsReturned += rowsToRetrieveFromQueue; + + return rowsToRetrieveFromQueue > 0; +} + const string LimitedOrderBy::toString() const { ostringstream oss; diff --git a/dbcon/joblist/limitedorderby.h b/dbcon/joblist/limitedorderby.h index ff0daafd22..024bfe6a62 100644 --- a/dbcon/joblist/limitedorderby.h +++ b/dbcon/joblist/limitedorderby.h @@ -22,8 +22,11 @@ #pragma once #include -#include "rowgroup.h" + +#include "disk-based-topnorderby.h" #include "../../utils/windowfunction/idborderby.h" +#include "resourcemanager.h" +#include "rowgroup.h" namespace joblist { @@ -34,15 +37,16 @@ struct JobInfo; // This version is for subqueries, limit the result set to fit in memory, // use ORDER BY to make the results consistent. // The actual output are the first or last # of rows, which are NOT ordered. 
-class LimitedOrderBy : public ordering::IdbOrderBy +class LimitedOrderBy : public ordering::IdbOrderBy, public DiskBasedTopNOrderBy { public: - LimitedOrderBy(); + LimitedOrderBy(ResourceManager* rm = nullptr); // TODO remove default ~LimitedOrderBy() override; using ordering::IdbOrderBy::initialize; void initialize(const rowgroup::RowGroup&, const JobInfo&, bool invertRules = false, bool isMultiThreded = false); void processRow(const rowgroup::Row&) override; + void processRow_(const rowgroup::Row&); uint64_t getKeyLength() const override; uint64_t getLimitCount() const { @@ -50,13 +54,20 @@ class LimitedOrderBy : public ordering::IdbOrderBy } const std::string toString() const override; + void flushCurrentToDisk_(const bool firstFlush); + void finalize(); + void brandNewFinalize(); + bool getNextRGData(rowgroup::RGData& data); + protected: uint64_t fStart; uint64_t fCount; uint64_t fUncommitedMemory; static const uint64_t fMaxUncommited; + uint64_t fOffsetInOrderedRowsQueue; + uint64_t fRowsReturned{0}; }; } // namespace joblist diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index 3345194181..563fd3d049 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -46,7 +46,6 @@ using namespace execplan; using namespace rowgroup; #include "hasher.h" -#include "stlpoolallocator.h" #include "threadnaming.h" using namespace utils; @@ -119,8 +118,7 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) , fEndOfResult(false) , fDistinct(false) , fParallelOp(false) - , fOrderBy(NULL) - , fConstant(NULL) + , fConstant(nullptr) , fFeInstance(funcexp::FuncExp::instance()) , fJobList(jobInfo.jobListPtr) , fFinishedThreads(0) @@ -147,10 +145,10 @@ TupleAnnexStep::~TupleAnnexStep() fRunnersList.clear(); } - if (fOrderBy) - delete fOrderBy; + // if (fOrderBy) + // delete fOrderBy; - fOrderBy = NULL; + // fOrderBy = NULL; if (fConstant) delete fConstant; @@ -187,7 +185,7 @@ void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo) if (fOrderBy) { fOrderBy->distinct(fDistinct); - fOrderBy->initialize(rgIn, jobInfo); + fOrderBy->initialize(rgIn, jobInfo, false, false); } } @@ -574,12 +572,12 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() dataVec.pop_back(); } } - catch (const std::bad_alloc&) + catch (const logging::OutOfMemoryExcept&) { auto errorCode = ERR_TNS_DISTINCT_IS_TOO_BIG; auto newException = IDBExcept(errorCode); - handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, logging::ERR_ALWAYS_CRITICAL, - "TupleAnnexStep::executeNoOrderByWithDistinct()"); + handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, + logging::ERR_ALWAYS_CRITICAL, "TupleAnnexStep::executeNoOrderByWithDistinct()"); } catch (...) 
{ @@ -605,95 +603,185 @@ void TupleAnnexStep::checkAndAllocateMemory4RGData(const rowgroup::RowGroup& row } } +std::vector TupleAnnexStep::createInputDLs(const size_t dLsCount) const +{ + std::vector result; + for (size_t i = 0; i < dLsCount; ++i) + { + result.emplace_back(new RowGroupDL(1, 1)); // WIP hardcode + } + return result; +} + +std::vector TupleAnnexStep::startReaders(std::vector& dataLists, std::vector& fileNames) +{ + //assert(dataLists.size(), fileNames.size()) + std::vector result(dataLists.size()); + for (size_t i = 0; i < dataLists.size(); ++i) + { + result[i] = jobstepThreadPool.invoke( + [&dataLists, i]() + { + // open file + // loop + // read ByteStream + // make BS into RGData + // put into dataLists[i] + // close file + // emit empty RGData + }); + } + return result; +} + void TupleAnnexStep::executeWithOrderBy() { utils::setThreadName("TNSwOrd"); RGData rgDataIn; RGData rgDataOut; bool more = false; + bool flushToDisk = false; try { - more = fInputDL->next(fInputIterator, &rgDataIn); - - if (traceOn()) - dlTimes.setFirstReadTime(); - - StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_START, 1); - postStepStartTele(sts); - - while (more && !cancelled()) + for (;;) { - fRowGroupIn.setData(&rgDataIn); - fRowGroupIn.getRow(0, &fRowIn); - - for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled(); ++i) + try { - fOrderBy->processRow(fRowIn); - fRowIn.nextRow(); - } + more = fInputDL->next(fInputIterator, &rgDataIn); - more = fInputDL->next(fInputIterator, &rgDataIn); - } + if (traceOn()) + dlTimes.setFirstReadTime(); - fOrderBy->finalize(); + StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_START, 1); + postStepStartTele(sts); - if (!cancelled()) - { - while (fOrderBy->getData(rgDataIn)) - { - if (fConstant == NULL && fRowGroupOut.getColumnCount() == fRowGroupIn.getColumnCount()) - { - rgDataOut = rgDataIn; - fRowGroupOut.setData(&rgDataOut); - } - else + while (more && !cancelled()) { fRowGroupIn.setData(&rgDataIn); fRowGroupIn.getRow(0, &fRowIn); - rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount()); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid()); - fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot()); - fRowGroupOut.getRow(0, &fRowOut); - - for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i) + for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled(); ++i) { - if (fConstant) - fConstant->fillInConstants(fRowIn, fRowOut); - else - copyRow(fRowIn, &fRowOut); - - fRowGroupOut.incRowCount(); - fRowOut.nextRow(); + fOrderBy->processRow(fRowIn); fRowIn.nextRow(); } - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - // release RGData memory - size_t rgDataSize = fRowGroupOut.getSizeWithStrings() - fRowGroupOut.getHeaderSize(); - fOrderBy->returnRGDataMemory2RM(rgDataSize); + // std::cout << "use_count " << rgDataIn.rowData.use_count() << " rgDataIn.rowData " << std::hex + // << (uint64_t)rgDataIn.rowData.get() << std::dec << std::endl; + + more = fInputDL->next(fInputIterator, &rgDataIn); + + // if (more) + // { + // std::cout << "use_count " << rgDataIn.rowData.use_count() << " rgDataIn.rowData " << std::hex + // << (uint64_t)rgDataIn.rowData.get() << std::dec << std::endl; + // } + // else + // { + // std::cout << "use_count " << rgDataIn.rowData.use_count() << std::endl; + // } + // fOrderBy->fDataQueue.pop(); + // std::cout << "use_count " << rgDataIn.rowData.use_count() << std::endl; } } + catch (const 
logging::OutOfMemoryExcept&) + { + flushToDisk = true; + } + catch (...) + { + handleException(std::current_exception(), logging::ERR_IN_PROCESS, logging::ERR_ALWAYS_CRITICAL, + "TupleAnnexStep::executeWithOrderBy()"); + } + + if (flushToDisk) + { + bool firstFlush = true; + std::cout << "disk-based flush" << std::endl; + fOrderBy->flushCurrentToDisk_(firstFlush); + flushToDisk = false; + } + else + { + break; + } } } - catch (const std::bad_alloc&) + catch (const logging::OutOfMemoryExcept&) { auto errorCode = fOrderBy->getErrorCode(); - auto newException = IDBExcept(errorCode); - handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, logging::ERR_ALWAYS_CRITICAL, - "TupleAnnexStep::executeWithOrderBy()"); + auto newException = OutOfMemoryExcept(errorCode); + handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, + logging::ERR_ALWAYS_CRITICAL, "TupleAnnexStep::executeWithOrderBy()"); } - catch (...) + + // can be disk-based with no or few files and some in-memory state + + // store avg RGData size + if (fOrderBy->isDiskBased()) { - handleException(std::current_exception(), logging::ERR_IN_PROCESS, logging::ERR_ALWAYS_CRITICAL, - "TupleAnnexStep::executeWithOrderBy()"); + std::cout << "disk-based is triggered" << std::endl; + // assess RAM available, avg RGData size statistics and free enough memory + // return memory if needed + size_t inputQueuesNumber = 2; + while (inputQueuesNumber < fOrderBy->getGenerationFilesNumber()) + { + auto fileNames = fOrderBy->getGenerationFileNamesNextBatch(inputQueuesNumber); + auto inputDLs = createInputDLs(fileNames.size()); + auto readers = startReaders(inputDLs, fileNames); + // create outputDLs or simplier atomic queues + readers threads + fOrderBy->diskBasedMergePhaseIfNeeded(inputDLs); + jobstepThreadPool.join(readers); + } + } + + fOrderBy->brandNewFinalize(); + + if (!cancelled()) + { + while (fOrderBy->getData(rgDataIn)) + { + if (fConstant == NULL && fRowGroupOut.getColumnCount() == fRowGroupIn.getColumnCount()) + { + rgDataOut = rgDataIn; + fRowGroupOut.setData(&rgDataOut); + } + else // TODO push this into finalize to populate next RGData rows + { + fRowGroupIn.setData(&rgDataIn); + fRowGroupIn.getRow(0, &fRowIn); + + rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount()); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid()); + fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot()); + fRowGroupOut.getRow(0, &fRowOut); + + for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i) + { + if (fConstant) + fConstant->fillInConstants(fRowIn, fRowOut); + else + copyRow(fRowIn, &fRowOut); + + fRowGroupOut.incRowCount(); + fRowOut.nextRow(); + fRowIn.nextRow(); + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + + // release RGData memory + // TODO add some batching here to reduce atomic overhead. + // size_t rgDataSize = fRowGroupOut.getSizeWithStrings() - fRowGroupOut.getHeaderSize(); + // fOrderBy->returnRGDataMemory2RM(rgDataSize); + } + } } while (more) @@ -701,6 +789,8 @@ void TupleAnnexStep::executeWithOrderBy() // Bug 3136, let mini stats to be formatted if traceOn. fOutputDL->endOfInput(); + + // TODO clean existing leftover disk-based files. 
} /* @@ -729,9 +819,10 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() // Calculate offset here fRowGroupOut.getRow(0, &fRowOut); - ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, fRm->getAllocator()); + auto allocSorting = fRm->getAllocator(); + ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, allocSorting); std::unique_ptr distinctMap( - new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), STLPoolAllocator(fRm))); + new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), STLPoolAllocator(fRm))); fRowGroupIn.initRow(&row1); fRowGroupIn.initRow(&row2); @@ -765,8 +856,8 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() { auto errorCode = fOrderBy->getErrorCode(); auto newException = IDBExcept(errorCode); - handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, logging::ERR_ALWAYS_CRITICAL, - "TupleAnnexStep::finalizeParallelOrderByDistinct()"); + handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, + logging::ERR_ALWAYS_CRITICAL, "TupleAnnexStep::finalizeParallelOrderByDistinct()"); } catch (...) { @@ -964,8 +1055,8 @@ void TupleAnnexStep::finalizeParallelOrderBy() { auto errorCode = fOrderBy->getErrorCode(); auto newException = IDBExcept(errorCode); - handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, logging::ERR_ALWAYS_CRITICAL, - "TupleAnnexStep::finalizeParallelOrderBy()"); + handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, + logging::ERR_ALWAYS_CRITICAL, "TupleAnnexStep::finalizeParallelOrderBy()"); } catch (...) { @@ -1179,8 +1270,8 @@ void TupleAnnexStep::executeParallelOrderBy(uint64_t id) { auto errorCode = fOrderBy->getErrorCode(); auto newException = IDBExcept(errorCode); - handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, logging::ERR_ALWAYS_CRITICAL, - "TupleAnnexStep::executeParallelOrderBy()"); + handleException(std::make_exception_ptr(newException), logging::ERR_IN_PROCESS, + logging::ERR_ALWAYS_CRITICAL, "TupleAnnexStep::executeParallelOrderBy()"); } catch (...) 
{ diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index f259476861..252001800f 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -20,8 +20,9 @@ #pragma once -#include #include +#include +#include #include "jobstep.h" #include "limitedorderby.h" @@ -69,9 +70,9 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep void initialize(const rowgroup::RowGroup& rgIn, const JobInfo& jobInfo); - void addOrderBy(LimitedOrderBy* lob) + void addOrderBy(ResourceManager* rm) { - fOrderBy = lob; + fOrderBy = std::make_unique(rm); } void addConstant(TupleConstantStep* tcs) { @@ -115,6 +116,30 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep void finalizeParallelOrderBy(); void finalizeParallelOrderByDistinct(); + // void enableFlushToDisk() + // { + // fFlushToDisk.store(true, std::memory_order_relaxed); + // } + // void disableFlushToDisk() + // { + // fFlushToDisk.store(false, std::memory_order_relaxed); + // } + // bool isFlushToDiskEnabled() const + // { + // return fFlushToDisk.load(std::memory_order_relaxed); + // } + // void incrementGenerationCounter() + // { + // ++fGenerationCounter; + // } + // uint64_t getGenerationCounter() const + // { + // return fGenerationCounter; + // } + + std::vector createInputDLs(const size_t dLsCount) const; + std::vector startReaders(std::vector& inputDLs, std::vector& fileNames); + // input/output rowgroup and row rowgroup::RowGroup fRowGroupIn; rowgroup::RowGroup fRowGroupOut; @@ -162,7 +187,7 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep bool fDistinct; bool fParallelOp; - LimitedOrderBy* fOrderBy; + std::unique_ptr fOrderBy{nullptr}; TupleConstantStep* fConstant; funcexp::FuncExp* fFeInstance; @@ -173,6 +198,8 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep uint16_t fFinishedThreads; boost::mutex fParallelFinalizeMutex; joblist::ResourceManager* fRm; + // std::atomic fFlushToDisk {false}; + // uint64_t fGenerationCounter {0}; }; } // namespace joblist diff --git a/dbcon/mysql/columnstore.cnf b/dbcon/mysql/columnstore.cnf index e0b77f0b24..08a035121a 100644 --- a/dbcon/mysql/columnstore.cnf +++ b/dbcon/mysql/columnstore.cnf @@ -1,6 +1,6 @@ [mariadb-client] quick -quick-max-column-width=0 +loose-quick-max-column-width=0 [mysqld] plugin-load-add=ha_columnstore.so diff --git a/primitives/primproc/serviceexemgr.cpp b/primitives/primproc/serviceexemgr.cpp index 2de371f21d..d04a1b0c55 100644 --- a/primitives/primproc/serviceexemgr.cpp +++ b/primitives/primproc/serviceexemgr.cpp @@ -130,7 +130,8 @@ void cleanTempDir() TempDirPurpose purpose; }; std::vector dirs{{"HashJoin", "AllowDiskBasedJoin", TempDirPurpose::Joins}, - {"RowAggregation", "AllowDiskBasedAggregation", TempDirPurpose::Aggregates}}; + {"RowAggregation", "AllowDiskBasedAggregation", TempDirPurpose::Aggregates}, + {"Sorting", "AllowDiskBasedSorting", TempDirPurpose::Sorting}}; const auto config = config::Config::makeConfig(); for (const auto& dir : dirs) diff --git a/tests/minmaxheap.cpp b/tests/minmaxheap.cpp new file mode 100644 index 0000000000..9a4f26b210 --- /dev/null +++ b/tests/minmaxheap.cpp @@ -0,0 +1,151 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. 
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA. */
+
+#include <gtest/gtest.h>
+#include <random>
+#include <set>
+#include "utils/structs/minmaxheap.h"  // header that contains template class MinMaxHeap
+
+using IntHeap = MinMaxHeap<int>;                          // default std::less comparator
+using IntInvHeap = MinMaxHeap<int, std::greater<int>>;    // inverted ordering
+
+// ---------- Basic behaviour -------------------------------------------------
+
+TEST(MinMaxHeap, InitiallyEmpty) {
+  IntHeap h;
+  EXPECT_TRUE(h.empty());
+  EXPECT_EQ(h.size(), 0);
+  EXPECT_THROW(h.getMin(), std::out_of_range);
+  EXPECT_THROW(h.getMax(), std::out_of_range);
+  EXPECT_THROW(h.popMin(), std::out_of_range);
+  EXPECT_THROW(h.popMax(), std::out_of_range);
+}
+
+TEST(MinMaxHeap, SingleElement) {
+  IntHeap h;
+  h.push(42);
+  EXPECT_FALSE(h.empty());
+  EXPECT_EQ(h.size(), 1);
+  EXPECT_EQ(h.getMin(), 42);
+  EXPECT_EQ(h.getMax(), 42);
+  h.popMin();
+  EXPECT_TRUE(h.empty());
+}
+
+TEST(MinMaxHeap, OrderAfterInsertions) {
+  IntHeap h;
+  h.push(10);
+  h.push(5);
+  h.push(20);
+  EXPECT_EQ(h.getMin(), 5);
+  EXPECT_EQ(h.getMax(), 20);
+}
+
+TEST(MinMaxHeap, PopMinAsc) {
+  IntHeap h;
+  for (int v : {7, 1, 4, 9, 3}) h.push(v);
+
+  std::vector<int> asc;
+  while (!h.empty()) {
+    asc.push_back(h.getMin());
+    h.popMin();
+  }
+  EXPECT_EQ(asc, (std::vector<int>{1, 3, 4, 7, 9}));
+}
+
+TEST(MinMaxHeap, PopMaxDesc) {
+  IntHeap h;
+  for (int v : {7, 1, 4, 9, 3}) h.push(v);
+
+  std::vector<int> desc;
+  while (!h.empty()) {
+    desc.push_back(h.getMax());
+    h.popMax();
+  }
+  EXPECT_EQ(desc, (std::vector<int>{9, 7, 4, 3, 1}));
+}
+
+TEST(MinMaxHeap, MixedPops) {
+  IntHeap h;
+  for (int i = 1; i <= 5; ++i) h.push(i);  // heap contains 1..5
+
+  // min(1) → {2,3,4,5}
+  EXPECT_EQ(h.getMin(), 1);
+  h.popMin();
+  // max(5) → {2,3,4}
+  EXPECT_EQ(h.getMax(), 5);
+  h.popMax();
+  // min(2) → {3,4}
+  EXPECT_EQ(h.getMin(), 2);
+  h.popMin();
+  // max(4) → {3}
+  EXPECT_EQ(h.getMax(), 4);
+  h.popMax();
+  // last element
+  EXPECT_EQ(h.getMin(), 3);
+  EXPECT_EQ(h.getMax(), 3);
+  h.popMin();
+
+  EXPECT_TRUE(h.empty());
+}
+
+// ---------- Comparator test --------------------------------------------------
+
+TEST(MinMaxHeap, CustomComparator) {
+  IntInvHeap h;  // std::greater ⇒ heap treats greater elements as *smaller*
+  for (int v : {3, 1, 4}) h.push(v);
+  // now "minimum" is actually the *largest* integer because of comparator
+  EXPECT_EQ(h.getMin(), 4);
+  EXPECT_EQ(h.getMax(), 1);
+}
+
+// ---------- Robustness & stress ---------------------------------------------
+
+TEST(MinMaxHeap, RandomAgainstMultiset) {
+  constexpr int N = 100;
+  std::mt19937 rng(42);
+  std::uniform_int_distribution<int> dist(-100000, 100000);
+
+  IntHeap heap;
+  std::multiset<int> ref;
+
+  for (int i = 0; i < N; ++i) {
+    if (ref.empty() || dist(rng) % 2) {  // 50 % push
+      int val = dist(rng);
+      heap.push(val);
+      ref.insert(val);
+    } else if (dist(rng) % 2) {  // 25 % popMin
+      int refMin = *ref.begin();
+      EXPECT_EQ(heap.getMin(), refMin);
+      heap.popMin();
+      ref.erase(ref.begin());
+    } else {  // 25 % popMax
+      int refMax = *ref.rbegin();
+      EXPECT_EQ(heap.getMax(), refMax);
+      heap.popMax();
+      auto it = std::prev(ref.end());
+      ref.erase(it);
+    }
+    // invariants after each mutation
+    if
(!ref.empty()) { + EXPECT_EQ(heap.getMin(), *ref.begin()); + EXPECT_EQ(heap.getMax(), *ref.rbegin()); + } else { + EXPECT_TRUE(heap.empty()); + } + } +} diff --git a/utils/configcpp/configcpp.cpp b/utils/configcpp/configcpp.cpp index 3f76689e99..97ab54fb2f 100644 --- a/utils/configcpp/configcpp.cpp +++ b/utils/configcpp/configcpp.cpp @@ -547,6 +547,7 @@ std::string Config::getTempFileDir(Config::TempDirPurpose what) { case TempDirPurpose::Joins: return prefix.append("joins/"); case TempDirPurpose::Aggregates: return prefix.append("aggregates/"); + case TempDirPurpose::Sorting: return prefix.append("sorting/"); } // NOTREACHED return {}; diff --git a/utils/configcpp/configcpp.h b/utils/configcpp/configcpp.h index 86e2db67b4..bcb27b57cb 100644 --- a/utils/configcpp/configcpp.h +++ b/utils/configcpp/configcpp.h @@ -196,7 +196,8 @@ class Config enum class TempDirPurpose { Joins, ///< disk joins - Aggregates ///< disk-based aggregation + Aggregates, ///< disk-based aggregation + Sorting ///< disk-based sorting }; /** @brief Return temporaru directory path for the specified purpose */ std::string getTempFileDir(TempDirPurpose what); diff --git a/utils/idbdatafile/utility.h b/utils/idbdatafile/utility.h index 18dae9005d..663337f531 100644 --- a/utils/idbdatafile/utility.h +++ b/utils/idbdatafile/utility.h @@ -22,6 +22,8 @@ #include #include +#include + namespace idbdatafile { /** diff --git a/utils/rowgroup/CMakeLists.txt b/utils/rowgroup/CMakeLists.txt index 2ba4c2d467..ccba6b1d53 100644 --- a/utils/rowgroup/CMakeLists.txt +++ b/utils/rowgroup/CMakeLists.txt @@ -2,7 +2,7 @@ include_directories(${ENGINE_COMMON_INCLUDES}) # ########## next target ############### -set(rowgroup_LIB_SRCS rowaggregation.cpp rowgroup.cpp rowstorage.cpp) +set(rowgroup_LIB_SRCS rowaggregation.cpp rowgroup.cpp rowstorage.cpp dumper.cpp) columnstore_library(rowgroup ${rowgroup_LIB_SRCS}) columnstore_link(rowgroup PRIVATE ${NETSNMP_LIBRARIES} funcexp loggingcpp) diff --git a/utils/rowgroup/dumper.cpp b/utils/rowgroup/dumper.cpp new file mode 100644 index 0000000000..f9cee685f0 --- /dev/null +++ b/utils/rowgroup/dumper.cpp @@ -0,0 +1,226 @@ +/* Copyright (C) 2025 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#include +#include +#include +#include + +#include "dumper.h" + +namespace common +{ +std::string errorString(int errNo) +{ + char tmp[1024]; + auto* buf = strerror_r(errNo, tmp, sizeof(tmp)); + return {buf}; +} +} // namespace common + +namespace rowgroup +{ +int Dumper::write(const std::string& fname, const char* buf, size_t sz) +{ + if (sz == 0) + return 0; + + int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (UNLIKELY(fd < 0)) + return errno; + + const char* tmpbuf; + if (fCompressor) + { + auto len = fCompressor->maxCompressedSize(sz); + checkBuffer(len); + fCompressor->compress(buf, sz, fTmpBuf.data(), &len); + tmpbuf = fTmpBuf.data(); + sz = len; + } + else + { + tmpbuf = buf; + } + + auto to_write = sz; + int ret = 0; + while (to_write > 0) + { + auto r = ::write(fd, tmpbuf + sz - to_write, to_write); + if (UNLIKELY(r < 0)) + { + if (errno == EAGAIN) + continue; + + ret = errno; + close(fd); + return ret; + } + assert(size_t(r) <= to_write); + to_write -= r; + } + + close(fd); + return ret; +} + +int Dumper::read(const std::string& fname, std::vector& buf) +{ + int fd = open(fname.c_str(), O_RDONLY); + if (UNLIKELY(fd < 0)) + return errno; + + struct stat st{}; + fstat(fd, &st); + size_t sz = st.st_size; + std::vector* tmpbuf; + if (fCompressor) + { + tmpbuf = &fTmpBuf; + checkBuffer(sz); + } + else + { + tmpbuf = &buf; + buf.resize(sz); + } + + auto to_read = sz; + int ret = 0; + while (to_read > 0) + { + auto r = ::read(fd, tmpbuf->data() + sz - to_read, to_read); + if (UNLIKELY(r < 0)) + { + if (errno == EAGAIN) + continue; + + ret = errno; + close(fd); + return ret; + } + + assert(size_t(r) <= to_read); + to_read -= r; + } + + if (fCompressor) + { + size_t len; + if (!fCompressor->getUncompressedSize(tmpbuf->data(), sz, &len)) + { + ret = EPROTO; + close(fd); + return ret; + } + + buf.resize(len); + fCompressor->uncompress(tmpbuf->data(), sz, buf.data(), &len); + } + + close(fd); + return ret; +} + +size_t Dumper::size() const +{ + return fTmpBuf.size(); +} + +void Dumper::checkBuffer(size_t len) +{ + if (fTmpBuf.size() < len) + { + size_t newtmpsz = (len + 8191) / 8192 * 8192; + std::vector tmpvec(newtmpsz); + // WIP needs OOM check + fMM->acquire(newtmpsz - fTmpBuf.size()); + fTmpBuf.swap(tmpvec); + } +} + +std::string RGDumper::makeRGFilePrefix(const uint16_t generation) const +{ + char buf[PATH_MAX]; + snprintf(buf, sizeof(buf), "%s/p%u-t%ld-g%u-rg", fTmpDir.c_str(), getpid(), + fUniqId, generation); + return {buf}; +} + +// TODO Reuse prefix +std::string RGDumper::makeRGFilename(uint64_t rgid, const uint16_t generation) const +{ + char buf[PATH_MAX]; + snprintf(buf, sizeof(buf), "%s/p%u-t%ld-g%u-rg%lu", fTmpDir.c_str(), getpid(), + fUniqId, generation, rgid); + return {buf}; +} + +void RGDumper::loadRG(uint64_t rgid, const uint16_t generation, RowGroup& fRowGroupOut, + std::unique_ptr& rgdata, bool unlinkDump) +{ + auto fname = makeRGFilename(rgid, generation); + + std::vector data; + int errNo; + if ((errNo = read(fname, data)) != 0) + { + unlink(fname.c_str()); + // WIP replace errorcodes + throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, + common::errorString(errNo)), + logging::ERR_DISKAGG_FILEIO_ERROR); + } + + messageqcpp::ByteStream bs(reinterpret_cast(data.data()), data.size()); + + if (unlinkDump) + unlink(fname.c_str()); + rgdata.reset(new RGData()); + rgdata->deserialize(bs, rowgroup::rgCommonSize); + assert(bs.length() == 0); + + fRowGroupOut.setData(rgdata.get()); + auto memSz = 
fRowGroupOut.getSizeWithStrings(); + + if (!fMM->acquire(memSz)) + { + // WIP replace errorcodes + throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_AGGREGATION_TOO_BIG), + logging::ERR_AGGREGATION_TOO_BIG); + } +} + +void RGDumper::saveRG(uint64_t rgid, const uint16_t generation, RowGroup& fRowGroupOut, RGData* rgdata) +{ + messageqcpp::ByteStream bs; + fRowGroupOut.setData(rgdata); + rgdata->serialize(bs, fRowGroupOut.getDataSize()); + + int errNo; + auto name = makeRGFilename(rgid, generation); + // std::cout << "RGDumper::saveRG " << name << std::endl; + if ((errNo = write(makeRGFilename(rgid, generation), (char*)bs.buf(), bs.length())) != 0) + { + throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, + common::errorString(errNo)), + logging::ERR_DISKAGG_FILEIO_ERROR); + } +} + +} // namespace rowgroup diff --git a/utils/rowgroup/dumper.h b/utils/rowgroup/dumper.h new file mode 100644 index 0000000000..6c6185b293 --- /dev/null +++ b/utils/rowgroup/dumper.h @@ -0,0 +1,77 @@ +/* Copyright (C) 2025 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/
+
+#pragma once
+
+#include
+#include
+#include "idbcompress.h"
+#include "memmanager.h"
+#include "rowgroup.h"
+
+namespace common
+{
+std::string errorString(int errNo);
+}
+
+namespace rowgroup
+{
+
+class Dumper
+{
+ public:
+  Dumper(const compress::CompressInterface* comp, MemManager* mm)
+      : fCompressor(comp), fMM(std::unique_ptr<MemManager>(mm)) {}
+  Dumper(const compress::CompressInterface* comp, std::unique_ptr<MemManager>& mm)
+      : fCompressor(comp), fMM(std::move(mm)) {}
+  ~Dumper() = default;
+  int write(const std::string& fname, const char* buf, size_t sz);
+  int read(const std::string& fname, std::vector<char>& buf);
+  size_t size() const;
+
+ private:
+  void checkBuffer(size_t len);
+
+ protected:
+  const compress::CompressInterface* fCompressor;
+  std::unique_ptr<MemManager> fMM;
+  std::vector<char> fTmpBuf;
+};
+
+class RGDumper : protected Dumper
+{
+ public:
+  RGDumper(const compress::CompressInterface* comp, std::unique_ptr<MemManager> mm, const std::string& tmpDir,
+           const std::string& operationName, const uint64_t uniqId)
+      : Dumper(comp, mm), fTmpDir(tmpDir), fOperationName(operationName), fUniqId(uniqId)
+  {
+  }
+  ~RGDumper() = default;
+  void loadRG(uint64_t rgid, const uint16_t generation, RowGroup& fRowGroupOut,
+              std::unique_ptr<RGData>& rgdata, bool unlinkDump = false);
+  void saveRG(uint64_t rgid, const uint16_t generation, RowGroup& fRowGroupOut, RGData* rgdata);
+
+  std::string makeRGFilename(uint64_t rgid, const uint16_t generation) const;
+  std::string makeRGFilePrefix(const uint16_t generation) const;
+
+ private:
+  std::string fTmpDir;
+  std::string fOperationName;
+  uint64_t fUniqId;
+};
+
+} // namespace rowgroup
\ No newline at end of file
diff --git a/utils/rowgroup/memmanager.h b/utils/rowgroup/memmanager.h
new file mode 100644
index 0000000000..4a392311eb
--- /dev/null
+++ b/utils/rowgroup/memmanager.h
@@ -0,0 +1,175 @@
+/* Copyright (C) 2021-2025 MariaDB Corporation
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA.
*/ + +#pragma once + +#include +#include + +#include "resourcemanager.h" + +// TODO change namespace +namespace rowgroup +{ +/** @brief Some service wrapping around ResourceManager (or NoOP) */ + +class MemManager +{ + public: + MemManager() + { + } + virtual ~MemManager() + { + release(fMemUsed); + } + + bool acquire(std::size_t amount) + { + return acquireImpl(amount); + } + void release(ssize_t amount = 0) + { + // in some cases it tries to release more memory than acquired, ie create + // new rowgroup, acquire maximum size (w/o strings), add some rows with + // strings and finally release the actual size of RG with strings + if (amount == 0 || amount > fMemUsed) + amount = fMemUsed; + releaseImpl(amount); + } + + ssize_t getUsed() const + { + return fMemUsed; + } + virtual int64_t getFree() const + { + return std::numeric_limits::max(); + } + + virtual int64_t getConfigured() const + { + return std::numeric_limits::max(); + } + + virtual bool isStrict() const + { + return false; + } + + virtual MemManager* clone() const + { + return new MemManager(); + } + + virtual joblist::ResourceManager* getResourceManaged() + { + return nullptr; + } + virtual boost::shared_ptr getSessionLimit() + { + return {}; + } + + protected: + virtual bool acquireImpl(std::size_t amount) + { + fMemUsed += amount; + return true; + } + virtual void releaseImpl(std::size_t amount) + { + fMemUsed -= amount; + } + ssize_t fMemUsed = 0; +}; + +class RMMemManager : public MemManager +{ + public: + RMMemManager(joblist::ResourceManager* rm, boost::shared_ptr sl, bool wait = true, + bool strict = true) + : fRm(rm), fSessLimit(std::move(sl)), fWait(wait), fStrict(strict) + { + } + + ~RMMemManager() override + { + release(fMemUsed); + fMemUsed = 0; + } + + int64_t getConfigured() const final + { + return fRm->getConfiguredUMMemLimit(); + } + + int64_t getFree() const final + { + return std::min(fRm->availableMemory(), *fSessLimit); + } + + bool isStrict() const final + { + return fStrict; + } + + MemManager* clone() const final + { + return new RMMemManager(fRm, fSessLimit, fWait, fStrict); + } + + joblist::ResourceManager* getResourceManaged() override + { + return fRm; + } + boost::shared_ptr getSessionLimit() override + { + return fSessLimit; + } + + protected: + bool acquireImpl(size_t amount) final + { + if (amount) + { + if (!fRm->getMemory(amount, fSessLimit, fWait) && fStrict) + { + return false; + } + MemManager::acquireImpl(amount); + } + return true; + } + + void releaseImpl(size_t amount) override + { + if (amount) + { + MemManager::releaseImpl(amount); + fRm->returnMemory(amount, fSessLimit); + } + } + + private: + joblist::ResourceManager* fRm = nullptr; + boost::shared_ptr fSessLimit; + const bool fWait; + const bool fStrict; +}; + +} \ No newline at end of file diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index 50baf681c1..16ca719d1a 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -365,7 +365,7 @@ class RGData return !!rowData; } - private: +// private: uint32_t rowSize = 0; // can't be. uint32_t columnCount = 0; // shouldn't be, but... 
boost::shared_ptr rowData; diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 8ab629873b..9e4278f93e 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2021-2022 MariaDB Corporation +/* Copyright (C) 2021 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -23,6 +23,9 @@ #include "rowgroup.h" #include #include + +#include "dumper.h" +#include "memmanager.h" #include "rowstorage.h" #include "robin_hood.h" @@ -77,12 +80,12 @@ int readData(int fd, char* buf, size_t sz) return 0; } -std::string errorString(int errNo) -{ - char tmp[1024]; - auto* buf = strerror_r(errNo, tmp, sizeof(tmp)); - return {buf}; -} +// std::string common::errorString(int errNo) +// { +// char tmp[1024]; +// auto* buf = strerror_r(errNo, tmp, sizeof(tmp)); +// return {buf}; +// } size_t findFirstSetBit(const uint64_t mask) { @@ -275,289 +278,6 @@ struct LRU : public LRUIface List fList; }; -/** @brief Some service wrapping around ResourceManager (or NoOP) */ -class MemManager -{ - public: - MemManager() - { - } - virtual ~MemManager() - { - release(fMemUsed); - } - - bool acquire(ssize_t amount) - { - if (UNLIKELY(-amount > fMemUsed)) - amount = -fMemUsed; - return acquireImpl(amount); - } - void release(ssize_t amount = 0) - { - // in some cases it tries to release more memory than acquired, ie create - // new rowgroup, acquire maximum size (w/o strings), add some rows with - // strings and finally release the actual size of RG with strings - if (amount == 0 || amount > fMemUsed) - amount = fMemUsed; - releaseImpl(amount); - } - - ssize_t getUsed() const - { - return fMemUsed; - } - virtual int64_t getFree() const - { - return std::numeric_limits::max(); - } - - virtual int64_t getConfigured() const - { - return std::numeric_limits::max(); - } - - virtual bool isStrict() const - { - return false; - } - - virtual MemManager* clone() const - { - return new MemManager(); - } - - virtual joblist::ResourceManager* getResourceManaged() - { - return nullptr; - } - virtual boost::shared_ptr getSessionLimit() - { - return {}; - } - - protected: - virtual bool acquireImpl(ssize_t amount) - { - fMemUsed += amount; - return true; - } - virtual void releaseImpl(ssize_t amount) - { - fMemUsed -= amount; - } - ssize_t fMemUsed{0}; -}; - -class RMMemManager : public MemManager -{ - public: - RMMemManager(joblist::ResourceManager* rm, boost::shared_ptr sl, bool wait = true, - bool strict = true) - : fRm(rm), fSessLimit(std::move(sl)), fWait(wait), fStrict(strict) - { - } - - ~RMMemManager() override - { - release(fMemUsed); - fMemUsed = 0; - } - - int64_t getConfigured() const final - { - return fRm->getConfiguredUMMemLimit(); - } - - int64_t getFree() const final - { - return std::min(fRm->availableMemory(), *fSessLimit); - } - - bool isStrict() const final - { - return fStrict; - } - - MemManager* clone() const final - { - return new RMMemManager(fRm, fSessLimit, fWait, fStrict); - } - - joblist::ResourceManager* getResourceManaged() override - { - return fRm; - } - boost::shared_ptr getSessionLimit() override - { - return fSessLimit; - } - - protected: - bool acquireImpl(ssize_t amount) final - { - if (amount) - { - if (!fRm->getMemory(amount, fSessLimit, fWait) && fStrict) - { - return false; - } - MemManager::acquireImpl(amount); - } - return true; - } - - void releaseImpl(ssize_t amount) override - { - if (amount) - { - 
MemManager::releaseImpl(amount); - fRm->returnMemory(amount, fSessLimit); - } - } - - private: - joblist::ResourceManager* fRm = nullptr; - boost::shared_ptr fSessLimit; - const bool fWait; - const bool fStrict; -}; - -class Dumper -{ - public: - Dumper(const compress::CompressInterface* comp, MemManager* mm) : fCompressor(comp), fMM(mm->clone()) - { - } - - int write(const std::string& fname, const char* buf, size_t sz) - { - if (sz == 0) - return 0; - - int fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); - if (UNLIKELY(fd < 0)) - return errno; - - const char* tmpbuf; - if (fCompressor) - { - auto len = fCompressor->maxCompressedSize(sz); - checkBuffer(len); - fCompressor->compress(buf, sz, fTmpBuf.data(), &len); - tmpbuf = fTmpBuf.data(); - sz = len; - } - else - { - tmpbuf = buf; - } - - auto to_write = sz; - int ret = 0; - while (to_write > 0) - { - auto r = ::write(fd, tmpbuf + sz - to_write, to_write); - if (UNLIKELY(r < 0)) - { - if (errno == EAGAIN) - continue; - - ret = errno; - close(fd); - return ret; - } - assert(size_t(r) <= to_write); - to_write -= r; - } - - close(fd); - return ret; - } - - int read(const std::string& fname, std::vector& buf) - { - int fd = open(fname.c_str(), O_RDONLY); - if (UNLIKELY(fd < 0)) - return errno; - - struct stat st - { - }; - fstat(fd, &st); - size_t sz = st.st_size; - std::vector* tmpbuf; - if (fCompressor) - { - tmpbuf = &fTmpBuf; - checkBuffer(sz); - } - else - { - tmpbuf = &buf; - buf.resize(sz); - } - - auto to_read = sz; - int ret = 0; - while (to_read > 0) - { - auto r = ::read(fd, tmpbuf->data() + sz - to_read, to_read); - if (UNLIKELY(r < 0)) - { - if (errno == EAGAIN) - continue; - - ret = errno; - close(fd); - return ret; - } - - assert(size_t(r) <= to_read); - to_read -= r; - } - - if (fCompressor) - { - size_t len; - if (!fCompressor->getUncompressedSize(tmpbuf->data(), sz, &len)) - { - ret = EPROTO; - close(fd); - return ret; - } - - buf.resize(len); - fCompressor->uncompress(tmpbuf->data(), sz, buf.data(), &len); - } - - close(fd); - return ret; - } - - size_t size() const - { - return fTmpBuf.size(); - } - - private: - void checkBuffer(size_t len) - { - if (fTmpBuf.size() < len) - { - size_t newtmpsz = (len + 8191) / 8192 * 8192; - std::vector tmpvec(newtmpsz); - fMM->acquire(newtmpsz - fTmpBuf.size()); - fTmpBuf.swap(tmpvec); - } - } - - private: - const compress::CompressInterface* fCompressor; - std::unique_ptr fMM; - std::vector fTmpBuf; -}; - /** @brief Storage for RGData with LRU-cache & memory management */ class RowGroupStorage @@ -718,7 +438,7 @@ class RowGroupStorage if (UNLIKELY(r < 0)) { throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg( - logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), + logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errno)), logging::ERR_DISKAGG_FILEIO_ERROR); } } @@ -1117,7 +837,7 @@ class RowGroupStorage if (UNLIKELY(fd < 0)) { throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errno)), logging::ERR_DISKAGG_FILEIO_ERROR); } uint64_t sz = fRGDatas.size(); @@ -1131,7 +851,7 @@ class RowGroupStorage close(fd); unlink(fname.c_str()); throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), 
logging::ERR_DISKAGG_FILEIO_ERROR); } close(fd); @@ -1145,7 +865,7 @@ class RowGroupStorage if (fd < 0) { throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errno)), logging::ERR_DISKAGG_FILEIO_ERROR); } uint64_t sz; @@ -1157,7 +877,7 @@ class RowGroupStorage close(fd); unlink(fname.c_str()); throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } fRGDatas.resize(sz); @@ -1167,7 +887,7 @@ class RowGroupStorage close(fd); unlink(fname.c_str()); throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } close(fd); @@ -1361,7 +1081,7 @@ class RowGroupStorage { unlink(fname.c_str()); throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } @@ -1415,7 +1135,7 @@ class RowGroupStorage if ((errNo = fDumper->write(makeRGFilename(rgid), (char*)bs.buf(), bs.length())) != 0) { throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } } @@ -1604,7 +1324,7 @@ class RowPosHashStorage if ((errNo = fDumper->write(makeDumpName(), (char*)fPosHashes.data(), sz)) != 0) { throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } } @@ -1638,7 +1358,7 @@ class RowPosHashStorage if ((errNo = fDumper->read(makeDumpName(), data)) != 0) { throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } @@ -2233,7 +1953,7 @@ void RowAggStorage::dumpInternalData() const if (fd < 0) { throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errno)), logging::ERR_DISKAGG_FILEIO_ERROR); } @@ -2242,7 +1962,7 @@ void RowAggStorage::dumpInternalData() const { close(fd); throw logging::IDBExcept( - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)), + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)), logging::ERR_DISKAGG_FILEIO_ERROR); } close(fd); @@ -2450,7 +2170,7 @@ void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, siz if (fd < 0) { throw logging::IDBExcept( - 
        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errno)),
+        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errno)),
         logging::ERR_DISKAGG_FILEIO_ERROR);
   }
   struct stat st
@@ -2464,7 +2184,7 @@ void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, siz
   {
     close(fd);
     throw logging::IDBExcept(
-        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, errorString(errNo)),
+        logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_DISKAGG_FILEIO_ERROR, common::errorString(errNo)),
         logging::ERR_DISKAGG_FILEIO_ERROR);
   }
   close(fd);
diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h
index 72c20e9327..2211d1bbde 100644
--- a/utils/rowgroup/rowstorage.h
+++ b/utils/rowgroup/rowstorage.h
@@ -17,14 +17,16 @@
 
 #pragma once
 
-#include "resourcemanager.h"
-#include "rowgroup.h"
-#include "idbcompress.h"
 #include
 #include
 #include
 #include
 
+#include "idbcompress.h"
+#include "memmanager.h"
+#include "resourcemanager.h"
+#include "rowgroup.h"
+
 namespace rowgroup
 {
 uint32_t calcNumberOfBuckets(ssize_t availMem, uint32_t numOfThreads, uint32_t numOfBuckets,
@@ -67,7 +69,7 @@ class RowAggStorage
 
   static uint16_t getMaxRows(bool enabledDiskAgg)
   {
-    return (enabledDiskAgg ? 8192 : 256);
+    return (enabledDiskAgg ? rowgroup::rgCommonSize : 256);
   }
 
   static size_t getBucketSize();
diff --git a/utils/structs/minmaxheap.h b/utils/structs/minmaxheap.h
new file mode 100644
index 0000000000..fba0eef5cd
--- /dev/null
+++ b/utils/structs/minmaxheap.h
@@ -0,0 +1,177 @@
+#include <vector>
+#include <stdexcept>
+#include <utility>    // std::swap
+#include <algorithm>  // std::min_element, std::max_element
+
+template <typename T, typename Compare = std::less<T>>
+class MinMaxHeap {
+public:
+  // --- interface ---
+  bool empty() const noexcept { return data_.empty(); }
+  std::size_t size() const noexcept { return data_.size(); }
+
+  const T& getMin() const {
+    if (empty()) throw std::out_of_range("heap empty");
+    return data_[0];
+  }
+
+  const T& getMax() const {
+    if (empty()) throw std::out_of_range("heap empty");
+    if (data_.size() == 1) return data_[0];
+    if (data_.size() == 2) return data_[1];
+    return cmp_(data_[1], data_[2]) ? data_[2] : data_[1];  // whichever is larger
+  }
+
+  void push(const T& x) {
+    data_.push_back(x);
+    bubbleUp(data_.size() - 1);
+  }
+
+  void popMin() {
+    if (empty()) throw std::out_of_range("heap empty");
+    moveLastToRootThenTrickleDown(0);
+  }
+
+  void popMax() {
+    if (empty()) throw std::out_of_range("heap empty");
+    std::size_t maxi = maxIndexUnderRoot();
+    moveLastToRootThenTrickleDown(maxi);
+  }
+
+private:
+  // --- internal state ---
+  std::vector<T> data_;
+  Compare cmp_{};  // std::less by default ⇒ min elements live on the even levels
+
+  // --- index helpers ---
+  static std::size_t parent(std::size_t i) { return (i - 1) / 2; }
+  static std::size_t grand(std::size_t i) { return (i - 1) / 4; }
+  static std::size_t left (std::size_t i) { return 2 * i + 1; }
+  static std::size_t right (std::size_t i) { return 2 * i + 2; }
+
+  static bool isMinLevel(std::size_t i) { return (std::bit_width(i + 1) & 1) == 1; }
+  // std::bit_width(n) ≡ floor(log2(n)) + 1 (C++20; provided by <bit>)
+
+  // bubble-up (insertion)
+  void bubbleUp(std::size_t i) {
+    if (i == 0) return;
+    std::size_t p = parent(i);
+    if (isMinLevel(i)) {
+      if (!cmp_(data_[i], data_[p])) {  // x > parent ⇒ it belongs on the max level
+        std::swap(data_[i], data_[p]);
+        bubbleUpMax(p);
+      } else {
+        bubbleUpMin(i);
+      }
+    } else {  // max level
+      if (cmp_(data_[i], data_[p])) {  // x < parent ⇒ it belongs on the min level
+        std::swap(data_[i], data_[p]);
+        bubbleUpMin(p);
+      } else {
+        bubbleUpMax(i);
+      }
+    }
+  }
+
+  void bubbleUpMin(std::size_t i) {
+    while (i >= 3) {
+      std::size_t g = grand(i);
+      if (cmp_(data_[i], data_[g])) {
+        std::swap(data_[i], data_[g]);
+        i = g;
+      } else break;
+    }
+  }
+
+  void bubbleUpMax(std::size_t i) {
+    while (i >= 3) {
+      std::size_t g = grand(i);
+      if (!cmp_(data_[i], data_[g])) {
+        std::swap(data_[i], data_[g]);
+        i = g;
+      } else break;
+    }
+  }
+
+  // trickle-down (removal)
+  void trickleDown(std::size_t i) {
+    if (isMinLevel(i))
+      trickleDownMin(i);
+    else
+      trickleDownMax(i);
+  }
+
+  void trickleDownMin(std::size_t i) {
+    while (left(i) < data_.size()) {
+      std::size_t m = minDescendant(i);
+      if (isGrandchild(i, m)) {
+        if (cmp_(data_[m], data_[i])) std::swap(data_[m], data_[i]);
+        std::size_t p = parent(m);
+        if (!cmp_(data_[m], data_[p])) std::swap(data_[m], data_[p]);
+        i = m;
+      } else {  // direct child
+        if (cmp_(data_[m], data_[i])) std::swap(data_[m], data_[i]);
+        break;
+      }
+    }
+  }
+
+  void trickleDownMax(std::size_t i) {
+    while (left(i) < data_.size()) {
+      std::size_t m = maxDescendant(i);
+      if (isGrandchild(i, m)) {
+        if (!cmp_(data_[m], data_[i])) std::swap(data_[m], data_[i]);
+        std::size_t p = parent(m);
+        if (cmp_(data_[m], data_[p])) std::swap(data_[m], data_[p]);
+        i = m;
+      } else {
+        if (!cmp_(data_[m], data_[i])) std::swap(data_[m], data_[i]);
+        break;
+      }
+    }
+  }
+
+  // helpers
+  bool isGrandchild(std::size_t i, std::size_t m) const {
+    return m >= left(left(i));
+  }
+
+  std::size_t minDescendant(std::size_t i) const {
+    return descendantByCompare(i, /*wantMin=*/true);
+  }
+
+  std::size_t maxDescendant(std::size_t i) const {
+    return descendantByCompare(i, /*wantMin=*/false);
+  }
+
+  std::size_t descendantByCompare(std::size_t i, bool wantMin) const {
+    std::size_t best = left(i);
+    if (right(i) < data_.size() &&
+        (wantMin ? cmp_(data_[right(i)], data_[best])
+                 : cmp_(data_[best], data_[right(i)])))
+      best = right(i);
+
+    // grandchildren
+    for (std::size_t c = left(left(i)); c < data_.size() && c <= right(right(i)); ++c) {
+      bool better = wantMin ? cmp_(data_[c], data_[best])
+                            : cmp_(data_[best], data_[c]);
+      if (better) best = c;
+    }
+    return best;
+  }
+
+  // remove node j: move the last element into slot j and trickle it down
+  void moveLastToRootThenTrickleDown(std::size_t j) {
+    data_[j] = std::move(data_.back());
+    data_.pop_back();
+    if (j < data_.size())
+      trickleDown(j);
+  }
+
+  // index of the max element among the root's children
+  std::size_t maxIndexUnderRoot() const {
+    if (data_.size() == 1) return 0;
+    if (data_.size() == 2) return 1;
+    return cmp_(data_[1], data_[2]) ? 2 : 1;
+  }
+};
\ No newline at end of file
diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h
index 67f0379376..c8ca39de5d 100644
--- a/utils/windowfunction/idborderby.h
+++ b/utils/windowfunction/idborderby.h
@@ -458,7 +458,7 @@ class IdbOrderBy : public IdbCompare
 
   std::unique_ptr fOrderByQueue = nullptr;
 
-  protected:
+// protected:
   std::vector fOrderByCond;
   rowgroup::Row fRow0;
   CompareRule fRule;
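
Usage sketch (not part of the patch): the minmaxheap.h file added above implements a standard min-max heap, which exposes both the smallest and the largest retained element in O(1) and supports O(log n) push/pop at either end; an ordinary binary heap only exposes one end cheaply, so the double-ended variant is what a bounded top-N buffer needs when it must report its best rows while evicting its current worst. The snippet below is illustrative only and assumes the heap behaves as a textbook min-max heap; topNSmallest and the plain int64_t keys are invented for the example, whereas the real sorter compares rowgroup rows through CompareRule.

#include <cstddef>
#include <cstdint>
#include <vector>

#include "minmaxheap.h"

// Keep the n smallest keys seen so far (ascending ORDER BY ... LIMIT n).
std::vector<int64_t> topNSmallest(const std::vector<int64_t>& keys, std::size_t n)
{
  std::vector<int64_t> result;
  if (n == 0)
    return result;

  MinMaxHeap<int64_t> heap;
  for (int64_t k : keys)
  {
    if (heap.size() < n)
    {
      heap.push(k);  // still filling the buffer
    }
    else if (k < heap.getMax())
    {
      heap.popMax();  // evict the current worst of the kept n
      heap.push(k);
    }
  }

  result.reserve(heap.size());
  while (!heap.empty())
  {
    result.push_back(heap.getMin());  // drain in ascending order
    heap.popMin();
  }
  return result;
}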