@@ -8,22 +8,25 @@ comm_args:
grpc_ipconfig_path: config/grpc_ipconfig.csv
```

`grpc_ipconfig_path` specifies the path of the config for gRPC communication. Config file specifies an ip address for each process through with they can communicate with each other. The config file should have the folliwng format:
`grpc_ipconfig_path` specifies the path of the config file for gRPC communication, which assigns each process an ip address through which the processes can reach each other. The config file should have the following format:

```csv
receiver_id,ip
0,127.0.0.1
1,127.0.0.1
2,127.0.0.1
eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
1,1,0.0.0.0,8899
2,2,0.0.0.0,8898
```

Here the `receiver_id` is the rank of the process.
Here, `eid`, `rank`, `grpc_server_ip`, and `grpc_server_port` are the id, rank, ip address, and port of the server or client process. For server processes the rank is always set to 0, while for clients it is always 1 or above.
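The config above is plain CSV, so loading it takes only a few lines of Python; a minimal sketch (the column names follow the format shown above — this is an illustration, not the actual FedML parser):

```python
import csv
import io

# Contents matching the grpc_ipconfig.csv format shown above.
CONFIG = """eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
1,1,0.0.0.0,8899
2,2,0.0.0.0,8898
"""

def load_ipconfig(f):
    """Map each process rank to its (ip, port) gRPC endpoint."""
    return {
        int(row["rank"]): (row["grpc_server_ip"], int(row["grpc_server_port"]))
        for row in csv.DictReader(f)
    }

endpoints = load_ipconfig(io.StringIO(CONFIG))
print(endpoints[0])  # ('0.0.0.0', 8890) — the aggregator's endpoint
```

In a real run the file would be read from the `grpc_ipconfig_path` given in the YAML config rather than from an inline string.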

## One Line API Example

Example is provided at:
Examples are provided at:

`python/examples/cross_silo/grpc_fedavg_mnist_lr_example/one_line`
`python/examples/cross_silo/grpc_fedavg_mnist_lr_example/step_by_step`
`python/examples/cross_silo/grpc_fedavg_mnist_lr_example/custom_data_and_model`

### Training Script

On the client side, the client ID (a.k.a. rank) starts from 1.
@@ -9,10 +9,10 @@ The default ip of every grpc server is set to `0.0.0.0`, and all grpc ports star
> the aggregator (rank: 0). This record is mandatory. However, you can change the values of the `ip` and `port`
> attributes as you see fit, and add more records for the grpc servers of the rest of the clients. For instance:
```
eid,rank,ip,port
eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
1,1,0.0.0.0,8899
2,2,0.0.0.0,8898
1,1,0.0.0.0,8891
2,2,0.0.0.0,8892
```

## Start Script
@@ -1,2 +1,2 @@
eid,rank,ip,port
eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
@@ -0,0 +1,51 @@

# Introduction
In this working example, we will run 1 aggregation server and 2 clients on the same machine using Docker + gRPC, and we will use the FEDML.ai platform to run the FL job.

# gRPC Configuration File
The content of the gRPC configuration file is as follows:
```
eid,rank,grpc_server_ip,grpc_server_port,ingress_ip
0,0,0.0.0.0,8890,fedml_server
1,1,0.0.0.0,8891,fedml_client_1
2,2,0.0.0.0,8892,fedml_client_2
```
The `ingress_ip` variable refers to the name of the container that we assign to either the server or the client, as we discuss in detail below.
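Inside a user-defined Docker network, containers can reach each other by container name through Docker's embedded DNS, so a peer would dial the `ingress_ip` name rather than the raw listen address. A sketch of how an address could be selected (the column names follow the CSV format above; the selection logic and function names are illustrative assumptions, not the actual FedML implementation):

```python
import csv
import io

# Rows in the grpc_ipconfig.csv format shown above (values illustrative).
CONFIG = """eid,rank,grpc_server_ip,grpc_server_port,ingress_ip
0,0,0.0.0.0,8890,fedml_server
1,1,0.0.0.0,8891,fedml_client_1
2,2,0.0.0.0,8892,fedml_client_2
"""

def endpoint_for(rank, f, inside_docker_network=True):
    """Return the address a peer should dial for the given rank.

    When running inside the Docker bridge network, the container name
    (ingress_ip) is resolvable via Docker's embedded DNS; otherwise we
    fall back to the raw grpc_server_ip.
    """
    for row in csv.DictReader(f):
        if int(row["rank"]) == rank:
            host = row["ingress_ip"] if inside_docker_network else row["grpc_server_ip"]
            return f"{host}:{row['grpc_server_port']}"
    raise KeyError(rank)

print(endpoint_for(0, io.StringIO(CONFIG)))  # fedml_server:8890
```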


# Docker Configuration
Before creating any docker container on our machine, we need to pull the latest fedml image (e.g., `fedml:v090`) and ensure that all spawned containers can communicate with each other through a network bridge (e.g., `fedml_grpc_network`).
Specifically, what you need to do is:
```bash
docker pull fedml:v090
docker network create fedml_grpc_network
```

Once these two steps are completed, we can start 1 aggregation server and 2 clients (without using a GPU) and register them with the fedml platform using our <FEDML_API_KEY> as follows:

```bash
# Server
docker run -it -p 8890:8890 --entrypoint /bin/bash --name fedml_server --network fedml_grpc_network fedml:v090
redis-server --daemonize yes
source /fedml/bin/activate
fedml login -s <FEDML_API_KEY>
```

```bash
# Client 1
docker run -it -p 8891:8891 --entrypoint /bin/bash --name fedml_client_1 --network fedml_grpc_network fedml:v090
redis-server --daemonize yes
source /fedml/bin/activate
fedml login -c <FEDML_API_KEY>
```

```bash
# Client-2
docker run -it -p 8892:8892 --entrypoint /bin/bash --name fedml_client_2 --network fedml_grpc_network fedml:v090
redis-server --daemonize yes
source /fedml/bin/activate
fedml login -c <FEDML_API_KEY>
```

Then we only need to compile our job and submit it to our docker-based cluster, as discussed in detail in the official FEDML documentation: https://fedml.ai/octopus/userGuides

@@ -0,0 +1,12 @@
:: ### don't modify this part ###
:: ##############################


:: ### please customize your script in this region ####
set DATA_PATH=%userprofile%\fedml_data
if exist %DATA_PATH% (echo Exist %DATA_PATH%) else mkdir %DATA_PATH%


:: ### don't modify this part ###
echo [FedML]Bootstrap Finished
:: ##############################
@@ -0,0 +1,7 @@

# pip install fedml==0.7.15
#pip install --upgrade fedml

### don't modify this part ###
echo "[FedML]Bootstrap Finished"
##############################
@@ -0,0 +1,52 @@
common_args:
training_type: "cross_silo"
scenario: "horizontal"
using_mlops: false
random_seed: 0

environment_args:
bootstrap: config/bootstrap.sh

data_args:
dataset: "mnist"
data_cache_dir: "../../../../data/mnist"
partition_method: "hetero"
partition_alpha: 0.5

model_args:
model: "lr"
model_file_cache_folder: "./model_file_cache" # will be filled by the server automatically
global_model_file_path: "./model_file_cache/global_model.pt"

train_args:
federated_optimizer: "FedAvg"
client_id_list:
client_num_in_total: 1000
client_num_per_round: 2
comm_round: 50
epochs: 1
batch_size: 10
client_optimizer: sgd
learning_rate: 0.03
weight_decay: 0.001

validation_args:
frequency_of_the_test: 5

device_args:
worker_num: 2
using_gpu: false
gpu_mapping_file: config/gpu_mapping.yaml
gpu_mapping_key: mapping_default

comm_args:
backend: "GRPC"
grpc_ipconfig_path: config/grpc_ipconfig.csv

tracking_args:
# When running on MLOps platform(open.fedml.ai), the default log path is at ~/.fedml/fedml-client/fedml/logs/ and ~/.fedml/fedml-server/fedml/logs/
local_log_output_path: ./log
enable_wandb: false
wandb_key: ee0b5f53d949c84cee7decbe7a619e63fb1f8408
wandb_project: fedml
wandb_name: fedml_torch_fedavg_mnist_lr
@@ -0,0 +1,3 @@
eid,rank,grpc_server_ip,grpc_server_port,ingress_ip
0,0,0.0.0.0,8890,fedml_server
1,1,0.0.0.0,8891,fedml_client_1
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
RANK=$1
python3 torch_client.py --cf config/fedml_config.yaml --rank $RANK --role client
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

python3 torch_server.py --cf config/fedml_config.yaml --rank 0 --role server
@@ -0,0 +1,18 @@
import fedml
from fedml import FedMLRunner

if __name__ == "__main__":
args = fedml.init()

# init device
device = fedml.device.get_device(args)

# load data
dataset, output_dim = fedml.data.load(args)

# load model
model = fedml.model.create(args, output_dim)

# start training
fedml_runner = FedMLRunner(args, device, dataset, model)
fedml_runner.run()
@@ -0,0 +1,18 @@
import fedml
from fedml import FedMLRunner

if __name__ == "__main__":
args = fedml.init()

# init device
device = fedml.device.get_device(args)

# load data
dataset, output_dim = fedml.data.load(args)

# load model
model = fedml.model.create(args, output_dim)

# start training
fedml_runner = FedMLRunner(args, device, dataset, model)
fedml_runner.run()
@@ -9,10 +9,10 @@ The default ip of every grpc server is set to `0.0.0.0`, and all grpc ports star
> the aggregator (rank: 0). This record is mandatory. However, you can change the values of the `ip` and `port`
> attributes as you see fit, and add more records for the grpc servers of the rest of the clients. For instance:
```
eid,rank,ip,port
eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
1,1,0.0.0.0,8899
2,2,0.0.0.0,8898
1,1,0.0.0.0,8891
2,2,0.0.0.0,8892
```

## Start Script
@@ -1,2 +1,2 @@
eid,rank,ip,port
eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
@@ -9,10 +9,10 @@ The default ip of every grpc server is set to `0.0.0.0`, and all grpc ports star
> the aggregator (rank: 0). This record is mandatory. However, you can change the values of the `ip` and `port`
> attributes as you see fit, and add more records for the grpc servers of the rest of the clients. For instance:
```
eid,rank,ip,port
eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
1,1,0.0.0.0,8899
2,2,0.0.0.0,8898
1,1,0.0.0.0,8891
2,2,0.0.0.0,8892
```

## Start Script
@@ -1,2 +1,2 @@
eid,rank,ip,port
eid,rank,grpc_server_ip,grpc_server_port
0,0,0.0.0.0,8890
9 changes: 9 additions & 0 deletions python/fedml/computing/scheduler/comm_utils/job_utils.py
@@ -574,6 +574,15 @@ def get_run_container_name(run_id: int) -> str:
container_name = f"{container_prefix}__{run_id}"
return container_name

@staticmethod
def docker_client_exists() -> bool:
try:
client = docker.from_env()
client.ping()
return True
except docker.errors.DockerException:
return False

@staticmethod
def get_docker_client(docker_args: DockerArgs) -> DockerClient:
try:
4 changes: 2 additions & 2 deletions python/fedml/computing/scheduler/comm_utils/sys_utils.py
@@ -20,8 +20,6 @@
import sys
import subprocess

from fedml.computing.scheduler.slave.client_constants import ClientConstants

FETAL_ERROR_START_CODE = 128

SYS_ERR_CODE_MAP = {"0": "Successful exit without errors.",
@@ -817,6 +815,8 @@ def daemon_ota_upgrade_with_version(in_version="release"):


def run_cmd(command, show_local_console=False):
# Had to import ClientConstants here because otherwise it was raising circular import errors.
from fedml.computing.scheduler.slave.client_constants import ClientConstants
process = ClientConstants.exec_console_with_script(command, should_capture_stdout=True,
should_capture_stderr=True)
ret_code, out, err = ClientConstants.get_console_pipe_out_err_results(process)
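The function-level import above is the standard way to break an import cycle in Python: defer the import until call time, when both modules have finished initializing. A toy sketch of the pattern (`shlex` stands in for the real `ClientConstants` dependency; the function name is illustrative):

```python
def run_cmd_sketch(command):
    """Split a shell command into argv, importing the helper lazily.

    A module-level import here could participate in a circular import if
    the imported module in turn imports this one; importing inside the
    function body runs only at call time, after both modules are fully
    loaded.
    """
    import shlex  # deferred import: resolved on first call, not at load time
    return shlex.split(command)

print(run_cmd_sketch("echo hello world"))  # ['echo', 'hello', 'world']
```

Deferred imports fix the symptom; where feasible, the cleaner long-term fix is to move the shared definitions into a module neither side depends on cyclically.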
@@ -616,15 +616,17 @@ def cleanup_containers_and_release_gpus(run_id, edge_id, job_type=SchedulerConst
if not (job_type == SchedulerConstants.JOB_TASK_TYPE_SERVE or
job_type == SchedulerConstants.JOB_TASK_TYPE_DEPLOY):

# Terminate the run docker container if exists
try:
container_name = JobRunnerUtils.get_run_container_name(run_id)
docker_client = JobRunnerUtils.get_docker_client(DockerArgs())
logging.info(f"Terminating the run docker container {container_name} if exists...")
JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client)
except Exception as e:
logging.error(f"Exception {e} occurred when terminating docker container. "
f"Traceback: {traceback.format_exc()}")
# Check if docker client exists and then terminate containers.
if JobRunnerUtils.docker_client_exists():
try:
# Terminate docker container.
docker_client = JobRunnerUtils.get_docker_client(DockerArgs())
container_name = JobRunnerUtils.get_run_container_name(run_id)
logging.info(f"Terminating the run docker container {container_name} if exists...")
JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client)
except Exception as e:
logging.error(f"Exception {e} occurred when terminating docker container. "
f"Traceback: {traceback.format_exc()}")

# Release the GPU ids and update the GPU availability in the persistent store
JobRunnerUtils.get_instance().release_gpu_ids(run_id, edge_id)
@@ -509,16 +509,17 @@ def process_status(self, run_id, status, edge_id, master_id=None):
if run_process is not None:
if run_process.pid is not None:
RunProcessUtils.kill_process(run_process.pid)

# Terminate the run docker container if exists
try:
container_name = JobRunnerUtils.get_run_container_name(run_id)
docker_client = JobRunnerUtils.get_docker_client(DockerArgs())
logging.info(f"Terminating the run docker container {container_name} if exists...")
JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client)
except Exception as e:
logging.error(f"Error occurred when terminating docker container."
f"Exception: {e}, Traceback: {traceback.format_exc()}.")
# Check if docker client exists and then terminate containers.
if JobRunnerUtils.docker_client_exists():
try:
# Terminate docker container.
docker_client = JobRunnerUtils.get_docker_client(DockerArgs())
container_name = JobRunnerUtils.get_run_container_name(run_id)
logging.info(f"Terminating the run docker container {container_name} if exists...")
JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client)
except Exception as e:
logging.error(f"Error occurred when terminating docker container."
f"Exception: {e}, Traceback: {traceback.format_exc()}.")

# Stop log processor for current run
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, edge_id)