Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2958,7 +2958,11 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
doc = "Option to override the auto-detected network interfaces max speed"
)
private Optional<Double> loadBalancerOverrideBrokerNicSpeedGbps = Optional.empty();

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Option to override the auto-detected network interfaces"
)
private List<String> loadBalancerOverrideBrokerNics = new ArrayList<>();
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,27 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
private OperatingSystemMXBean systemBean;
private SystemResourceUsage usage;
private final Optional<Double> overrideBrokerNicSpeedGbps;
private final List<String> overrideBrokerNics;
private final boolean isCGroupsEnabled;

public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
this(
pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(),
pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(),
pulsar.getConfiguration().getLoadBalancerOverrideBrokerNics(),
pulsar.getLoadManagerExecutor()
);
}

public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin,
Optional<Double> overrideBrokerNicSpeedGbps,
List<String> overrideBrokerNics,
ScheduledExecutorService executorService) {
this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
this.lastCollection = 0L;
this.usage = new SystemResourceUsage();
this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
this.overrideBrokerNics = overrideBrokerNics;
this.isCGroupsEnabled = isCGroupEnabled();
// Call now to initialize values before the constructor returns
calculateBrokerHostUsage();
Expand All @@ -88,7 +92,7 @@ public SystemResourceUsage getBrokerHostUsage() {

@Override
public void calculateBrokerHostUsage() {
List<String> nics = getUsablePhysicalNICs();
List<String> nics = !overrideBrokerNics.isEmpty() ? overrideBrokerNics : getUsablePhysicalNICs();
double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, BitRateUnit.Kilobit);
double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, BitRateUnit.Kilobit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.tools;

import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -67,7 +68,7 @@ public Integer call() throws Exception {
try {
if (isLinux) {
hostUsage = new LinuxBrokerHostUsageImpl(
Integer.MAX_VALUE, Optional.empty(), scheduler
Integer.MAX_VALUE, Optional.empty(), new ArrayList<>(), scheduler
);
} else {
hostUsage = new GenericBrokerHostUsageImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -26,7 +29,11 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -38,7 +45,7 @@ public void checkOverrideBrokerNicSpeedGbps() {
@Cleanup("shutdown")
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), executorService);
new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), new ArrayList<>(), executorService);
List<String> nics = new ArrayList<>();
nics.add("1");
nics.add("2");
Expand All @@ -47,6 +54,34 @@ public void checkOverrideBrokerNicSpeedGbps() {
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
}

@Test
public void checkOverrideBrokerNics() {
try (MockedStatic<LinuxInfoUtils> mockedUtils = Mockito.mockStatic(LinuxInfoUtils.class)) {
mockedUtils.when(() -> LinuxInfoUtils.getTotalNicUsage(any(), any(), any())).thenReturn(3.0d);
mockedUtils.when(LinuxInfoUtils::getCpuUsageForEntireHost).thenReturn(LinuxInfoUtils.ResourceUsage.empty());
List<String> nics = new ArrayList<>();
nics.add("1");
nics.add("2");
nics.add("3");
ServiceConfiguration config = new ServiceConfiguration();
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(3.0d));
config.setLoadBalancerOverrideBrokerNics(nics);
PulsarService pulsarService = mock(PulsarService.class);
when(pulsarService.getConfiguration()).thenReturn(config);
@Cleanup("shutdown")
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
when(pulsarService.getLoadManagerExecutor()).thenReturn(executorService);
LinuxBrokerHostUsageImpl linuxBrokerHostUsage = new LinuxBrokerHostUsageImpl(pulsarService);
linuxBrokerHostUsage.calculateBrokerHostUsage();
double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
double totalNicLimitRx = linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthIn().limit;
double totalNicLimitTx = linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthOut().limit;
Assert.assertEquals(totalNicLimitRx, 3.0 * 1000 * 1000 * 3);
Assert.assertEquals(totalNicLimitTx, 3.0 * 1000 * 1000 * 3);
}
}

@Test
public void testCpuUsage() throws InterruptedException {
if (!LinuxInfoUtils.isLinux()) {
Expand All @@ -56,7 +91,8 @@ public void testCpuUsage() throws InterruptedException {
@Cleanup("shutdown")
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), executorService);
new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(),
new ArrayList<>(), executorService);

linuxBrokerHostUsage.calculateBrokerHostUsage();
TimeUnit.SECONDS.sleep(1);
Expand Down
Loading