diff --git a/libgalois/include/galois/worklists/AdaptiveObim.h b/libgalois/include/galois/worklists/AdaptiveObim.h
index e3ef9d5965..6007ca43cd 100644
--- a/libgalois/include/galois/worklists/AdaptiveObim.h
+++ b/libgalois/include/galois/worklists/AdaptiveObim.h
@@ -21,8 +21,6 @@
 #ifndef GALOIS_WORKLIST_ADAPTIVEOBIM_H
 #define GALOIS_WORKLIST_ADAPTIVEOBIM_H
 
-// #define UNMERGE_ENABLED
-
 #include <atomic>
 #include <cmath>
 #include <functional>
@@ -30,6 +28,7 @@
 #include <type_traits>
 
 #include "galois/config.h"
+#include "galois/FlatMap.h"
 #include "galois/Timer.h"
 #include "galois/substrate/PaddedLock.h"
 #include "galois/substrate/PerThreadStorage.h"
@@ -40,6 +39,32 @@
 namespace galois {
 namespace worklists {
 
+namespace internal {
+
+template <typename Index, bool UseDescending>
+struct AdaptiveOrderedByIntegerMetricComparator {
+  typedef std::less<Index> compare_t;
+  Index identity;
+  Index earliest;
+
+  AdaptiveOrderedByIntegerMetricComparator()
+      : identity(std::numeric_limits<Index>::max()),
+        earliest(std::numeric_limits<Index>::min()) {}
+};
+
+template <typename Index>
+struct AdaptiveOrderedByIntegerMetricComparator<Index, true> {
+  typedef std::greater<Index> compare_t;
+  Index identity;
+  Index earliest;
+
+  AdaptiveOrderedByIntegerMetricComparator()
+      : identity(std::numeric_limits<Index>::min()),
+        earliest(std::numeric_limits<Index>::max()) {}
+};
+
+} // namespace internal
+
 /**
  * Approximate priority scheduling. Indexer is a default-constructable class
  * whose instances conform to R r = indexer(item) where R is some
@@ -55,93 +80,111 @@ namespace worklists {
  *   int operator()(Item i) const { return i.index; }
  * };
  *
- * typedef galois::WorkList::AdaptiveOrderedByIntegerMetric<Indexer> WL;
+ * typedef galois::worklists::AdaptiveOrderedByIntegerMetric<Indexer> WL;
  * galois::for_each(items.begin(), items.end(), Fn);
 * \endcode
 *
- * @tparam Indexer Indexer class
- * @tparam Container Scheduler for each bucket
- * @tparam BlockPeriod Check for higher priority work every 2^BlockPeriod
- *                     iterations
- * @tparam BSP Use back-scan prevention
- * @tparam uniformBSP Use uniform back-scan prevention
- * @tparam T Work item type
- * @tparam Index Indexer return type
- * @tparam UseMonotonic Assume that an activity at priority p will not
- *                      schedule work at priority p or any priority p1
- *                      where p1 < p.
- * @tparam UseDescending Use descending order instead
- * @tparam Concurrent Whether or not to allow concurrent execution
- *
+ * @tparam Indexer       Indexer class
+ * @tparam Container     Scheduler for each bucket
+ * @tparam BlockPeriod   Check for higher priority work every 2^BlockPeriod
+ *                       iterations
+ * @tparam BSP           Use back-scan prevention
+ * @tparam uniformBSP    Use uniform back-scan prevention
+ * @tparam T             Work item type
+ * @tparam Index         Indexer return type
+ * @tparam EnableUmerge  Allow previously merged priority levels to be
+ *                       unmerged again
+ * @tparam UseMonotonic  Assume that an activity at priority p will not
+ *                       schedule work at priority p or any priority p1 where p1 < p.
+ * @tparam UseDescending Use descending order instead
+ * @tparam Concurrent    Whether or not to allow concurrent execution
  */
 template <class Indexer = DummyIndexer<int>,
           typename Container = PerSocketChunkFIFO<>, int BlockPeriod = 0,
           bool BSP = true, bool uniformBSP = false, int chunk_size = 64,
-          typename T = int, typename Index = int, bool UseMonotonic = false,
-          bool UseDescending = false, bool Concurrent = true>
-struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
-  template <bool Concurrent_>
+          typename T = int, typename Index = int, bool EnableUmerge = false,
+          bool UseMonotonic = false, bool UseDescending = false,
+          bool Concurrent = true>
+struct AdaptiveOrderedByIntegerMetric
+    : private boost::noncopyable,
+      public internal::AdaptiveOrderedByIntegerMetricComparator<Index, UseDescending> {
+  template <typename _T>
+  using retype = AdaptiveOrderedByIntegerMetric<
+      Indexer, typename Container::template retype<_T>, BlockPeriod, BSP,
+      uniformBSP, chunk_size, _T, typename std::result_of<Indexer(_T)>::type,
+      EnableUmerge, UseMonotonic, UseDescending, Concurrent>;
+
+  template <bool _b>
   using rethread = AdaptiveOrderedByIntegerMetric<
-      Indexer, typename Container::template rethread<Concurrent_>, BlockPeriod,
-      BSP, uniformBSP, chunk_size, T, Index, UseMonotonic, UseDescending,
-      Concurrent_>;
+      Indexer, typename Container::template rethread<_b>, BlockPeriod, BSP,
+      uniformBSP, chunk_size, T, Index, EnableUmerge, UseMonotonic,
+      UseDescending, _b>;
+
+  template <unsigned _period>
+  struct with_block_period {
+    typedef AdaptiveOrderedByIntegerMetric<
+        Indexer, Container, _period, BSP, uniformBSP, chunk_size, T, Index,
+        EnableUmerge, UseMonotonic, UseDescending, Concurrent>
+        type;
+  };
+
+  template <typename _container>
+  struct with_container {
+    typedef AdaptiveOrderedByIntegerMetric<
+        Indexer, _container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
+        EnableUmerge, UseMonotonic, UseDescending, Concurrent>
+        type;
+  };
 
-  template <typename T_>
-  using retype = AdaptiveOrderedByIntegerMetric<
-      Indexer, typename Container::template retype<T_>, BlockPeriod, BSP,
-      uniformBSP, chunk_size, T_, typename std::result_of<Indexer(T_)>::type,
-      UseMonotonic, UseDescending, Concurrent>;
-
-  template <int BlockPeriod_>
-  using with_block_period = AdaptiveOrderedByIntegerMetric<
-      Indexer, Container, BlockPeriod_, BSP, uniformBSP, chunk_size, T, Index,
-      UseMonotonic, UseDescending, Concurrent>;
-
-  template <typename Container_>
-  using with_container = AdaptiveOrderedByIntegerMetric<
-      Indexer, Container_, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
-      UseMonotonic, UseDescending, Concurrent>;
-
-  template <typename Indexer_>
-  using with_indexer = AdaptiveOrderedByIntegerMetric<
-      Indexer_, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
-      UseMonotonic, UseDescending, Concurrent>;
-
-  template <bool BSP_>
-  using with_back_scan_prevention = AdaptiveOrderedByIntegerMetric<
-      Indexer, Container, BlockPeriod, BSP_, uniformBSP, chunk_size, T, Index,
-      UseMonotonic, UseDescending, Concurrent>;
-
-  template <bool UseMonotonic_>
-  using with_monotonic = AdaptiveOrderedByIntegerMetric<
-      Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
-      UseMonotonic_, UseDescending, Concurrent>;
-
-  template <bool UseDescending_>
-  using with_descending = AdaptiveOrderedByIntegerMetric<
-      Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
-      UseMonotonic, UseDescending_, Concurrent>;
-
-  using compare_t =
-      std::conditional_t<UseDescending, std::greater<Index>, std::less<Index>>;
+  template <typename _indexer>
+  struct with_indexer {
+    typedef AdaptiveOrderedByIntegerMetric<
+        _indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
+        EnableUmerge, UseMonotonic, UseDescending, Concurrent>
+        type;
+  };
+
+  template <bool _bsp>
+  struct with_back_scan_prevention {
+    typedef AdaptiveOrderedByIntegerMetric<
+        Indexer, Container, BlockPeriod, _bsp, uniformBSP, chunk_size, T, Index,
+        EnableUmerge, UseMonotonic, UseDescending, Concurrent>
+        type;
+  };
+
+  template <bool _enable_unmerge>
+  struct with_unmerge {
+    typedef AdaptiveOrderedByIntegerMetric<
+        Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
+        _enable_unmerge, UseMonotonic, UseDescending, Concurrent>
+        type;
+  };
+
+  template <bool _use_monotonic>
+  struct with_monotonic {
+    typedef AdaptiveOrderedByIntegerMetric<
+        Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
+        EnableUmerge, _use_monotonic, UseDescending, Concurrent>
+        type;
+  };
+
+  template <bool _use_descending>
+  struct with_descending {
+    typedef AdaptiveOrderedByIntegerMetric<
+        Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index,
+        EnableUmerge, UseMonotonic, _use_descending, Concurrent>
+        type;
+  };
 
   typedef T value_type;
   typedef Index index_type;
+  typedef uint32_t delta_type;
 
 private:
-  static inline compare_t compare;
-  static constexpr Index earliest = UseDescending
-                                        ? std::numeric_limits<Index>::min()
-                                        : std::numeric_limits<Index>::max();
-  // static constexpr identity = UseDescending ?
-  // std::numeric_limits<Index>::max() : std::numeric_limits<Index>::min();
-  unsigned int delta;
+  typedef typename Container::template rethread<Concurrent> CTy;
+  typedef internal::AdaptiveOrderedByIntegerMetricComparator<Index, UseDescending>
+      Comparator;
+  static inline typename Comparator::compare_t compare;
+
+  delta_type delta;
   unsigned int counter;
   unsigned int maxIndex;
   unsigned int lastSizeMasterLog;
@@ -150,13 +193,12 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
   // smaller delta insertions are prioritized
   struct deltaIndex {
     Index k; // note: original index is stored here
-    unsigned int d;
+    delta_type d;
     // taking the max of deltas and doing right shift eq. shifting priority with
     // d-max(d1, d2)
 
     deltaIndex() : k(0), d(0) {}
-    deltaIndex(Index k1, unsigned int d1) : k(k1), d(d1) {}
-    unsigned int id() { return k; }
+    deltaIndex(Index k1, delta_type d1) : k(k1), d(d1) {}
     bool operator==(const deltaIndex& a) const {
       unsigned delt = std::max(d, a.d);
       Index a1      = k >> delt;
@@ -217,7 +259,6 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
     }
   };
 
-  typedef typename Container::template rethread<Concurrent> CTy;
   typedef galois::flat_map<deltaIndex, CTy*> LMapTy;
 
   struct ThreadData {
@@ -228,26 +269,42 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
     unsigned int lastMasterVersion;
     unsigned int numPops;
 
-    unsigned int sinceLastFix;
+    unsigned int popsLastFix;
     unsigned int slowPopsLastPeriod;
     unsigned int pushesLastPeriod;
-    unsigned int priosLastPeriod;
-
-    unsigned long pmodAllDeq;
-    unsigned int popsFromSameQ;
-    unsigned int ctr;
-
-    Index maxPrioDiffLastPeriod;
+    unsigned int popsFromSameQ;
+
+    struct {
+      size_t pmodAllDeq;
+      unsigned int priosLastPeriod;
+      unsigned int numUmerges;
+      Index maxPrioDiffLastPeriod;
+    } stats;
 
     Index minPrio;
     Index maxPrio;
     substrate::PaddedLock<Concurrent> lock;
 
+    void cleanup() {
+      popsLastFix        = 0;
+      slowPopsLastPeriod = 0;
+      pushesLastPeriod   = 0;
+
+      stats.priosLastPeriod       = 0;
+      stats.maxPrioDiffLastPeriod = 0;
+
+      minPrio = std::numeric_limits<Index>::max();
+      maxPrio = std::numeric_limits<Index>::min();
+    }
+
+    inline bool isSlowPopFreq(double threshold) {
+      // i.e. ((double)slowPopsLastPeriod / (double)popsLastFix) > threshold,
+      // written without the division
+      return (double)slowPopsLastPeriod > (double)popsLastFix * threshold;
+    }
+
     ThreadData(Index initial)
         : curIndex(initial, 0), scanStart(initial, 0), current(0),
-          lastMasterVersion(0), numPops(0), sinceLastFix(0),
-          slowPopsLastPeriod(0), pushesLastPeriod(0), priosLastPeriod(0),
-          pmodAllDeq(0), popsFromSameQ(0), ctr(0), maxPrioDiffLastPeriod(0),
+          lastMasterVersion(0), numPops(0), popsLastFix(0),
+          slowPopsLastPeriod(0), pushesLastPeriod(0),
+          popsFromSameQ(0), stats{0, 0, 0, 0},
          minPrio(std::numeric_limits<Index>::max()),
          maxPrio(std::numeric_limits<Index>::min()) {}
  };
@@ -256,7 +313,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
 
   // NB: Place dynamically growing masterLog after fixed-size PerThreadStorage
   // members to give higher likelihood of reclaiming PerThreadStorage
-  substrate::PerThreadStorage<ThreadData> current;
+  substrate::PerThreadStorage<ThreadData> data;
   substrate::PaddedLock<Concurrent> masterLock;
   MasterLog masterLog;
   runtime::FixedSizeHeap heap;
@@ -292,56 +349,50 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
 
     // first give it some time
     // then check the fdeq frequency
-    if (myID == 0 && p.sinceLastFix > counter &&
-        ((double)(p.slowPopsLastPeriod) / (double)(p.sinceLastFix)) >
-            1.0 / (double)(chunk_size)) {
+    if (myID == 0 && p.popsLastFix > counter &&
+        p.isSlowPopFreq(1.0 / (double)(chunk_size))) {
+      unsigned long numPushesThisStep      = p.pushesLastPeriod;
+      unsigned long priosCreatedThisPeriod = p.stats.priosLastPeriod;
+      unsigned long allPmodDeqCounts       = p.stats.pmodAllDeq;
+      Index minOfMin                       = p.minPrio;
+      Index maxOfMax                       = p.maxPrio;
+      p.cleanup();
       for (unsigned i = 1; i < runtime::activeThreads; ++i) {
-        while (current.getRemote(i)->lock.try_lock())
+        while (!data.getRemote(i)->lock.try_lock())
           ;
-      }
-      unsigned long priosCreatedThisPeriod = 0;
-      unsigned long numPushesThisStep      = 0;
-      unsigned long allPmodDeqCounts       = 0;
-      Index minOfMin = std::numeric_limits<Index>::max();
-      Index maxOfMax = std::numeric_limits<Index>::min();
-      for (unsigned i = 0; i < runtime::activeThreads; ++i) {
-        Index& otherMinPrio = current.getRemote(i)->minPrio;
-        minOfMin = compare(minOfMin, otherMinPrio) ? minOfMin : otherMinPrio;
-        Index& otherMaxPrio = current.getRemote(i)->maxPrio;
-        maxOfMax = compare(otherMaxPrio, maxOfMax) ? maxOfMax : otherMaxPrio;
-        priosCreatedThisPeriod += current.getRemote(i)->priosLastPeriod;
-        numPushesThisStep += current.getRemote(i)->pushesLastPeriod;
-        allPmodDeqCounts += current.getRemote(i)->pmodAllDeq;
-        current.getRemote(i)->sinceLastFix          = 0;
-        current.getRemote(i)->slowPopsLastPeriod    = 0;
-        current.getRemote(i)->pushesLastPeriod      = 0;
-        current.getRemote(i)->priosLastPeriod       = 0;
-        current.getRemote(i)->maxPrioDiffLastPeriod = 0;
-
-        current.getRemote(i)->minPrio = std::numeric_limits<Index>::max();
-        current.getRemote(i)->maxPrio = std::numeric_limits<Index>::min();
-      }
-      if (((double)numPushesThisStep /
-           ((double)((maxOfMax >> delta) - (minOfMin >> delta)))) <
-          chunk_size / 2) {
-        double xx = ((double)(chunk_size) /
-                     ((double)numPushesThisStep /
-                      ((double)((maxOfMax >> delta) - (minOfMin >> delta)))));
-        delta += std::floor(std::log2(xx));
-        counter *= 2;
+        Index& otherMinPrio = data.getRemote(i)->minPrio;
+        minOfMin            = std::min(minOfMin, otherMinPrio, compare);
+        Index& otherMaxPrio = data.getRemote(i)->maxPrio;
+        maxOfMax            = std::max(otherMaxPrio, maxOfMax, compare);
+        numPushesThisStep += data.getRemote(i)->pushesLastPeriod;
+        priosCreatedThisPeriod += data.getRemote(i)->stats.priosLastPeriod;
+        allPmodDeqCounts += data.getRemote(i)->stats.pmodAllDeq;
+
+        data.getRemote(i)->cleanup();
+        data.getRemote(i)->lock.unlock();
       }
-      for (unsigned i = 1; i < runtime::activeThreads; ++i) {
-        current.getRemote(i)->lock.unlock();
+      if ((double)numPushesThisStep) {
+        Index prioRange = (maxOfMax >> delta) - (minOfMin >> delta);
+        // Division is expensive
+        // double fillRatio = ((double)numPushesThisStep / (double)prioRange);
+        if (numPushesThisStep < (chunk_size >> 1) * prioRange) {
+          // Ditto
+          // double xx = ((double)(chunk_size) / fillRatio);
+          double xx = std::log2(chunk_size) - std::log2(numPushesThisStep) +
+                      std::log2(prioRange);
+          assert(xx);
+          delta += std::floor(xx);
+          counter <<= 1;
+        }
      }
    }
-#ifdef UNMERGE_ENABLED
     // serif added here
     // make sure delta is bigger than 0 so that we can actually unmerge things
     // give it some time and check the same queue pops
-    else if (delta > 0 && myID == 0 && p.sinceLastFix > counter &&
-             p.popsFromSameQ > 4 * chunk_size) {
+    else if (EnableUmerge && delta > 0 && myID == 0 &&
+             p.popsLastFix > counter && p.popsFromSameQ > (chunk_size << 2)) {
      if (((p.maxPrio >> delta) - (p.minPrio >> delta)) < 16 &&
          ((double)p.pushesLastPeriod /
           ((double)((p.maxPrio >> delta) - (p.minPrio >> delta)))) >
@@ -357,63 +408,48 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
        else
          delta = 0;
 
+        p.cleanup();
        for (unsigned i = 1; i < runtime::activeThreads; ++i) {
-          while (current.getRemote(i)->lock.try_lock())
+          while (!data.getRemote(i)->lock.try_lock())
            ;
+          data.getRemote(i)->cleanup();
+          data.getRemote(i)->lock.unlock();
        }
-
-        for (unsigned i = 0; i < runtime::activeThreads; ++i) {
-
-          current.getRemote(i)->sinceLastFix          = 0;
-          current.getRemote(i)->slowPopsLastPeriod    = 0;
-          current.getRemote(i)->pushesLastPeriod      = 0;
-          current.getRemote(i)->priosLastPeriod       = 0;
-          current.getRemote(i)->maxPrioDiffLastPeriod = 0;
-
-          current.getRemote(i)->minPrio = std::numeric_limits<Index>::max();
-          current.getRemote(i)->maxPrio = std::numeric_limits<Index>::min();
-        }
-
-        for (unsigned i = 1; i < runtime::activeThreads; ++i) {
-          current.getRemote(i)->lock.unlock();
-        }
-        p.ctr++;
+        p.stats.numUmerges++;
      }
      p.popsFromSameQ = 0;
    }
-#endif
-  p.popsFromSameQ = 0;
+    // p.popsFromSameQ = 0;
 
-    updateLocal(p);
     bool localLeader = substrate::ThreadPool::isLeader();
+    deltaIndex msS(this->earliest, 0);
+
+    updateLocal(p);
 
-    deltaIndex msS;
-    msS.k = std::numeric_limits<Index>::min();
-    msS.d = 0;
    if (BSP && !UseMonotonic) {
      msS = p.scanStart;
      if (localLeader || uniformBSP) {
        for (unsigned i = 0; i < runtime::activeThreads; ++i) {
-          msS = std::min(msS, current.getRemote(i)->scanStart);
+          msS = std::min(msS, data.getRemote(i)->scanStart);
        }
      } else {
        msS = std::min(
-            msS,
-            current.getRemote(substrate::ThreadPool::getLeader())->scanStart);
+            msS, data.getRemote(substrate::ThreadPool::getLeader())->scanStart);
      }
    }
 
-    for (auto ii = p.local.lower_bound(msS), ee = p.local.end(); ii != ee;
+    for (auto ii = p.local.lower_bound(msS), ei = p.local.end(); ii != ei;
         ++ii) {
-      galois::optional<value_type> retval;
-      if ((retval = ii->second->pop())) {
+      galois::optional<value_type> item;
+      if ((item = ii->second->pop())) {
        p.current   = ii->second;
        p.curIndex  = ii->first;
        p.scanStart = ii->first;
        p.lock.unlock();
-        return retval;
+        return item;
      }
    }
 
+    p.lock.unlock();
    return galois::optional<value_type>();
  }
@@ -422,23 +458,23 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
  CTy* slowUpdateLocalOrCreate(ThreadData& p, deltaIndex i) {
    // update local until we find it or we get the write lock
    do {
-      CTy* lC;
      updateLocal(p);
+      CTy* lC;
      if ((lC = p.local[i]))
        return lC;
    } while (!masterLock.try_lock());
    // we have the write lock, update again then create
    updateLocal(p);
-    CTy*& lC2 = p.local[i];
-    if (!lC2) {
-      lC2 = new (heap.allocate(sizeof(CTy))) CTy();
+    CTy*& C2 = p.local[i];
+    if (!C2) {
+      C2 = new (heap.allocate(sizeof(CTy))) CTy();
      p.lastMasterVersion = masterVersion.load(std::memory_order_relaxed) + 1;
-      masterLog.push_back(std::make_pair(i, lC2));
+      masterLog.push_back(std::make_pair(i, C2));
      masterVersion.fetch_add(1);
-      p.priosLastPeriod++;
+      p.stats.priosLastPeriod++;
    }
    masterLock.unlock();
-    return lC2;
+    return C2;
  }
@@ -453,13 +489,13 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
 
 public:
  AdaptiveOrderedByIntegerMetric(const Indexer& x = Indexer())
-      : current(earliest), heap(sizeof(CTy)), masterVersion(0), indexer(x) {
+      : data(this->earliest), heap(sizeof(CTy)), masterVersion(0), indexer(x) {
    delta   = 0;
    counter = chunk_size;
  }
 
  ~AdaptiveOrderedByIntegerMetric() {
-    ThreadData& p = *current.getLocal();
+    ThreadData& p = *data.getLocal();
    updateLocal(p);
    // Deallocate in LIFO order to give opportunity for simple garbage
    // collection
@@ -472,12 +508,13 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
  }
 
  void push(const value_type& val) {
-    ThreadData& p = *current.getLocal();
+    deltaIndex index;
+    ThreadData& p = *data.getLocal();
    while (!p.lock.try_lock())
      ;
-    Index ind = indexer(val);
-    deltaIndex index;
-    index.k = ind;
+
+    p.pushesLastPeriod++;
+    index.k = indexer(val);
    index.d = delta;
    if (index.k > p.maxPrio) {
      p.maxPrio = index.k;
@@ -485,7 +522,6 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
    if (index.k < p.minPrio) {
      p.minPrio = index.k;
    }
-    p.pushesLastPeriod++;
 
    // Fast path
    if (index == p.curIndex && p.current) {
@@ -495,7 +531,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
    }
 
    // Slow path
-    CTy* lC = updateLocalOrCreate(p, index);
+    CTy* C = updateLocalOrCreate(p, index);
    if (BSP && index < p.scanStart)
      p.scanStart = index;
    // Opportunistically move to higher priority work
@@ -504,47 +540,45 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
      p.popsFromSameQ = 0;
 
      p.curIndex = index;
-      p.current  = lC;
+      p.current  = C;
    }
-    lC->push(val);
+    C->push(val);
+
    p.lock.unlock();
  }
 
  template <typename Iter>
-  unsigned int push(Iter b, Iter e) {
-    int npush;
+  size_t push(Iter b, Iter e) {
+    size_t npush;
    for (npush = 0; b != e; npush++)
      push(*b++);
    return npush;
  }
 
  template <typename RangeTy>
-  unsigned int push_initial(const RangeTy& range) {
+  size_t push_initial(const RangeTy& range) {
    auto rp = range.local_pair();
    return push(rp.first, rp.second);
  }
 
  galois::optional<value_type> pop() {
-    ThreadData& p = *current.getLocal();
+    ThreadData& p = *data.getLocal();
    while (!p.lock.try_lock())
      ;
+    CTy* C = p.current;
 
-    p.sinceLastFix++;
-
-    unsigned myID = galois::substrate::ThreadPool::getTID();
-
-    current.getRemote(myID)->pmodAllDeq++;
+    p.popsLastFix++;
+    p.stats.pmodAllDeq++;
 
-    CTy* C = p.current;
-    if (BlockPeriod &&
-        (BlockPeriod < 0 || ((p.numPops++ & ((1ull << BlockPeriod) - 1)) == 0)))
+    if (BlockPeriod && ((p.numPops++ & ((1ull << BlockPeriod) - 1)) == 0))
      return slowPop(p);
 
-    galois::optional<value_type> retval;
-    if (C && (retval = C->pop())) {
+    galois::optional<value_type> item;
+    if (C && (item = C->pop())) {
      p.popsFromSameQ++;
+
      p.lock.unlock();
-      return retval;
+      return item;
    }
 
    // Slow path
@@ -552,7 +586,8 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable {
    return slowPop(p);
  }
 };
 GALOIS_WLCOMPILECHECK(AdaptiveOrderedByIntegerMetric)
-} // namespace worklists
-} // namespace galois
+
+} // end namespace worklists
+} // end namespace galois
 #endif
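
A note on the delta mechanism above (a sketch, not part of the patch). A
deltaIndex compares two priorities only after shifting both keys right by the
larger of the two deltas, so raising `delta` merges nearby priorities into a
single bucket; the adjustment in slowPop then grows delta by roughly
log2(chunk_size * prioRange / numPushesThisStep), i.e. until a merged bucket
again holds on the order of chunk_size items. A minimal stand-alone
illustration of the bucketing rule, with made-up key values:

    #include <algorithm>
    #include <cassert>

    // Mirrors deltaIndex::operator== from the patch: indices are compared
    // after shifting by the larger delta, so a larger delta coarsens
    // priorities into wider buckets.
    bool sameBucket(int k1, unsigned d1, int k2, unsigned d2) {
      unsigned delt = std::max(d1, d2);
      return (k1 >> delt) == (k2 >> delt);
    }

    int main() {
      assert(!sameBucket(17, 0, 22, 0)); // delta 0: 17 and 22 stay distinct
      assert(sameBucket(17, 3, 22, 3));  // delta 3: 17>>3 == 22>>3 == 2
      return 0;
    }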
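
Usage sketch for the new EnableUmerge knob (the item type, indexer, and shift
amount are hypothetical; the helper names follow the with_* traits added in
this diff):

    #include "galois/worklists/AdaptiveObim.h"

    struct UpdateRequest {
      int node;
      int dist;
    };

    struct Indexer {
      // Smaller result = higher priority, since UseDescending defaults to
      // false; shifting by 3 groups distances eight at a time.
      int operator()(const UpdateRequest& r) const { return r.dist >> 3; }
    };

    using BaseWL = galois::worklists::AdaptiveOrderedByIntegerMetric<Indexer>;
    // Opt in to unmerging (EnableUmerge = true) via the helper added above.
    using WL = BaseWL::with_unmerge<true>::type;

A worklist type built this way is typically handed to galois::for_each through
the galois::wl<WL>() argument, as with the other OBIM variants.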