Skip to content

Commit 81a68c8

Browse files
parallel write code
Signed-off-by: AdityaPandeyCN <[email protected]>
1 parent 3948c89 commit 81a68c8

File tree

3 files changed

+267
-59
lines changed

3 files changed

+267
-59
lines changed

benchmark/chromosome_split_benchmark.cxx

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,52 @@ static void BM_ChromosomeSplit(benchmark::State &state)
112112
state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate);
113113
}
114114

115+
static void BM_ChromosomeSplitParallel(benchmark::State &state)
116+
{
117+
int num_reads = state.range(0);
118+
int num_threads = state.range(1);
119+
std::string sam_file = "bench_split_par_" + std::to_string(num_reads) + ".sam";
120+
121+
GenerateSAMFile(sam_file, num_reads);
122+
123+
FILE *old_stdout = stdout;
124+
FILE *old_stderr = stderr;
125+
126+
for (auto _ : state) {
127+
stdout = fopen("/dev/null", "w");
128+
stderr = fopen("/dev/null", "w");
129+
130+
samtoramntuple_split_by_chromosome_parallel(sam_file.c_str(), "bench_split_par_out", 505, 1, num_threads);
131+
132+
fclose(stdout);
133+
fclose(stderr);
134+
stdout = old_stdout;
135+
stderr = old_stderr;
136+
137+
state.counters["size_MB"] = GetTotalFileSize("bench_split_par_out_") / (1024.0 * 1024.0);
138+
state.counters["threads"] = num_threads;
139+
CleanupFiles("bench_split_par_out_");
140+
}
141+
142+
std::remove(sam_file.c_str());
143+
state.counters["reads/s"] = benchmark::Counter(num_reads, benchmark::Counter::kIsRate);
144+
}
145+
115146
BENCHMARK(BM_SamtoolsSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond);
116147
BENCHMARK(BM_ChromosomeSplit)->Arg(100000)->Arg(500000)->Arg(1000000)->Unit(benchmark::kMillisecond);
117148

149+
// Parallel benchmarks with different thread counts
150+
BENCHMARK(BM_ChromosomeSplitParallel)
151+
->Args({100000, 2})
152+
->Args({100000, 4})
153+
->Args({100000, 8})
154+
->Args({500000, 2})
155+
->Args({500000, 4})
156+
->Args({500000, 8})
157+
->Args({1000000, 2})
158+
->Args({1000000, 4})
159+
->Args({1000000, 8})
160+
->Unit(benchmark::kMillisecond);
161+
118162
BENCHMARK_MAIN();
163+

inc/ramcore/SamToNTuple.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,11 @@ void samtoramntuple(const char *datafile,
1313
void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm,
1414
uint32_t quality_policy);
1515

16+
void samtoramntuple_split_by_chromosome_parallel(const char *datafile,
17+
const char *output_prefix,
18+
int compression_algorithm,
19+
uint32_t quality_policy,
20+
int num_threads = 4);
21+
1622
#endif
23+

src/ramcore/SamToNTuple.cxx

Lines changed: 215 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
#include <TList.h>
1010
#include <TNamed.h>
1111
#include <TFile.h>
12+
#include <TROOT.h>
1213

1314
#include <map>
1415
#include <memory>
1516
#include <iostream>
1617
#include <fstream>
1718
#include <cstdio>
19+
#include <thread>
20+
#include <vector>
21+
#include <mutex>
1822

1923
void samtoramntuple(const char *datafile,
2024
const char *treefile,
@@ -126,66 +130,218 @@ void samtoramntuple(const char *datafile,
126130
stopwatch.Print();
127131
}
128132

129-
void samtoramntuple_split_by_chromosome(const char *datafile, const char *output_prefix, int compression_algorithm,
133+
void samtoramntuple_split_by_chromosome(const char *datafile,
134+
const char *output_prefix,
135+
int compression_algorithm,
130136
uint32_t quality_policy)
131137
{
132-
std::ifstream input(datafile);
133-
if (!input) {
134-
std::cerr << "Error: Cannot open " << datafile << std::endl;
135-
return;
136-
}
137-
138-
std::vector<std::string> headers;
139-
std::map<std::string, std::unique_ptr<std::ofstream>> chr_files;
140-
std::map<std::string, std::string> chr_temp_filenames;
141-
std::string line;
142-
143-
while (std::getline(input, line)) {
144-
if (line.empty())
145-
continue;
146-
147-
if (line[0] == '@') {
148-
headers.push_back(line);
149-
continue;
150-
}
151-
152-
size_t pos = line.find('\t');
153-
if (pos == std::string::npos)
154-
continue;
155-
pos = line.find('\t', pos + 1);
156-
if (pos == std::string::npos)
157-
continue;
158-
159-
size_t end_pos = line.find('\t', pos + 1);
160-
if (end_pos == std::string::npos)
161-
continue;
162-
163-
std::string rname = line.substr(pos + 1, end_pos - pos - 1);
164-
if (rname == "*")
165-
continue;
166-
167-
if (chr_files.find(rname) == chr_files.end()) {
168-
std::string temp_filename = std::string(output_prefix) + "_" + rname + ".tmp.sam";
169-
chr_temp_filenames[rname] = temp_filename;
170-
chr_files[rname] = std::make_unique<std::ofstream>(temp_filename);
171-
172-
for (const auto &header : headers) {
173-
*(chr_files[rname]) << header << "\n";
174-
}
175-
}
176-
177-
*(chr_files[rname]) << line << "\n";
178-
}
179-
180-
input.close();
181-
for (auto &[chr, file] : chr_files) {
182-
file->close();
183-
}
138+
RAMNTupleRecord::InitializeRefs();
139+
140+
std::map<std::string, std::vector<ramcore::SamRecord>> chromosome_records;
141+
std::vector<std::pair<std::string, std::string>> headers;
142+
143+
ramcore::SamParser parser;
144+
145+
auto header_callback = [&](const std::string& tag, const std::string& content) {
146+
headers.push_back({tag, content});
147+
148+
if (tag == "@SQ") {
149+
size_t sn_pos = content.find("SN:");
150+
if (sn_pos != std::string::npos) {
151+
sn_pos += 3;
152+
size_t tab_pos = content.find('\t', sn_pos);
153+
std::string ref_name = content.substr(sn_pos,
154+
tab_pos != std::string::npos ? tab_pos - sn_pos : std::string::npos);
155+
RAMNTupleRecord::GetRnameRefs()->GetRefId(ref_name.c_str());
156+
}
157+
}
158+
};
159+
160+
auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) {
161+
if (sam_record.rname != "*") {
162+
chromosome_records[sam_record.rname].push_back(sam_record);
163+
}
164+
};
165+
166+
parser.ParseFile(datafile, header_callback, record_callback);
167+
168+
for (auto& [chr, records] : chromosome_records) {
169+
std::sort(records.begin(), records.end(),
170+
[](const ramcore::SamRecord& a, const ramcore::SamRecord& b) {
171+
return a.pos < b.pos;
172+
});
173+
}
174+
175+
for (const auto& [chr, records] : chromosome_records) {
176+
std::string filename = std::string(output_prefix) + "_" + chr + ".root";
177+
auto file = TFile::Open(filename.c_str(), "RECREATE");
178+
179+
auto model = RAMNTupleRecord::MakeModel();
180+
ROOT::Experimental::RNTupleWriteOptions writeOptions;
181+
writeOptions.SetCompression(compression_algorithm);
182+
writeOptions.SetMaxUnzippedPageSize(128000);
183+
184+
auto writer = ROOT::Experimental::RNTupleWriter::Append(
185+
std::move(model), "RAM", *file, writeOptions);
186+
auto entry = writer->CreateEntry();
187+
auto recordPtr = entry->GetPtr<RAMNTupleRecord>("record").get();
188+
189+
for (const auto& sam_record : records) {
190+
recordPtr->SetBit(quality_policy);
191+
recordPtr->SetQNAME(sam_record.qname.c_str());
192+
recordPtr->SetFLAG(sam_record.flag);
193+
recordPtr->SetREFID(sam_record.rname.c_str());
194+
recordPtr->SetPOS(sam_record.pos);
195+
recordPtr->SetMAPQ(sam_record.mapq);
196+
recordPtr->SetCIGAR(sam_record.cigar.c_str());
197+
recordPtr->SetREFNEXT(sam_record.rnext.c_str());
198+
recordPtr->SetPNEXT(sam_record.pnext);
199+
recordPtr->SetTLEN(sam_record.tlen);
200+
recordPtr->SetSEQ(sam_record.seq.c_str());
201+
recordPtr->SetQUAL(sam_record.qual.c_str());
202+
203+
recordPtr->ResetNOPT();
204+
for (const auto& opt : sam_record.optional_fields) {
205+
recordPtr->SetOPT(opt.c_str());
206+
}
207+
208+
writer->Fill(*entry);
209+
}
210+
211+
writer.reset();
212+
213+
TList h;
214+
h.SetName("headers");
215+
for (const auto& [tag, content] : headers) {
216+
h.Add(new TNamed(tag.c_str(), content.c_str()));
217+
}
218+
219+
RAMNTupleRecord::WriteAllRefs(*file);
220+
h.Write();
221+
file->Close();
222+
delete file;
223+
}
224+
}
184225

185-
for (const auto &[chr, temp_filename] : chr_temp_filenames) {
186-
std::string output_filename = std::string(output_prefix) + "_" + chr + ".root";
187-
samtoramntuple(temp_filename.c_str(), output_filename.c_str(), false, false, false, compression_algorithm,
188-
quality_policy);
189-
std::remove(temp_filename.c_str());
190-
}
226+
void samtoramntuple_split_by_chromosome_parallel(const char *datafile,
227+
const char *output_prefix,
228+
int compression_algorithm,
229+
uint32_t quality_policy,
230+
int num_threads)
231+
{
232+
ROOT::EnableThreadSafety();
233+
RAMNTupleRecord::InitializeRefs();
234+
235+
std::map<std::string, std::vector<ramcore::SamRecord>> chromosome_records;
236+
std::vector<std::pair<std::string, std::string>> headers;
237+
238+
ramcore::SamParser parser;
239+
240+
auto header_callback = [&](const std::string& tag, const std::string& content) {
241+
headers.push_back({tag, content});
242+
243+
if (tag == "@SQ") {
244+
size_t sn_pos = content.find("SN:");
245+
if (sn_pos != std::string::npos) {
246+
sn_pos += 3;
247+
size_t tab_pos = content.find('\t', sn_pos);
248+
std::string ref_name = content.substr(sn_pos,
249+
tab_pos != std::string::npos ? tab_pos - sn_pos : std::string::npos);
250+
RAMNTupleRecord::GetRnameRefs()->GetRefId(ref_name.c_str());
251+
}
252+
}
253+
};
254+
255+
auto record_callback = [&](const ramcore::SamRecord& sam_record, size_t record_num) {
256+
if (sam_record.rname != "*") {
257+
chromosome_records[sam_record.rname].push_back(sam_record);
258+
}
259+
};
260+
261+
parser.ParseFile(datafile, header_callback, record_callback);
262+
263+
for (auto& [chr, records] : chromosome_records) {
264+
std::sort(records.begin(), records.end(),
265+
[](const ramcore::SamRecord& a, const ramcore::SamRecord& b) {
266+
return a.pos < b.pos;
267+
});
268+
}
269+
270+
std::vector<std::string> chr_names;
271+
for (const auto& [chr, records] : chromosome_records) {
272+
chr_names.push_back(chr);
273+
}
274+
275+
static std::mutex file_destruction_mutex;
276+
277+
auto write_chromosome = [&](const std::string& chr) {
278+
const auto& records = chromosome_records[chr];
279+
280+
std::string filename = std::string(output_prefix) + "_" + chr + ".root";
281+
282+
std::unique_ptr<TFile> file(TFile::Open(filename.c_str(), "RECREATE"));
283+
284+
auto model = RAMNTupleRecord::MakeModel();
285+
ROOT::Experimental::RNTupleWriteOptions writeOptions;
286+
writeOptions.SetCompression(compression_algorithm);
287+
writeOptions.SetMaxUnzippedPageSize(128000);
288+
289+
auto writer = ROOT::Experimental::RNTupleWriter::Append(
290+
std::move(model), "RAM", *file, writeOptions);
291+
auto entry = writer->CreateEntry();
292+
auto recordPtr = entry->GetPtr<RAMNTupleRecord>("record").get();
293+
294+
for (const auto& sam_record : records) {
295+
recordPtr->SetBit(quality_policy);
296+
recordPtr->SetQNAME(sam_record.qname.c_str());
297+
recordPtr->SetFLAG(sam_record.flag);
298+
recordPtr->SetREFID(sam_record.rname.c_str());
299+
recordPtr->SetPOS(sam_record.pos);
300+
recordPtr->SetMAPQ(sam_record.mapq);
301+
recordPtr->SetCIGAR(sam_record.cigar.c_str());
302+
recordPtr->SetREFNEXT(sam_record.rnext.c_str());
303+
recordPtr->SetPNEXT(sam_record.pnext);
304+
recordPtr->SetTLEN(sam_record.tlen);
305+
recordPtr->SetSEQ(sam_record.seq.c_str());
306+
recordPtr->SetQUAL(sam_record.qual.c_str());
307+
308+
recordPtr->ResetNOPT();
309+
for (const auto& opt : sam_record.optional_fields) {
310+
recordPtr->SetOPT(opt.c_str());
311+
}
312+
313+
writer->Fill(*entry);
314+
}
315+
316+
writer.reset();
317+
318+
TList h;
319+
h.SetName("headers");
320+
for (const auto& [tag, content] : headers) {
321+
h.Add(new TNamed(tag.c_str(), content.c_str()));
322+
}
323+
324+
RAMNTupleRecord::WriteAllRefs(*file);
325+
h.Write();
326+
327+
{
328+
std::lock_guard<std::mutex> lock(file_destruction_mutex);
329+
file->Close();
330+
file.reset();
331+
}
332+
};
333+
334+
size_t chr_idx = 0;
335+
while (chr_idx < chr_names.size()) {
336+
std::vector<std::thread> threads;
337+
338+
for (int i = 0; i < num_threads && chr_idx < chr_names.size(); ++i, ++chr_idx) {
339+
threads.emplace_back(write_chromosome, chr_names[chr_idx]);
340+
}
341+
342+
for (auto& t : threads) {
343+
t.join();
344+
}
345+
}
191346
}
347+

0 commit comments

Comments
 (0)