9 changes: 5 additions & 4 deletions .pre-commit-config.yaml
@@ -30,10 +30,10 @@ repos:
# "--ignore-regex=['forward', 'backward', 'reset_parameters', 'extra_repr', 'MetaData', 'apply_activation','exec_activation']",
# "--color", "--"]

- repo: https://github.com/igorshubovych/markdownlint-cli
rev: v0.35.0
hooks:
- id: markdownlint
# - repo: https://github.com/igorshubovych/markdownlint-cli
# rev: v0.35.0
# hooks:
# - id: markdownlint

- repo: local
hooks:
@@ -42,3 +42,4 @@ repos:
entry: python test/ci_tests/header_check.py
language: python
pass_filenames: false
exclude: ^modulus/sym/utils_aux/
2 changes: 1 addition & 1 deletion modulus/sym/__init__.py
@@ -13,12 +13,12 @@
# limitations under the License.

__version__ = "1.4.0a0"

from pint import UnitRegistry

from .node import Node
from .key import Key
from .hydra.utils import main, compose
from .utils_aux import paddle_aux

# pint unit registry
ureg = UnitRegistry()
8 changes: 4 additions & 4 deletions modulus/sym/constants.py
@@ -16,7 +16,7 @@
constant values used by Modulus
"""

import torch
import paddle
import numpy as np

# string used to determine derivatives
@@ -28,17 +28,17 @@ def diff(y: str, x: str, degree: int = 1) -> str:


# for changing to float16 or float64
tf_dt = torch.float32
tf_dt = paddle.get_default_dtype()
np_dt = np.float32

# tensorboard naming
TF_SUMMARY = False

# Pytorch Version for which JIT will be default on
JIT_PYTORCH_VERSION = "2.1.0a0+4136153"
# JIT_PYTORCH_VERSION = "2.1.0a0+4136153"
JIT_PADDLE_VERSION = None

# No scaling is needed if using NO_OP_SCALE
NO_OP_SCALE = (0.0, 1.0)

# If using NO_OP_NORM, it is effectively doing no normalization
NO_OP_NORM = (-1.0, 1.0)
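
Note (not part of the diff): unlike `torch.float32`, `paddle.get_default_dtype()` returns a dtype *string* ("float32" unless changed via `paddle.set_default_dtype`), and `paddle.to_tensor` accepts that string directly. A minimal sketch of how the new `tf_dt` behaves, assuming a stock Paddle install:

```python
import numpy as np
import paddle

# paddle.get_default_dtype() returns a string such as "float32",
# which paddle.to_tensor accepts directly as its dtype argument.
tf_dt = paddle.get_default_dtype()
print(tf_dt)                                 # "float32" on a default install

x = paddle.to_tensor(np.ones(3), dtype=tf_dt)
print(x.dtype)                               # paddle.float32
```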
16 changes: 8 additions & 8 deletions modulus/sym/dataset/dataset.py
@@ -11,14 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle

""" Dataset classes
"""

from typing import Dict

import numpy as np
import torch.utils.data

from modulus.sym.constants import tf_dt
from modulus.sym.distributed import DistributedManager
@@ -61,14 +61,14 @@ def _to_tensor_dict(var_dict, device=None):

# convert to torch
tensor_dict = {
key: torch.as_tensor(value, dtype=tf_dt, device=device)
key: paddle.to_tensor(value, dtype=tf_dt, place=device)
for key, value in var_dict.items()
}

return tensor_dict


class Dataset(_BaseDataset, torch.utils.data.Dataset):
class Dataset(_BaseDataset, paddle.io.Dataset):
"For defining map-style datasets, can be subclassed by user"

auto_collation = False
@@ -84,7 +84,7 @@ def __len__(self):
raise NotImplementedError("subclass must implement this")


class IterableDataset(_BaseDataset, torch.utils.data.IterableDataset):
class IterableDataset(_BaseDataset, paddle.io.IterableDataset):
"For defining iterable-style datasets, can be subclassed by user"

def __iter__(self):
@@ -107,10 +107,10 @@ def __init__(
if lambda_weighting is None:
lambda_weighting = {key: np.ones_like(x) for key, x in outvar.items()}

# convert dataset arrays to tensors
self.invar = Dataset._to_tensor_dict(invar)
self.outvar = Dataset._to_tensor_dict(outvar)
self.lambda_weighting = Dataset._to_tensor_dict(lambda_weighting)
# assign given data arrays to class attributes (no need to convert to tensors)
self.invar = invar
self.outvar = outvar
self.lambda_weighting = lambda_weighting

# get length
self.length = len(next(iter(self.invar.values())))
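
Note (illustration only, not part of the diff): with the change above, the raw numpy arrays stay on the dataset and tensor conversion happens later. A minimal sketch of the same pattern under `paddle.io`, using a hypothetical `DictNumpyDataset`:

```python
import numpy as np
import paddle

class DictNumpyDataset(paddle.io.Dataset):
    """Map-style dataset that keeps numpy arrays and lets the loader batch them."""

    def __init__(self, invar):
        super().__init__()
        self.invar = invar
        self.length = len(next(iter(invar.values())))

    def __getitem__(self, idx):
        # return raw numpy slices; the default collate turns them into tensors
        return {key: value[idx] for key, value in self.invar.items()}

    def __len__(self):
        return self.length

ds = DictNumpyDataset({"x": np.random.rand(8, 2).astype("float32")})
loader = paddle.io.DataLoader(ds, batch_size=4)
for batch in loader:
    print(batch["x"].shape)   # [4, 2]
```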
39 changes: 18 additions & 21 deletions modulus/sym/distributed/helpers.py
@@ -12,16 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.nn.functional as F
import torch.distributed as dist
import paddle
import paddle.nn.functional as F
import paddle.distributed as dist


def get_memory_format(tensor):
if tensor.is_contiguous(memory_format=torch.channels_last):
return torch.channels_last
else:
return torch.contiguous_format
raise NotImplementedError("get_memory_format is not implemented")


def pad_helper(tensor, dim, new_size, mode="zero"):
@@ -42,8 +39,8 @@ def pad_helper(tensor, dim, new_size, mode="zero"):
slice(0, x) if idx != dim else slice(1, output_shape[1] + 1)
for idx, x in enumerate(tensor.shape)
]
tensor_pad[lhs_slice] = torch.flip(
torch.conj(tensor_pad[rhs_slice]), dims=[dim]
tensor_pad[lhs_slice] = paddle.flip(
paddle.conj(tensor_pad[rhs_slice]), axis=[dim]
)

return tensor_pad
@@ -57,7 +54,7 @@ def truncate_helper(tensor, dim, new_size):
slice(0, x) if idx != dim else slice(0, new_size)
for idx, x in enumerate(tensor.shape)
]
tensor_trunc = tensor[output_slice].contiguous(memory_format=input_format)
tensor_trunc = tensor[output_slice].contiguous()

return tensor_trunc

@@ -71,7 +68,9 @@ def split_tensor_along_dim(tensor, dim, num_chunks):
), f"Error, cannot split dim {dim} evenly. Dim size is \
{tensor.shape[dim]} and requested number of splits is {num_chunks}
chunk_size = tensor.shape[dim] // num_chunks
tensor_list = torch.split(tensor, chunk_size, dim=dim)
tensor_list = paddle.split(
tensor, num_or_sections=tensor.shape[dim] // chunk_size, axis=dim
)

return tensor_list

@@ -87,13 +86,13 @@ def _transpose(tensor, dim0, dim1, group=None, async_op=False):
# split and local transposition
split_size = tensor.shape[dim0] // comm_size
x_send = [
y.contiguous(memory_format=input_format)
for y in torch.split(tensor, split_size, dim=dim0)
y.contiguous()
for y in paddle.split(tensor, tensor.shape[dim0] // split_size, axis=dim0)
]
x_recv = [torch.empty_like(x_send[0]) for _ in range(comm_size)]
x_recv = [paddle.empty_like(x_send[0]) for _ in range(comm_size)]

# global transposition
req = dist.all_to_all(x_recv, x_send, group=group, async_op=async_op)
req = dist.alltoall(x_send, x_recv, group=group, sync_op=not async_op)

return x_recv, req

@@ -108,7 +107,7 @@ def _reduce(input_, use_fp32=True, group=None):
# All-reduce.
if use_fp32:
dtype = input_.dtype
inputf_ = input_.float()
inputf_ = input_.astype(dtype="float32")
dist.all_reduce(inputf_, group=group)
input_ = inputf_.to(dtype)
else:
@@ -130,9 +129,8 @@ def _split(input_, dim_, group=None):
# Split along last dimension.
input_list = split_tensor_along_dim(input_, dim_, comm_size)

# Note: torch.split does not create contiguous tensors by default.
rank = dist.get_rank(group=group)
output = input_list[rank].contiguous(memory_format=input_format)
output = input_list[rank].contiguous()

return output

@@ -155,11 +153,10 @@ def _gather(input_, dim_, group=None):
# Size and dimension.
comm_rank = dist.get_rank(group=group)

tensor_list = [torch.empty_like(input_) for _ in range(comm_size)]
tensor_list = [paddle.empty_like(input_) for _ in range(comm_size)]
tensor_list[comm_rank] = input_
dist.all_gather(tensor_list, input_, group=group)

# Note: torch.cat already creates a contiguous tensor.
output = torch.cat(tensor_list, dim=dim_).contiguous(memory_format=input_format)
output = paddle.concat(tensor_list, axis=dim_).contiguous()

return output
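
Note on the `split` conversions above (not part of the diff): `torch.split` takes a chunk *size*, while `paddle.split` takes the *number* of sections (or an explicit list of section sizes), which is why the new code divides the dimension size by the chunk size first. A minimal sketch:

```python
import paddle

x = paddle.arange(12, dtype="float32").reshape([6, 2])

chunk_size = 2                               # torch-style: rows per chunk
num_sections = x.shape[0] // chunk_size      # paddle-style: number of chunks
chunks = paddle.split(x, num_or_sections=num_sections, axis=0)

print([tuple(c.shape) for c in chunks])      # [(2, 2), (2, 2), (2, 2)]
```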
42 changes: 21 additions & 21 deletions modulus/sym/distributed/manager.py
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.distributed as dist
import paddle
import paddle.distributed as dist

import logging
import os
@@ -22,6 +22,7 @@

logger = logging.getLogger("__name__")


# Create singleton DistributedManager class
class DistributedManager(object):
_shared_state = {}
@@ -40,18 +41,18 @@ def __new__(cls):
if not hasattr(obj, "_distributed"):
obj._distributed = False
if not hasattr(obj, "_device"):
obj._device = torch.device(
f"cuda:0" if torch.cuda.is_available() else "cpu"
obj._device: str = str(
f"gpu:0" if paddle.device.cuda.device_count() >= 1 else "cpu"
)
if not hasattr(obj, "_cuda"):
obj._cuda = torch.cuda.is_available()
obj._cuda = paddle.device.cuda.device_count() >= 1
if not hasattr(obj, "_broadcast_buffers"):
obj._broadcast_buffers = False
if not hasattr(obj, "_find_unused_parameters"):
obj._find_unused_parameters = False
if not hasattr(obj, "_cuda_graphs"):
obj._cuda_graphs = False

obj.place = paddle.device.set_device("gpu")
return obj

@property
@@ -163,7 +164,7 @@ def cuda_graphs(self, graphs: bool):

@staticmethod
def get_available_backend():
if torch.cuda.is_available() and torch.distributed.is_nccl_available():
if paddle.device.cuda.device_count() >= 1 and dist.get_backend() == "NCCL":
return "nccl"
else:
return "gloo"
@@ -175,7 +176,7 @@ def initialize_env():
if "LOCAL_RANK" in os.environ:
local_rank = int(os.environ.get("LOCAL_RANK"))
else:
local_rank = rank % torch.cuda.device_count()
local_rank = rank % paddle.device.cuda.device_count()
addr = os.environ.get("MASTER_ADDR")
port = os.environ.get("MASTER_PORT")

@@ -225,7 +226,6 @@ def initialize_slurm(port):
def initialize():
addr = os.getenv("MASTER_ADDR", "localhost")
port = os.getenv("MASTER_PORT", "12355")
# https://pytorch.org/docs/master/notes/cuda.html#id5
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0"
try:
DistributedManager.initialize_env()
@@ -241,7 +241,7 @@ def initialize():
manager = DistributedManager()
if manager.distributed:
print(
f'Initialized process {manager.rank} of {manager.world_size} using method "{manager._initialization_method}". Device set to {str(manager.device)}'
f'Initialized process {manager.rank} of {manager.world_size} using method "{manager._initialization_method}". Device set to {str(manager.place)}'
)

@staticmethod
@@ -259,38 +259,38 @@ def setup(

manager = DistributedManager()

manager._distributed = (world_size > 1) and torch.distributed.is_available()
manager._distributed = (world_size > 1) and dist.is_available()
if manager._distributed:
# Update rank and world_size if using distributed
manager._rank = rank
manager._world_size = world_size
if local_rank is None:
manager._local_rank = rank % torch.cuda.device_count()
manager._local_rank = rank % paddle.device.cuda.device_count()
else:
manager._local_rank = local_rank

# Setup distributed process group
# time.sleep(1)
dist.init_process_group(
backend, rank=manager.rank, world_size=manager.world_size
)
dist.init_parallel_env()

manager._groups = {}
manager._group_ranks = {}
manager._group_names = {}

manager._device = torch.device(
f"cuda:{manager.local_rank}" if torch.cuda.is_available() else "cpu"
manager._device = str(
f"gpu:{manager.local_rank}"
if paddle.device.cuda.device_count() >= 1
else "cpu"
)
# Needed for cuda graphs
if torch.cuda.is_available():
torch.cuda.set_device(manager.local_rank)
if paddle.device.cuda.device_count() >= 1:
paddle.device.set_device(device=f"gpu:{manager.local_rank}")

manager._initialization_method = method

# Set device for this process and empty cache to optimize memory usage
torch.cuda.device(manager.device)
torch.cuda.empty_cache()
paddle.device.set_device(manager.place)
paddle.device.cuda.empty_cache()

@staticmethod
def create_process_subgroup(name: str, size: int, group_name=None, verbose=False):
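
Note on the device handling above (illustration only): Paddle addresses devices with plain strings such as "gpu:0" or "cpu" instead of `torch.device` objects, and `paddle.device.set_device` both selects the current device and returns the corresponding `Place`. A minimal sketch, assuming a single-process (non-distributed) run:

```python
import paddle

# Pick a device string the way DistributedManager does, then make it current.
device = "gpu:0" if paddle.device.cuda.device_count() >= 1 else "cpu"
place = paddle.device.set_device(device)     # returns a Place, e.g. Place(gpu:0)

x = paddle.ones([2, 2])                      # allocated on the currently-set device
print(place, x.place)
```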