Skip to content

Commit a199415

Browse files
authored
[Feat] support using external ray pg with bundles (#3850)
1 parent 06d3693 commit a199415

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

lmdeploy/pytorch/engine/executor/ray_executor.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,12 +524,19 @@ def get_priority(ip):
524524
sorted_workers = [item[0] for item in sorted_worker_ip_map]
525525
return sorted_workers
526526

527+
def _valid_bundle_id(self, bundle_id: int):
528+
"""Check if a bundle is valid only when self.use_external_ray=True."""
529+
if (not self.ray_ctx.owned_pg and _envs.ray_external_pg_bundles
530+
and bundle_id not in _envs.ray_external_pg_bundles):
531+
return False
532+
return True
533+
527534
def _init_workers_ray(self, placement_group: PlacementGroup, worker_kwargs: dict):
528535
"""Init worker ray."""
529536
device_str = get_device_str()
530537
bundle_indices = []
531538
for bundle_id, bundle in enumerate(placement_group.bundle_specs):
532-
if bundle.get(device_str, 0):
539+
if bundle.get(device_str, 0) and self._valid_bundle_id(bundle_id):
533540
bundle_indices.append(bundle_id)
534541
bundle_indices = bundle_indices[:self.world_size]
535542

@@ -556,7 +563,7 @@ def _init_workers_ray(self, placement_group: PlacementGroup, worker_kwargs: dict
556563
worker = ray.remote(
557564
num_cpus=0,
558565
num_gpus=0,
559-
resources={device_str: 1.0},
566+
resources={device_str: 0.01},
560567
scheduling_strategy=scheduling_strategy,
561568
)(RayWorkerWrapper).remote(**worker_kwargs)
562569
workers.append(worker)

lmdeploy/pytorch/envs.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,22 @@ def env_to_int(
4040
return value
4141

4242

43+
def env_to_list_int(
44+
env_var: str,
45+
default: list[int] = None,
46+
):
47+
"""Env to list of int."""
48+
default_ = default if default is not None else []
49+
value = os.getenv(env_var)
50+
if value is None:
51+
return default_
52+
try:
53+
value = [int(x) for x in value.split(',')]
54+
except Exception:
55+
value = default_
56+
return value
57+
58+
4359
_ENVS = dict()
4460

4561

@@ -91,6 +107,10 @@ def _patched_get_env(
91107
ray_timeline_enable = env_to_bool('LMDEPLOY_RAY_TIMELINE_ENABLE', False)
92108
ray_timeline_output_path = os.getenv('LMDEPLOY_RAY_TIMELINE_OUT_PATH', 'ray_timeline.json')
93109

110+
# ray external placement group bundles
111+
# only used when lmdeploy is initialized inside a Ray Actor with pg allocated
112+
ray_external_pg_bundles = env_to_list_int('LMDEPLOY_RAY_EXTERNAL_PG_BUNDLES', [])
113+
94114
# dist
95115
dist_master_addr = os.getenv('LMDEPLOY_DIST_MASTER_ADDR', None)
96116
dist_master_port = os.getenv('LMDEPLOY_DIST_MASTER_PORT', None)

0 commit comments

Comments
 (0)