Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
* Added support for OpAMP WebSocket transport.
([#3064](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/3064),
[#3092](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/3092))
* Added support for heartbeat messages.
([#3095](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/3095))
51 changes: 51 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/Internal/FrameBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

using Google.Protobuf;
using OpAmp.Proto.V1;
using OpenTelemetry.OpAmp.Client.Internal.Services.Heartbeat;
using OpenTelemetry.OpAmp.Client.Internal.Settings;

namespace OpenTelemetry.OpAmp.Client.Internal;

Expand Down Expand Up @@ -37,6 +39,55 @@ public IFrameBuilder StartBaseMessage()
return this;
}

IFrameBuilder IFrameBuilder.AddHeartbeat(HealthReport health)
{
if (this.currentMessage == null)
{
throw new InvalidOperationException("Message base is not initialized.");
}

this.currentMessage.Health = new ComponentHealth()
{
Healthy = health.IsHealthy,
StartTimeUnixNano = health.StartTime,
StatusTimeUnixNano = health.StatusTime,
};

if (health.Status != null)
{
this.currentMessage.Health.Status = health.Status;
}

if (health.LastError != null)
{
this.currentMessage.Health.LastError = health.LastError;
}

foreach (var item in health.Components)
{
var component = new ComponentHealth()
{
Healthy = item.IsHealthy,
StartTimeUnixNano = (ulong)item.StartTime.ToUnixTimeMilliseconds() * 1_000_000, // Convert to nanoseconds
StatusTimeUnixNano = (ulong)item.StatusTime.ToUnixTimeMilliseconds() * 1_000_000, // Convert to nanoseconds
};

if (health.Status != null)
{
component.Status = health.Status;
}

if (health.LastError != null)
{
component.LastError = health.LastError;
}

this.currentMessage.Health.ComponentHealthMap[item.ComponentName] = component;
}

return this;
}

AgentToServer IFrameBuilder.Build()
{
if (this.currentMessage == null)
Expand Down
37 changes: 35 additions & 2 deletions src/OpenTelemetry.OpAmp.Client/Internal/FrameDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Internal;
using OpenTelemetry.OpAmp.Client.Internal.Services.Heartbeat;
using OpenTelemetry.OpAmp.Client.Internal.Settings;
using OpenTelemetry.OpAmp.Client.Internal.Transport;

namespace OpenTelemetry.OpAmp.Client.Internal;
Expand Down Expand Up @@ -35,15 +37,15 @@ await this.syncRoot.WaitAsync(token)
.Build();

// TODO: change to proper logging
Console.WriteLine("Sending identification message.");
Console.WriteLine("[debug] Sending identification message.");

await this.transport.SendAsync(message, token)
.ConfigureAwait(false);
}
catch (Exception ex)
{
// TODO: change to proper logging
Console.WriteLine($"[Error]: {ex.Message}");
Console.WriteLine($"[error]: {ex.Message}");

this.frameBuilder.Reset(); // Reset the builder in case of failure
}
Expand All @@ -57,4 +59,35 @@ public void Dispose()
{
this.syncRoot.Dispose();
}

public async Task DispatchHeartbeatAsync(HealthReport report, CancellationToken token)
{
await this.syncRoot.WaitAsync(token)
.ConfigureAwait(false);

try
{
var message = this.frameBuilder
.StartBaseMessage()
.AddHeartbeat(report)
.Build();

// TODO: change to proper logging
Console.WriteLine("[debug] Sending hearthbeat message");

await this.transport.SendAsync(message, token)
.ConfigureAwait(false);
}
catch (Exception)
{
// TODO: change to proper logging
Console.WriteLine("[error] hearthbeat message failure");

this.frameBuilder.Reset(); // Reset the builder in case of failure
}
finally
{
this.syncRoot.Release();
}
}
}
3 changes: 3 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/Internal/IFrameBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;
using OpenTelemetry.OpAmp.Client.Internal.Services.Heartbeat;

namespace OpenTelemetry.OpAmp.Client.Internal;

internal interface IFrameBuilder
{
IFrameBuilder AddHeartbeat(HealthReport health);

AgentToServer Build();
}
34 changes: 33 additions & 1 deletion src/OpenTelemetry.OpAmp.Client/Internal/OpAmpClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.OpAmp.Client.Internal.Services;
using OpenTelemetry.OpAmp.Client.Internal.Services.Heartbeat;
using OpenTelemetry.OpAmp.Client.Internal.Settings;
using OpenTelemetry.OpAmp.Client.Internal.Transport;
using OpenTelemetry.OpAmp.Client.Internal.Transport.Http;
using OpenTelemetry.OpAmp.Client.Internal.Transport.WebSocket;

namespace OpenTelemetry.OpAmp.Client.Internal;

internal sealed class OpAmpClient
internal sealed class OpAmpClient : IDisposable
{
private readonly OpAmpClientSettings settings = new();
private readonly FrameProcessor processor = new();
private readonly Dictionary<string, IBackgroundService> services = [];
private readonly FrameDispatcher dispatcher;
private readonly IOpAmpTransport transport;

public OpAmpClient(Action<OpAmpClientSettings>? configure = null)
{
configure?.Invoke(this.settings);

this.transport = ConstructTransport(this.settings, this.processor);
this.dispatcher = new FrameDispatcher(this.transport, this.settings);

this.ConfigureServices();
}

public void Dispose()
{
this.dispatcher.Dispose();
}

private static IOpAmpTransport ConstructTransport(OpAmpClientSettings settings, FrameProcessor processor)
Expand All @@ -29,4 +42,23 @@ private static IOpAmpTransport ConstructTransport(OpAmpClientSettings settings,
_ => throw new NotSupportedException("Unsupported transport type"),
};
}

private void ConfigureServices()
{
this.ConfigureService<HeartbeatService>(
settings => settings.Heartbeat.IsEnabled,
() => new(this.dispatcher, this.processor));
}

private void ConfigureService<T>(Predicate<OpAmpClientSettings> isEnabledCallback, Func<T> construct)
where T : IBackgroundService
{
if (isEnabledCallback(this.settings))
{
var service = construct();
service.Configure(this.settings);

this.services[service.ServiceName] = service;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.OpAmp.Client.Internal.Services.Heartbeat;

/// <summary>
/// Represents the health status of a system component.
/// </summary>
internal sealed class ComponentHealthStatus
{
public ComponentHealthStatus(string componentName)
{
this.ComponentName = componentName;
}

/// <summary>
/// Gets or sets the name of the component.
/// </summary>
public string ComponentName { get; set; }

/// <summary>
/// Gets or sets the current status of the operation or entity.
/// </summary>
public string? Status { get; set; }

/// <summary>
/// Gets or sets the description of the most recent error encountered.
/// </summary>
public string? LastError { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the system is in a healthy state.
/// </summary>
public bool IsHealthy { get; set; }

/// <summary>
/// Gets or sets the start time of the event or operation.
/// </summary>
public DateTimeOffset StartTime { get; set; }

/// <summary>
/// Gets or sets the timestamp indicating the current status update time.
/// </summary>
public DateTimeOffset StatusTime { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.OpAmp.Client.Internal.Services.Heartbeat;

internal sealed class HealthReport
{
public ulong StartTime { get; set; }

public ulong StatusTime { get; set; }

public bool IsHealthy { get; set; }

public string? Status { get; set; }

public string? LastError { get; set; }

public IList<ComponentHealthStatus> Components { get; } = [];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.OpAmp.Client.Internal.Listeners;
using OpenTelemetry.OpAmp.Client.Internal.Listeners.Messages;
using OpenTelemetry.OpAmp.Client.Internal.Settings;

namespace OpenTelemetry.OpAmp.Client.Internal.Services.Heartbeat;

internal sealed class HeartbeatService : IBackgroundService, IOpAmpListener<ConnectionSettingsMessage>, IDisposable
{
public const string Name = "heartbeat-service";

private readonly FrameDispatcher dispatcher;
private readonly FrameProcessor processor;
private readonly CancellationTokenSource cts;
private readonly Lock timerUpdateLock = new();
private Timer? timer;
private TimeSpan tickInterval;
private ulong startTime;

public HeartbeatService(FrameDispatcher dispatcher, FrameProcessor processor)
{
this.dispatcher = dispatcher;
this.processor = processor;
this.cts = new CancellationTokenSource();

this.processor.Subscribe(this);
}

public string ServiceName => Name;

public void Configure(OpAmpClientSettings settings)
{
this.tickInterval = settings.Heartbeat.Interval;
}

public void Start()
{
this.startTime = GetCurrentTimeInNanoseconds();
this.CreateOrUpdateTimer(this.tickInterval);
}

public void Stop()
{
this.cts.Cancel();
this.CreateOrUpdateTimer(Timeout.InfiniteTimeSpan);
}

public void HandleMessage(ConnectionSettingsMessage message)
{
var newInterval = message.ConnectionSettings.Opamp?.HeartbeatIntervalSeconds ?? 0;
if (newInterval > 0)
{
// TODO: change to proper logging
Console.WriteLine($"[debug] New heartbeat interval received: {newInterval}s");

this.CreateOrUpdateTimer(TimeSpan.FromSeconds(newInterval));
}
}

public void Dispose()
{
this.processor.Unsubscribe(this);

this.cts.Dispose();
this.timer?.Dispose();
}

private static ulong GetCurrentTimeInNanoseconds()
{
return (ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000; // Convert to nanoseconds
}

private void CreateOrUpdateTimer(TimeSpan interval)
{
lock (this.timerUpdateLock)
{
this.timer ??= new Timer(this.HeartbeatTick);
this.timer.Change(interval, interval);
}
}

private async void HeartbeatTick(object? state)
{
try
{
var report = this.CreateHealthReport();

await this.dispatcher.DispatchHeartbeatAsync(report, this.cts.Token)
.ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Ignore task cancellation
}
catch (Exception ex)
{
// TODO: change to proper logging
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally left for follow ups?

For now, all contrib packages are using EventSources similar to https://github.com/open-telemetry/opentelemetry-dotnet-contrib/blob/8452d88e83e1c204bd4b43c7c3d8189f94ff87be/src/OpenTelemetry.Resources.Host/HostResourceEventSource.cs

(There are even tests for it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that was my plan as well. Will address logging in a separate PR.

Console.WriteLine($"[error] Heartbeat error: {ex.Message}");
}
}

private HealthReport CreateHealthReport()
{
return new HealthReport
{
StartTime = this.startTime,
StatusTime = GetCurrentTimeInNanoseconds(),
IsHealthy = true,
Status = "OK",
};
}
}
Loading
Loading