Skip to content

Commit beac321

Browse files
committed
cm: alerts: implement alerts module
Signed-off-by: Mykhailo Lohvynenko <[email protected]>
1 parent 5171f28 commit beac321

File tree

8 files changed

+618
-0
lines changed

8 files changed

+618
-0
lines changed

src/core/cm/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ set(TARGET_PREFIX ${TARGET_PREFIX}_cm)
5252
# Subdirectories
5353
# ######################################################################################################################
5454

55+
add_subdirectory(alerts)
5556
add_subdirectory(communication)
5657
add_subdirectory(iamclient)
5758
add_subdirectory(fileserver)

src/core/cm/alerts/CMakeLists.txt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#
2+
# Copyright (C) 2025 EPAM Systems, Inc.
3+
#
4+
# SPDX-License-Identifier: Apache-2.0
5+
#
6+
7+
# ######################################################################################################################
8+
# Target name
9+
# ######################################################################################################################
10+
11+
set(TARGET_NAME alerts)
12+
13+
# ######################################################################################################################
14+
# Sources
15+
# ######################################################################################################################
16+
17+
set(SOURCES alerts.cpp)
18+
19+
# ######################################################################################################################
20+
# Libraries
21+
# ######################################################################################################################
22+
23+
set(LIBRARIES aos::core::common::alerts aos::core::common::connectionprovider aos::core::cm::communication)
24+
25+
# ######################################################################################################################
26+
# Target
27+
# ######################################################################################################################
28+
29+
add_module(
30+
TARGET_NAME
31+
${TARGET_NAME}
32+
LOG_MODULE
33+
STACK_USAGE
34+
${AOS_STACK_USAGE}
35+
SOURCES
36+
${SOURCES}
37+
LIBRARIES
38+
${LIBRARIES}
39+
)
40+
41+
if(WITH_TEST)
42+
add_subdirectory(tests)
43+
endif()

src/core/cm/alerts/alerts.cpp

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright (C) 2025 EPAM Systems, Inc.
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#include <core/common/tools/logger.hpp>
8+
9+
#include "alerts.hpp"
10+
11+
namespace aos::cm::alerts {
12+
13+
namespace {
14+
15+
/***********************************************************************************************************************
16+
* Static
17+
**********************************************************************************************************************/
18+
19+
class GetTimestamp : public StaticVisitor<Time> {
20+
public:
21+
Res Visit(const cloudprotocol::AlertItem& alert) const { return alert.mTimestamp; }
22+
};
23+
24+
class SetTimestamp : public StaticVisitor<void> {
25+
public:
26+
explicit SetTimestamp(const Time& time)
27+
: mTime(time)
28+
{
29+
}
30+
31+
template <typename T>
32+
Res Visit(T& val) const
33+
{
34+
val.mTimestamp = mTime;
35+
}
36+
37+
private:
38+
Time mTime;
39+
};
40+
41+
} // namespace
42+
43+
/***********************************************************************************************************************
44+
* Public
45+
**********************************************************************************************************************/
46+
47+
Error Alerts::Init(const Config& config, communication::CommunicationItf& communication)
48+
{
49+
LOG_DBG() << "Initialize alerts";
50+
51+
mConfig = config;
52+
mCommunication = &communication;
53+
54+
return ErrorEnum::eNone;
55+
}
56+
57+
Error Alerts::Start()
58+
{
59+
LockGuard lock {mMutex};
60+
61+
LOG_DBG() << "Start alerts module";
62+
63+
if (mIsRunning) {
64+
return ErrorEnum::eWrongState;
65+
}
66+
67+
mIsRunning = true;
68+
69+
return mSendTimer.Start(mConfig.mSendPeriod, [this](void*) {
70+
if (auto err = SendAlerts(); !err.IsNone()) {
71+
LOG_ERR() << "Failed to send alerts" << Log::Field(err);
72+
}
73+
});
74+
}
75+
76+
Error Alerts::Stop()
77+
{
78+
{
79+
LockGuard lock {mMutex};
80+
81+
LOG_DBG() << "Stop alerts module";
82+
83+
if (!mIsRunning) {
84+
return ErrorEnum::eWrongState;
85+
}
86+
87+
mIsRunning = false;
88+
}
89+
90+
return mSendTimer.Stop();
91+
}
92+
93+
Error Alerts::SendAlert(const cloudprotocol::AlertVariant& alert)
94+
{
95+
LockGuard lock {mMutex};
96+
97+
LOG_DBG() << "Send alert" << Log::Field("alert", alert);
98+
99+
if (IsDuplicated(alert)) {
100+
++mDuplicatedAlerts;
101+
102+
return ErrorEnum::eNone;
103+
}
104+
105+
if (auto err = CacheAlert(alert); !err.IsNone()) {
106+
++mSkippedAlerts;
107+
108+
return err;
109+
}
110+
111+
return ErrorEnum::eNone;
112+
}
113+
114+
void Alerts::OnConnect()
115+
{
116+
LockGuard lock {mMutex};
117+
118+
LOG_DBG() << "Publisher connected";
119+
120+
mIsConnected = true;
121+
}
122+
123+
void Alerts::OnDisconnect()
124+
{
125+
LockGuard lock {mMutex};
126+
127+
LOG_DBG() << "Publisher disconnected";
128+
129+
mIsConnected = false;
130+
}
131+
132+
/***********************************************************************************************************************
133+
* Private
134+
**********************************************************************************************************************/
135+
136+
Error Alerts::SendAlerts()
137+
{
138+
LockGuard lock {mMutex};
139+
140+
LOG_DBG() << "Send alerts";
141+
142+
if (!mIsRunning || !mIsConnected || mAlerts.IsEmpty()) {
143+
return ErrorEnum::eNone;
144+
}
145+
146+
if (mSkippedAlerts > 0) {
147+
LOG_WRN() << "Alerts skipped due to channel is full" << Log::Field("count", mSkippedAlerts);
148+
}
149+
150+
if (mDuplicatedAlerts > 0) {
151+
LOG_WRN() << "Alerts skipped due to duplication" << Log::Field("count", mDuplicatedAlerts);
152+
}
153+
154+
auto cloudMessage = MakeUnique<cloudprotocol::MessageVariant>(&mAllocator);
155+
156+
while (!mAlerts.IsEmpty()) {
157+
cloudMessage->SetValue(mAlerts.Front());
158+
159+
if (auto err = mCommunication->SendMessage(*cloudMessage); !err.IsNone()) {
160+
return AOS_ERROR_WRAP(err);
161+
}
162+
163+
mAlerts.Erase(mAlerts.begin());
164+
}
165+
166+
return ErrorEnum::eNone;
167+
}
168+
169+
bool Alerts::IsDuplicated(const cloudprotocol::AlertVariant& alert)
170+
{
171+
auto alertCopy = MakeUnique<cloudprotocol::AlertVariant>(&mAllocator, alert);
172+
173+
return mAlerts.FindIf([&alertCopy](const cloudprotocol::Alerts& alerts) {
174+
return alerts.mItems.FindIf([&alertCopy](const cloudprotocol::AlertVariant& item) {
175+
alertCopy->ApplyVisitor(SetTimestamp(alertCopy->ApplyVisitor(GetTimestamp())));
176+
177+
return *alertCopy == item;
178+
}) != alerts.mItems.end();
179+
}) != mAlerts.end();
180+
}
181+
182+
Error Alerts::CacheAlert(const cloudprotocol::AlertVariant& alert)
183+
{
184+
if (mAlerts.IsFull() && mAlerts.Back().mItems.IsFull()) {
185+
return AOS_ERROR_WRAP(ErrorEnum::eNoMemory);
186+
}
187+
188+
if (mAlerts.IsEmpty()) {
189+
if (auto err = mAlerts.EmplaceBack(); !err.IsNone()) {
190+
return AOS_ERROR_WRAP(err);
191+
}
192+
}
193+
194+
if (mAlerts.Back().mItems.IsFull()) {
195+
if (auto err = mAlerts.EmplaceBack(); !err.IsNone()) {
196+
return AOS_ERROR_WRAP(err);
197+
}
198+
}
199+
200+
if (auto err = mAlerts.Back().mItems.PushBack(alert); !err.IsNone()) {
201+
return AOS_ERROR_WRAP(err);
202+
}
203+
204+
return ErrorEnum::eNone;
205+
}
206+
207+
} // namespace aos::cm::alerts

src/core/cm/alerts/alerts.hpp

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright (C) 2025 EPAM Systems, Inc.
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#ifndef AOS_CORE_COMMON_ALERTS_ALERTS_HPP_
8+
#define AOS_CORE_COMMON_ALERTS_ALERTS_HPP_
9+
10+
#include <core/cm/communication/communication.hpp>
11+
#include <core/cm/config.hpp>
12+
#include <core/common/alerts/itf/sender.hpp>
13+
#include <core/common/connectionprovider/connectionprovider.hpp>
14+
#include <core/common/tools/memory.hpp>
15+
#include <core/common/tools/thread.hpp>
16+
#include <core/common/tools/timer.hpp>
17+
18+
#include "config.hpp"
19+
20+
namespace aos::cm::alerts {
21+
22+
/**
23+
* Alert packages count.
24+
*/
25+
constexpr auto cAlertPackagesCount = AOS_CONFIG_CM_ALERTS_ALERT_PACKAGES_COUNT;
26+
27+
/**
28+
* Alerts.
29+
*/
30+
class Alerts : public aos::alerts::SenderItf, public ConnectionSubscriberItf {
31+
public:
32+
/**
33+
* Initializes alerts.
34+
*
35+
* @param config configuration object.
36+
* @param communication communication object.
37+
* @return Error.
38+
*/
39+
Error Init(const Config& config, communication::CommunicationItf& communication);
40+
41+
/**
42+
* Starts alerts module.
43+
*
44+
* @return Error.
45+
*/
46+
Error Start();
47+
48+
/**
49+
* Stops alerts module.
50+
*
51+
* @return Error.
52+
*/
53+
Error Stop();
54+
55+
/**
56+
* Sends alert data.
57+
*
58+
* @param alert alert variant.
59+
* @return Error.
60+
*/
61+
Error SendAlert(const cloudprotocol::AlertVariant& alert) override;
62+
63+
/**
64+
* Notifies publisher is connected.
65+
*/
66+
void OnConnect() override;
67+
68+
/**
69+
* Notifies publisher is disconnected.
70+
*/
71+
void OnDisconnect() override;
72+
73+
private:
74+
static constexpr auto cAllocatorSize = sizeof(cloudprotocol::AlertVariant) + sizeof(cloudprotocol::MessageVariant);
75+
76+
Error SendAlerts();
77+
bool IsDuplicated(const cloudprotocol::AlertVariant& alert);
78+
Error CacheAlert(const cloudprotocol::AlertVariant& alert);
79+
80+
StaticAllocator<cAllocatorSize> mAllocator;
81+
Config mConfig;
82+
communication::CommunicationItf* mCommunication {};
83+
StaticArray<cloudprotocol::Alerts, cAlertPackagesCount> mAlerts;
84+
Mutex mMutex;
85+
Timer mSendTimer;
86+
bool mIsRunning {false};
87+
bool mIsConnected {false};
88+
size_t mSkippedAlerts {};
89+
size_t mDuplicatedAlerts {};
90+
};
91+
92+
} // namespace aos::cm::alerts
93+
94+
#endif

src/core/cm/alerts/config.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (C) 2025 EPAM Systems, Inc.
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#ifndef AOS_CORE_CM_ALERTS_CONFIG_HPP_
8+
#define AOS_CORE_CM_ALERTS_CONFIG_HPP_
9+
10+
#include <core/common/tools/time.hpp>
11+
#include <core/common/types/types.hpp>
12+
13+
namespace aos::cm::alerts {
14+
15+
/*
16+
* Configuration.
17+
*/
18+
struct Config {
19+
Duration mSendPeriod;
20+
};
21+
22+
} // namespace aos::cm::alerts
23+
24+
#endif // AOS_CORE_CM_ALERTS_CONFIG_HPP_

0 commit comments

Comments
 (0)