Skip to content

Commit 74dca6d

Browse files
committed
fixed & improved file I/O error handling
1 parent 5509479 commit 74dca6d

File tree

7 files changed

+91
-47
lines changed

7 files changed

+91
-47
lines changed

src/classification.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -608,21 +608,17 @@ void map_queries_to_targets_default(
608608
results.mapout << buf.out.str();
609609
};
610610

611-
//runs in case of notification events
612-
const auto showStatus = [&] (const std::string& msg, float progress) {
611+
//runs if something needs to be appended to the output
612+
const auto appendToOutput = [&] (const std::string& msg) {
613613
if(opt.output.mapViewMode != map_view_mode::none) {
614614
results.mapout << opt.output.format.comment << msg << '\n';
615615
}
616-
if(progress >= 0.0f) {
617-
show_progress_indicator(results.status, progress);
618-
}
619616
};
620617

621-
622618
//run (parallel) database queries according to processing options
623619
query_database(infiles, db, opt.process,
624620
makeBatchBuffer, processQuery, finalizeBatch,
625-
showStatus);
621+
appendToOutput);
626622

627623
//target -> hits list?
628624
if(tgtMatches.empty()) return;

src/cmdline_utility.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace mc {
3030
//-------------------------------------------------------------------
3131
void show_progress_indicator(std::ostream& os, float done, int totalLength)
3232
{
33+
if(done < 0.f) done = 0.f;
3334
auto m = int((totalLength - 7) * done);
3435
os << "\r[";
3536
for(int j = 0; j < m; ++j) os << '=';

src/mode_query.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,9 @@ void process_input_files(const vector<string>& infiles,
154154
return;
155155
}
156156
else {
157-
bool anyreadable = false;
158-
for(const auto& f : infiles) {
159-
if(file_readable(f)) { anyreadable = true; break; }
160-
}
161-
if(!anyreadable) {
157+
bool noneReadable = std::none_of(infiles.begin(), infiles.end(),
158+
[](const auto& f) { return file_readable(f); });
159+
if(noneReadable) {
162160
throw std::runtime_error{
163161
"None of the query sequence files could be opened"};
164162
}

src/query_options.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,21 @@ get_query_processing_options(const args_parser& args,
102102
auto batchSize = args.get<int>("batch-size", defaults.batchSize);
103103
if(batchSize >= 1) opt.batchSize = batchSize;
104104

105+
auto numSeq = args.get<int>("per-thread-sequential-queries",
106+
defaults.perThreadSequentialQueries);
107+
108+
if(numSeq >= 1) {
109+
opt.perThreadSequentialQueries = numSeq;
110+
}
111+
else {
112+
//adapt to number of threads
113+
if(numThreads < 64) {
114+
opt.perThreadSequentialQueries = 4;
115+
} else {
116+
opt.perThreadSequentialQueries = 8;
117+
}
118+
}
119+
105120
opt.queryLimit = args.get<std::int_least64_t>({"query-limit"},
106121
defaults.queryLimit);
107122

src/query_options.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ struct query_processing_options
107107

108108
std::size_t batchSize = 4096 * std::thread::hardware_concurrency();
109109

110+
int perThreadSequentialQueries = 4;
111+
110112
//limits number of reads per sequence source (file)
111113
std::int_least64_t queryLimit = std::numeric_limits<std::int_least64_t>::max();
112114
};

src/querying.h

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
#define MC_QUERYING_H_
2424

2525
#include <vector>
26+
#include <iostream>
2627

2728
#include "config.h"
2829
#include "sequence_io.h"
2930
#include "cmdline_utility.h"
3031
#include "query_options.h"
32+
#include "cmdline_utility.h"
3133

3234

3335
namespace mc {
@@ -75,15 +77,15 @@ struct sequence_query
7577
*****************************************************************************/
7678
template<
7779
class BufferSource, class BufferUpdate, class BufferSink,
78-
class StatusCallback
80+
class LogCallback
7981
>
8082
void query_batched(
8183
// parallel_queue& queue,
8284
const std::string& filename1, const std::string& filename2,
8385
const database& db, const query_processing_options& opt,
8486
query_id& startId,
8587
BufferSource&& getBuffer, BufferUpdate&& update, BufferSink&& finalize,
86-
StatusCallback&& showStatus)
88+
LogCallback&& log)
8789
{
8890
std::mutex exceptionMtx;
8991
std::mutex finalizeMtx;
@@ -99,22 +101,23 @@ void query_batched(
99101
// assign work to threads
100102
for(int i = 0; i < opt.numThreads; ++i) {
101103
threads.emplace_back([&] {
102-
int threadId = i;
104+
const int threadId = i;
103105
try {
104106
sequence_pair_reader reader{filename1, filename2};
105107

106108
auto buffer = getBuffer();
107109
match_locations matches;
108-
bool empty = true;
110+
bool bufferEmpty = true;
111+
112+
const auto readSequentially = opt.perThreadSequentialQueries;
109113

110-
const int readAtOnce = 4; // 16 threads
111-
// const int readAtOnce = 4; // 32 threads
112-
// const int readAtOnce = 8; // 64 threads
113114
std::vector<sequence_pair_reader::sequence_pair> sequences;
114-
sequences.reserve(readAtOnce);
115+
sequences.reserve(readSequentially);
115116

116-
while(reader.has_next()) {
117-
for(std::size_t i = 0; i < opt.batchSize && queryLimit.fetch_sub(readAtOnce) > 0; ++i) {
117+
while(reader.has_next() && queryLimit > 0) {
118+
for(std::size_t i = 0;
119+
i < opt.batchSize && queryLimit.fetch_sub(readSequentially) > 0; ++i)
120+
{
118121
query_id myQid;
119122
sequence_pair_reader::stream_positions myPos;
120123

@@ -129,25 +132,20 @@ void query_batched(
129132
reader.index_offset(myQid);
130133

131134
// get work id and skip to this read
132-
auto wid = workId.fetch_add(readAtOnce);
135+
auto wid = workId.fetch_add(readSequentially);
133136
if(myQid != wid) {
134137
reader.skip(wid-myQid);
135138
}
136139
if(!reader.has_next()) break;
137140
// auto seq = reader.next();
138-
for(int i = 0; i < readAtOnce; ++i) {
139-
// auto seq = reader.next();
140-
// if(!seq.first.header.empty())
141-
// sequences.emplace_back(seq);
141+
for(int i = 0; i < readSequentially; ++i) {
142142
sequences.emplace_back(reader.next());
143143
}
144144

145145
// update most recent position and query id
146146
myQid = reader.index();
147147
myPos = reader.tell();
148148
while(myQid > qid) {// reading qid unsafe
149-
// std::unique_lock<std::mutex> lock(tipMtx, std::try_to_lock);
150-
// if(lock.owns_lock()) {
151149
if(tipMtx.try_lock()) {
152150
if(myQid > qid) {// reading qid safe
153151
qid = myQid;
@@ -159,7 +157,7 @@ void query_batched(
159157

160158
for(auto& seq : sequences) {
161159
if(!seq.first.header.empty()) {
162-
empty = false;
160+
bufferEmpty = false;
163161
matches.clear();
164162

165163
db.accumulate_matches(seq.first.data, matches);
@@ -177,21 +175,21 @@ void query_batched(
177175
}
178176
sequences.clear();
179177
}
180-
if(!empty) {
178+
if(!bufferEmpty) {
181179
std::lock_guard<std::mutex> lock(finalizeMtx);
182180
finalize(std::move(buffer));
183181
}
184182
}
185183
}
186184
catch(file_access_error& e) {
187-
if(threadId == 0) {
185+
if(threadId == 1) {
188186
std::lock_guard<std::mutex> lock(exceptionMtx);
189-
showStatus(std::string("FAIL: ") + e.what(), -1);
187+
log(std::string("FAIL: ") + e.what());
190188
}
191189
}
192190
catch(std::exception& e) {
193191
std::lock_guard<std::mutex> lock(exceptionMtx);
194-
showStatus(std::string("FAIL: ") + e.what(), -1);
192+
log(std::string("FAIL: ") + e.what());
195193
}
196194
}); //emplace
197195
}
@@ -220,14 +218,14 @@ void query_batched(
220218
*****************************************************************************/
221219
template<
222220
class BufferSource, class BufferUpdate, class BufferSink,
223-
class StatusCallback
221+
class InfoCallback, class ProgressHandler, class LogHandler
224222
>
225223
void query_database(
226224
const std::vector<std::string>& infilenames,
227225
const database& db,
228226
const query_processing_options& opt,
229227
BufferSource&& bufsrc, BufferUpdate&& bufupdate, BufferSink&& bufsink,
230-
StatusCallback&& showStatus)
228+
InfoCallback&& showInfo, ProgressHandler&& showProgress, LogHandler&& log)
231229
{
232230
// parallel_queue queue(opt.numThreads);
233231
std::vector<std::thread> threads(opt.numThreads);
@@ -243,22 +241,56 @@ void query_database(
243241
const auto& fname2 = (opt.pairing == pairing_mode::none)
244242
? nofile : infilenames[i+stride];
245243

246-
const auto progress = infilenames.size() > 1 ? i / float(infilenames.size()) : -1;
247244
if(opt.pairing == pairing_mode::files) {
248-
showStatus(fname1 + " + " + fname2, progress);
245+
showInfo(fname1 + " + " + fname2);
249246
} else {
250-
showStatus(fname1, progress);
247+
showInfo(fname1);
251248
}
249+
showProgress(infilenames.size() > 1 ? i/float(infilenames.size()) : -1);
252250

253251
query_batched(fname1, fname2, db, opt, readIdOffset,
254252
std::forward<BufferSource>(bufsrc),
255253
std::forward<BufferUpdate>(bufupdate),
256254
std::forward<BufferSink>(bufsink),
257-
std::forward<StatusCallback>(showStatus));
255+
std::forward<LogHandler>(log));
258256
}
259257
}
260258

261259

260+
261+
/*************************************************************************//**
262+
*
263+
* @brief queries database
264+
*
265+
* @tparam BufferSource returns a per-batch buffer object
266+
*
267+
* @tparam BufferUpdate takes database matches of one query and a buffer;
268+
* must be thread-safe (only const operations on DB!)
269+
*
270+
* @tparam BufferSink recieves buffer after batch is finished
271+
*
272+
*****************************************************************************/
273+
template<
274+
class BufferSource, class BufferUpdate, class BufferSink, class InfoCallback
275+
>
276+
void query_database(
277+
const std::vector<std::string>& infilenames,
278+
const database& db,
279+
const query_processing_options& opt,
280+
BufferSource&& bufsrc, BufferUpdate&& bufupdate, BufferSink&& bufsink,
281+
InfoCallback&& showInfo)
282+
{
283+
query_database(infilenames, db, opt,
284+
std::forward<BufferSource>(bufsrc),
285+
std::forward<BufferUpdate>(bufupdate),
286+
std::forward<BufferSink>(bufsink),
287+
std::forward<InfoCallback>(showInfo),
288+
[] (float p) { show_progress_indicator(std::cerr, p); },
289+
[] (const std::string& s) {std::cerr << s << '\n'; }
290+
);
291+
}
292+
293+
262294
} // namespace mc
263295

264296

src/sequence_io.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ fasta_reader::fasta_reader(const string& filename):
101101
if(!filename.empty()) {
102102
file_.open(filename);
103103

104-
if(file_.rdstate() & std::ifstream::failbit) {
104+
if(!file_.good()) {
105105
invalidate();
106106
throw file_access_error{"can't open file " + filename};
107107
}
@@ -226,7 +226,7 @@ fastq_reader::fastq_reader(const string& filename):
226226
if(!filename.empty()) {
227227
file_.open(filename);
228228

229-
if(file_.rdstate() & std::ifstream::failbit) {
229+
if(!file_.good()) {
230230
invalidate();
231231
throw file_access_error{"can't open file " + filename};
232232
}
@@ -326,7 +326,7 @@ sequence_header_reader::sequence_header_reader(const string& filename):
326326
{
327327
file_.open(filename);
328328

329-
if(file_.rdstate() & std::ifstream::failbit) {
329+
if(!file_.good()) {
330330
invalidate();
331331
throw file_access_error{"can't open file " + filename};
332332
}
@@ -527,13 +527,13 @@ make_sequence_reader(const string& filename)
527527
filename.find(".fnq") == (n-4) ||
528528
filename.find(".fastq") == (n-6) )
529529
{
530-
return std::unique_ptr<sequence_reader>{new fastq_reader{filename}};
530+
return std::make_unique<fastq_reader>(filename);
531531
}
532532
else if(filename.find(".fa") == (n-3) ||
533533
filename.find(".fna") == (n-4) ||
534534
filename.find(".fasta") == (n-6) )
535535
{
536-
return std::unique_ptr<sequence_reader>{new fasta_reader{filename}};
536+
return std::make_unique<fasta_reader>(filename);
537537
}
538538

539539
//try to determine file type content
@@ -543,10 +543,10 @@ make_sequence_reader(const string& filename)
543543
getline(is,line);
544544
if(!line.empty()) {
545545
if(line[0] == '>') {
546-
return std::unique_ptr<sequence_reader>{new fasta_reader{filename}};
546+
return std::make_unique<fasta_reader>(filename);
547547
}
548548
else if(line[0] == '@') {
549-
return std::unique_ptr<sequence_reader>{new fastq_reader{filename}};
549+
return std::make_unique<fastq_reader>(filename);
550550
}
551551
}
552552
throw file_read_error{"file format not recognized"};

0 commit comments

Comments
 (0)