Skip to content

Commit 96a462e

Browse files
committed
use weak_ptr and store stream in variables
1 parent dddb2be commit 96a462e

File tree

9 files changed

+29
-15
lines changed

9 files changed

+29
-15
lines changed

include/dtlmod/Engine.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class Engine {
5252
std::string name_;
5353
Type type_ = Type::Undefined;
5454
std::shared_ptr<Transport> transport_ = nullptr;
55-
Stream* stream_;
55+
std::weak_ptr<Stream> stream_;
5656

5757
sg4::MutexPtr pub_mutex_ = sg4::Mutex::create();
5858
std::set<sg4::ActorPtr> publishers_;
@@ -109,7 +109,10 @@ class Engine {
109109

110110
public:
111111
/// \cond EXCLUDE_FROM_DOCUMENTATION
112-
explicit Engine(const std::string& name, Stream* stream, Type type) : name_(name), type_(type), stream_(stream) {}
112+
explicit Engine(const std::string& name, std::shared_ptr<Stream> stream, Type type)
113+
: name_(name), type_(type), stream_(stream)
114+
{
115+
}
113116
virtual ~Engine() = default;
114117
/// \endcond
115118

include/dtlmod/FileEngine.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class FileEngine : public Engine {
4747
void sub_close() override;
4848

4949
public:
50-
explicit FileEngine(const std::string& fullpath, Stream* stream);
50+
explicit FileEngine(const std::string& fullpath, const std::shared_ptr<Stream>& stream);
5151
};
5252
/// \endcond
5353

include/dtlmod/StagingEngine.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ class StagingEngine : public Engine {
2929
void sub_close() override;
3030

3131
public:
32-
explicit StagingEngine(const std::string& name, Stream* stream) : Engine(name, stream, Engine::Type::Staging) {}
32+
explicit StagingEngine(const std::string& name, const std::shared_ptr<Stream>& stream)
33+
: Engine(name, stream, Engine::Type::Staging)
34+
{
35+
}
3336

3437
void create_transport(const Transport::Method& transport_method) override;
3538
};

include/dtlmod/Stream.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class DTL;
2424
* objects to be injected into or retrieve from this Stream.
2525
*/
2626

27-
class Stream {
27+
class Stream : public std::enable_shared_from_this<Stream> {
2828
friend class Engine;
2929
friend class FileEngine;
3030
friend class StagingEngine;

include/dtlmod/Variable.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
namespace dtlmod {
1616

17+
class Stream;
18+
1719
/// @brief A class to translate a piece of data from an application into an object handled by the DTL and its metadata.
1820
class Variable {
1921
friend class Engine;
@@ -29,6 +31,8 @@ class Variable {
2931
int transaction_start_ = -1;
3032
unsigned int transaction_count_ = 0;
3133

34+
std::weak_ptr<const Stream> defined_in_stream_;
35+
3236
std::shared_ptr<Metadata> metadata_;
3337

3438
std::map<sg4::ActorPtr, std::pair<std::vector<size_t>, std::vector<size_t>>, std::less<>> subscriber_selections_;
@@ -65,9 +69,11 @@ class Variable {
6569

6670
public:
6771
/// \cond EXCLUDE_FROM_DOCUMENTATION
68-
Variable(const std::string& name, size_t element_size, const std::vector<size_t>& shape)
72+
Variable(const std::string& name, size_t element_size, const std::vector<size_t>& shape,
73+
std::shared_ptr<const Stream> stream)
6974
: name_(name), element_size_(element_size), shape_(shape), metadata_(std::make_shared<Metadata>(this))
7075
{
76+
defined_in_stream_ = stream;
7177
}
7278
/// \endcond
7379

src/Engine.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void Engine::add_subscriber(sg4::ActorPtr actor)
8888
void Engine::export_metadata_to_file() const
8989
{
9090
std::ofstream metadata_export(metadata_file_, std::ofstream::out);
91-
for (const auto& [name, v] : stream_->get_all_variables_internal())
91+
for (const auto& [name, v] : stream_.lock()->get_all_variables_internal())
9292
v->get_metadata()->export_to_file(metadata_export);
9393
metadata_export.close();
9494
}
@@ -100,7 +100,7 @@ void Engine::set_metadata_file_name()
100100

101101
void Engine::close_stream()
102102
{
103-
stream_->close();
103+
stream_.lock()->close();
104104
}
105105
/// \endcond
106106

src/FileEngine.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ namespace dtlmod {
2424

2525
// FileEngines require to know where to (virtually) write file. This information is given by fullpath which has the
2626
// following format: NetZone:FileSystem:PathToDirectory
27-
FileEngine::FileEngine(const std::string& fullpath, Stream* stream) : Engine(fullpath, stream, Engine::Type::File)
27+
FileEngine::FileEngine(const std::string& fullpath, const std::shared_ptr<Stream>& stream)
28+
: Engine(fullpath, stream, Engine::Type::File)
2829
{
2930
XBT_DEBUG("Create a new FileEngine writing in %s", fullpath.c_str());
3031

@@ -155,7 +156,7 @@ void FileEngine::pub_close()
155156
XBT_DEBUG("Closing opened files");
156157
transport->close_pub_files();
157158
XBT_DEBUG("Engine '%s' is now closed for all publishers ", get_cname());
158-
if (stream_->does_export_metadata())
159+
if (stream_.lock()->does_export_metadata())
159160
export_metadata_to_file();
160161
}
161162
}

src/StagingEngine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void StagingEngine::pub_close()
108108
XBT_DEBUG("All publishers have called the Engine::close() function");
109109
close_stream();
110110
XBT_DEBUG("Engine '%s' is now closed for all publishers ", get_cname());
111-
if (stream_->does_export_metadata())
111+
if (stream_.lock()->does_export_metadata())
112112
export_metadata_to_file();
113113
}
114114
}

src/Stream.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,11 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
140140
dtl_->lock();
141141
if (not engine_) {
142142
if (engine_type_ == Engine::Type::Staging) {
143-
engine_ = std::make_shared<StagingEngine>(name, this);
143+
engine_ = std::make_shared<StagingEngine>(name, shared_from_this());
144144
engine_->create_transport(transport_method_);
145145
} else if (engine_type_ == Engine::Type::File) {
146146
try {
147-
engine_ = std::make_shared<FileEngine>(name, this);
147+
engine_ = std::make_shared<FileEngine>(name, shared_from_this());
148148
engine_->create_transport(transport_method_);
149149
} catch (const IncorrectPathDefinitionException& e) {
150150
got_exception = true;
@@ -222,7 +222,7 @@ std::shared_ptr<Variable> Stream::define_variable(const std::string& name, const
222222
return var->second;
223223
}
224224
} else {
225-
auto new_var = std::make_shared<Variable>(name, element_size, shape);
225+
auto new_var = std::make_shared<Variable>(name, element_size, shape, shared_from_this());
226226
new_var->set_local_start_and_count(publisher, std::make_pair(start, count));
227227
variables_.try_emplace(name, new_var);
228228
return new_var;
@@ -248,7 +248,8 @@ std::shared_ptr<Variable> Stream::inquire_variable(const std::string& name) cons
248248
if (not engine_ || engine_->is_publisher(actor))
249249
return var->second;
250250
else {
251-
auto new_var = std::make_shared<Variable>(name, var->second->get_element_size(), var->second->get_shape());
251+
auto new_var =
252+
std::make_shared<Variable>(name, var->second->get_element_size(), var->second->get_shape(), shared_from_this());
252253
new_var->set_local_start_and_count(actor, std::make_pair(std::vector<size_t>(var->second->get_shape().size(), 0),
253254
std::vector<size_t>(var->second->get_shape().size(), 0)));
254255
new_var->set_metadata(var->second->get_metadata());

0 commit comments

Comments
 (0)