Skip to content

Commit a272c5b

Browse files
dongzhonghuadongzhonghua03
andauthored
[improve] [broker] Add overrideBrokerNics for adaptation of heterogeneous network environments (#24883)
Co-authored-by: dongzhonghua03 <[email protected]>
1 parent a8b41b9 commit a272c5b

File tree

4 files changed

+50
-5
lines changed

4 files changed

+50
-5
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2994,7 +2994,11 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
29942994
doc = "Option to override the auto-detected network interfaces max speed"
29952995
)
29962996
private Optional<Double> loadBalancerOverrideBrokerNicSpeedGbps = Optional.empty();
2997-
2997+
@FieldContext(
2998+
category = CATEGORY_LOAD_BALANCER,
2999+
doc = "Option to override the auto-detected network interfaces"
3000+
)
3001+
private List<String> loadBalancerOverrideBrokerNics = new ArrayList<>();
29983002
@FieldContext(
29993003
category = CATEGORY_LOAD_BALANCER,
30003004
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,27 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
5656
private OperatingSystemMXBean systemBean;
5757
private SystemResourceUsage usage;
5858
private final Optional<Double> overrideBrokerNicSpeedGbps;
59+
private final List<String> overrideBrokerNics;
5960
private final boolean isCGroupsEnabled;
6061

6162
public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
6263
this(
6364
pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(),
6465
pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(),
66+
pulsar.getConfiguration().getLoadBalancerOverrideBrokerNics(),
6567
pulsar.getLoadManagerExecutor()
6668
);
6769
}
6870

6971
public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin,
7072
Optional<Double> overrideBrokerNicSpeedGbps,
73+
List<String> overrideBrokerNics,
7174
ScheduledExecutorService executorService) {
7275
this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
7376
this.lastCollection = 0L;
7477
this.usage = new SystemResourceUsage();
7578
this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
79+
this.overrideBrokerNics = overrideBrokerNics;
7680
this.isCGroupsEnabled = isCGroupEnabled();
7781
// Call now to initialize values before the constructor returns
7882
calculateBrokerHostUsage();
@@ -88,7 +92,7 @@ public SystemResourceUsage getBrokerHostUsage() {
8892

8993
@Override
9094
public void calculateBrokerHostUsage() {
91-
List<String> nics = getUsablePhysicalNICs();
95+
List<String> nics = !overrideBrokerNics.isEmpty() ? overrideBrokerNics : getUsablePhysicalNICs();
9296
double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
9397
double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, BitRateUnit.Kilobit);
9498
double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, BitRateUnit.Kilobit);

pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.tools;
2020

21+
import java.util.ArrayList;
2122
import java.util.Optional;
2223
import java.util.concurrent.Callable;
2324
import java.util.concurrent.Executors;
@@ -67,7 +68,7 @@ public Integer call() throws Exception {
6768
try {
6869
if (isLinux) {
6970
hostUsage = new LinuxBrokerHostUsageImpl(
70-
Integer.MAX_VALUE, Optional.empty(), scheduler
71+
Integer.MAX_VALUE, Optional.empty(), new ArrayList<>(), scheduler
7172
);
7273
} else {
7374
hostUsage = new GenericBrokerHostUsageImpl(

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.impl;
2020

21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
2124
import java.util.ArrayList;
2225
import java.util.List;
2326
import java.util.Optional;
@@ -26,7 +29,11 @@
2629
import java.util.concurrent.TimeUnit;
2730
import lombok.Cleanup;
2831
import lombok.extern.slf4j.Slf4j;
32+
import org.apache.pulsar.broker.PulsarService;
33+
import org.apache.pulsar.broker.ServiceConfiguration;
2934
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
35+
import org.mockito.MockedStatic;
36+
import org.mockito.Mockito;
3037
import org.testng.Assert;
3138
import org.testng.annotations.Test;
3239

@@ -38,7 +45,7 @@ public void checkOverrideBrokerNicSpeedGbps() {
3845
@Cleanup("shutdown")
3946
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
4047
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
41-
new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), executorService);
48+
new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), new ArrayList<>(), executorService);
4249
List<String> nics = new ArrayList<>();
4350
nics.add("1");
4451
nics.add("2");
@@ -47,6 +54,34 @@ public void checkOverrideBrokerNicSpeedGbps() {
4754
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
4855
}
4956

57+
@Test
58+
public void checkOverrideBrokerNics() {
59+
try (MockedStatic<LinuxInfoUtils> mockedUtils = Mockito.mockStatic(LinuxInfoUtils.class)) {
60+
mockedUtils.when(() -> LinuxInfoUtils.getTotalNicUsage(any(), any(), any())).thenReturn(3.0d);
61+
mockedUtils.when(LinuxInfoUtils::getCpuUsageForEntireHost).thenReturn(LinuxInfoUtils.ResourceUsage.empty());
62+
List<String> nics = new ArrayList<>();
63+
nics.add("1");
64+
nics.add("2");
65+
nics.add("3");
66+
ServiceConfiguration config = new ServiceConfiguration();
67+
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(3.0d));
68+
config.setLoadBalancerOverrideBrokerNics(nics);
69+
PulsarService pulsarService = mock(PulsarService.class);
70+
when(pulsarService.getConfiguration()).thenReturn(config);
71+
@Cleanup("shutdown")
72+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
73+
when(pulsarService.getLoadManagerExecutor()).thenReturn(executorService);
74+
LinuxBrokerHostUsageImpl linuxBrokerHostUsage = new LinuxBrokerHostUsageImpl(pulsarService);
75+
linuxBrokerHostUsage.calculateBrokerHostUsage();
76+
double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
77+
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
78+
double totalNicLimitRx = linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthIn().limit;
79+
double totalNicLimitTx = linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthOut().limit;
80+
Assert.assertEquals(totalNicLimitRx, 3.0 * 1000 * 1000 * 3);
81+
Assert.assertEquals(totalNicLimitTx, 3.0 * 1000 * 1000 * 3);
82+
}
83+
}
84+
5085
@Test
5186
public void testCpuUsage() throws InterruptedException {
5287
if (!LinuxInfoUtils.isLinux()) {
@@ -56,7 +91,8 @@ public void testCpuUsage() throws InterruptedException {
5691
@Cleanup("shutdown")
5792
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
5893
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
59-
new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), executorService);
94+
new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(),
95+
new ArrayList<>(), executorService);
6096

6197
linuxBrokerHostUsage.calculateBrokerHostUsage();
6298
TimeUnit.SECONDS.sleep(1);

0 commit comments

Comments
 (0)