diff --git a/.github/workflows/build-package.yml b/.github/workflows/build-package.yml index 79784406..7f846045 100644 --- a/.github/workflows/build-package.yml +++ b/.github/workflows/build-package.yml @@ -51,6 +51,8 @@ jobs: run: | source venv/bin/activate emd + emd version + emd list-supported-models - name: Upload wheel artifact uses: actions/upload-artifact@v4 diff --git a/src/emd/cfn/sagemaker_realtime/template.yaml b/src/emd/cfn/sagemaker_realtime/template.yaml index 3facccbf..03c42319 100644 --- a/src/emd/cfn/sagemaker_realtime/template.yaml +++ b/src/emd/cfn/sagemaker_realtime/template.yaml @@ -34,6 +34,13 @@ Parameters: Type: Number Description: The target value for the autoscaling Default: 10 + SageMakerEndpointName: + Type: String + Description: The name of the SageMaker Endpoint + Default: "noname" + +Conditions: + UseDefaultEndpointName: !Equals [!Ref SageMakerEndpointName, "noname"] Resources: ExecutionRole: @@ -90,7 +97,10 @@ Resources: SageMakerEndpoint: Type: AWS::SageMaker::Endpoint Properties: - EndpointName: !Sub '${AWS::StackName}-endpoint' + EndpointName: !If + - UseDefaultEndpointName + - !Sub '${AWS::StackName}-endpoint' + - !Ref SageMakerEndpointName EndpointConfigName: !GetAtt SageMakerEndpointConfig.EndpointConfigName diff --git a/src/emd/commands/deploy.py b/src/emd/commands/deploy.py index 35cf1fc9..fc92453a 100644 --- a/src/emd/commands/deploy.py +++ b/src/emd/commands/deploy.py @@ -136,7 +136,8 @@ def ask_model_id(region,model_id=None): # step 1: select model series name support_models:list[Model] = sorted( - [Model.get_model(m) for m in Model.get_supported_models()], + [Model.get_model(m) for m in Model.get_supported_models() + if hasattr(Model.get_model(m), 'model_series') and hasattr(Model.get_model(m).model_series, 'model_series_name')], key=lambda x:x.model_series.model_series_name ) # filter models @@ -234,13 +235,34 @@ def deploy( ] = False, only_allow_local_deploy: Annotated[ Optional[bool], typer.Option("--only-allow-local-deploy", help="only allow local instance") - ] = False + ] = False, + dockerfile_local_path: Annotated[ + str, typer.Option("--dockerfile-local-path", help="Your custom Dockerfile path for building the model image, all files must be in the same directory") + ] = None, ): if only_allow_local_deploy: allow_local_deploy = True region = LOCAL_REGION else: region = get_current_region() + + if dockerfile_local_path: + response = sdk_deploy( + model_id='custom-docker', + model_tag=f"{model_id}-{model_tag}", + instance_type=instance_type, + engine_type='custom', + framework_type='custom', + service_type='sagemaker_realtime', + region=region, + extra_params = extra_params, + env_stack_on_failure = "ROLLBACK", + force_env_stack_update = force_update_env_stack, + waiting_until_deploy_complete=True, + dockerfile_local_path=dockerfile_local_path, + ) + return response + vpc_id = None # ask model id model_id = ask_model_id(region,model_id=model_id) diff --git a/src/emd/models/__init__.py b/src/emd/models/__init__.py index d99f30c8..b2b50dd7 100644 --- a/src/emd/models/__init__.py +++ b/src/emd/models/__init__.py @@ -19,6 +19,7 @@ asr, embeddings, reranks, + custom, ) # text-2-image,text-2-video diff --git a/src/emd/models/custom/__init__.py b/src/emd/models/custom/__init__.py new file mode 100644 index 00000000..c99dcf36 --- /dev/null +++ b/src/emd/models/custom/__init__.py @@ -0,0 +1 @@ +from .custom_docker import * diff --git a/src/emd/models/custom/custom_docker.py b/src/emd/models/custom/custom_docker.py new file mode 100644 index 
00000000..ac70dfca --- /dev/null +++ b/src/emd/models/custom/custom_docker.py @@ -0,0 +1,39 @@ +from .. import Model + +from ..services import ( + sagemaker_service, +) +from ..frameworks import custom_framework +from ..instances import ( + g5dxlarge_instance, + g5d2xlarge_instance, + g5d4xlarge_instance, + g5d8xlarge_instance, + g5d12xlarge_instance, + local_instance +) +from ..engines import custom_engine + +Model.register( + dict( + model_id = "custom-docker", + supported_engines=[custom_engine], + supported_instances=[ + g5dxlarge_instance, + g5d2xlarge_instance, + g5d4xlarge_instance, + g5d8xlarge_instance, + g5d12xlarge_instance, + local_instance + ], + supported_services=[ + sagemaker_service, + ], + supported_frameworks=[ + custom_framework + ], + allow_china_region=True, + description="Custom model running in Docker container", + need_prepare_model=False, + ) +) diff --git a/src/emd/models/engines.py b/src/emd/models/engines.py index 55cd2180..92c33513 100644 --- a/src/emd/models/engines.py +++ b/src/emd/models/engines.py @@ -393,3 +393,7 @@ class KtransformersEngine(OpenAICompitableEngine): "use_public_ecr":False, "default_cli_args": " --max_new_tokens 2048", }) + +custom_engine = Engine(**{ + "engine_type":EngineType.CUSTOM, +}) diff --git a/src/emd/models/model.py b/src/emd/models/model.py index 1cd08cf8..1e052ef1 100644 --- a/src/emd/models/model.py +++ b/src/emd/models/model.py @@ -74,7 +74,7 @@ def check_inf2_instance(cls,instance_type:str): class Engine(ModelBase): engine_type: EngineType engine_dockerfile_config: Union[dict,None] = Field(default_factory=dict) - engine_cls: str + engine_cls: Union[str,None] = None dockerfile_name: str = "Dockerfile" base_image_account_id: Union[str,None] = None base_image_host: Union[str,None] = None @@ -181,7 +181,7 @@ class Model(ModelBase,Generic[T]): require_huggingface_token: bool = False modelscope_model_id: str = "" require_modelscope_token: bool = False - application_scenario: str + application_scenario: str = "" description: str = "" model_type: ModelType = ModelType.LLM need_prepare_model: bool = True @@ -189,7 +189,7 @@ class Model(ModelBase,Generic[T]): model_files_s3_path: Union[str,None] = None model_files_local_path: Union[str,None] = None model_files_download_source: ModelFilesDownloadSource = ModelFilesDownloadSource.AUTO - model_series: ModelSeries + model_series: ModelSeries = None executable_config: Union[ExecutableConfig,None] = None @classmethod diff --git a/src/emd/models/services.py b/src/emd/models/services.py index 037f9f26..05737773 100644 --- a/src/emd/models/services.py +++ b/src/emd/models/services.py @@ -16,7 +16,8 @@ "EngineType":"engine_type", "Region":"region", "MaxCapacity": ValueWithDefault(name="max_capacity",default=1), - "AutoScalingTargetValue": ValueWithDefault(name="auto_scaling_target_value",default=10) + "AutoScalingTargetValue": ValueWithDefault(name="auto_scaling_target_value",default=10), + "SageMakerEndpointName": ValueWithDefault(name="sagemaker_endpoint_name",default="noname") }, name = "Amazon SageMaker AI Real-time inference", service_type=ServiceType.SAGEMAKER, diff --git a/src/emd/models/utils/constants.py b/src/emd/models/utils/constants.py index 66ec6abc..a44d5e56 100644 --- a/src/emd/models/utils/constants.py +++ b/src/emd/models/utils/constants.py @@ -32,6 +32,7 @@ class EngineType(ConstantBase): LLAMA_CPP = "llama.cpp" TGI = "tgi" LMDEPLOY = 'lmdeploy' + CUSTOM = "custom" KTRANFORMERS = 'ktransformers' # @classmethod diff --git a/src/emd/sdk/deploy.py b/src/emd/sdk/deploy.py index 
bea4341f..6426215d 100644 --- a/src/emd/sdk/deploy.py +++ b/src/emd/sdk/deploy.py @@ -1,15 +1,18 @@ import json import os +import io import time from typing import Optional import boto3 +import zipfile import sys from emd.constants import ( CODEPIPELINE_NAME, ENV_STACK_NAME, MODEL_DEFAULT_TAG, + MODEL_STACK_NAME_PREFIX, VERSION, LOCAL_REGION ) @@ -57,11 +60,15 @@ def prepare_deploy( service_type=None, instance_type=None, region=None, + dockerfile_local_path=None ): - model: Model = Model.get_model(model_id) - model_stack_name = model.get_model_stack_name_prefix( - model_id, model_tag=model_tag - ) + if dockerfile_local_path: + model_stack_name = f"{MODEL_STACK_NAME_PREFIX}-{model_id}-{model_tag}" + else: + model: Model = Model.get_model(model_id) + model_stack_name = model.get_model_stack_name_prefix( + model_id, model_tag=model_tag + ) # check if model_id is inprogress in pipeline execution if check_stack_exists(model_stack_name): raise RuntimeError( @@ -113,6 +120,7 @@ def deploy( env_stack_on_failure="ROLLBACK", force_env_stack_update=False, waiting_until_deploy_complete=True, + dockerfile_local_path=None, ) -> dict: # Check if AWS environment is properly configured if service_type == ServiceType.SAGEMAKER_OLDER: @@ -132,9 +140,13 @@ def deploy( service_type=service_type, instance_type=instance_type, region=region, + dockerfile_local_path=dockerfile_local_path ) # logger.info("Checking AWS environment...") - extra_params = extra_params or {} + if isinstance(extra_params, str): + extra_params = json.loads(extra_params) + else: + extra_params = extra_params or {} if model_stack_name is None: # stack_name_suffix = random_suffix() model_stack_name = ( @@ -168,23 +180,38 @@ def deploy( pipeline_name = pipeline_resources[0]["PhysicalResourceId"] logger.info("AWS environment is properly configured.") - model = Model.get_model(model_id) - - # check instance,service,engine - supported_instances = model.supported_instance_types - assert ( - instance_type in supported_instances - ), f"Instance type {instance_type} is not supported for model {model_id}" - - supported_engines = model.supported_engine_types - assert ( - engine_type in supported_engines - ), f"Engine type {engine_type} is not supported for model {model_id}" - - supported_services = model.supported_service_types - assert ( - service_type in supported_services - ), f"Service type {service_type} is not supported for model {model_id}" + if dockerfile_local_path: + if not os.path.exists(dockerfile_local_path): + raise FileNotFoundError(f"Dockerfile path {dockerfile_local_path} does not exist.") + + # Create a zip file of the dockerfile directory + zip_buffer = zipped_dockerfile(dockerfile_local_path) + + # Upload the zip file to S3 + s3 = boto3.client('s3', region_name=region) + s3_key = f"emd_models/{model_id}-{model_tag}.zip" + s3.upload_fileobj(zip_buffer, bucket_name, s3_key) + extra_params["model_params"] = extra_params.get("model_params", {}) + extra_params["model_params"]["custom_dockerfile_path"] = f"s3://{bucket_name}/{s3_key}" + logger.info(f"extra_params: {extra_params}") + else: + model = Model.get_model(model_id) + + # check instance,service,engine + supported_instances = model.supported_instance_types + assert ( + instance_type in supported_instances + ), f"Instance type {instance_type} is not supported for model {model_id}" + + supported_engines = model.supported_engine_types + assert ( + engine_type in supported_engines + ), f"Engine type {engine_type} is not supported for model {model_id}" + + supported_services = 
model.supported_service_types + assert ( + service_type in supported_services + ), f"Service type {service_type} is not supported for model {model_id}" # Start pipeline execution codepipeline = boto3.client("codepipeline", region_name=region) @@ -329,3 +356,15 @@ def deploy_local( assert ( os.system(pipeline_cmd) == 0 ), f"run pipeline cmd failed: {pipeline_cmd}" + +def zipped_dockerfile(dockerfile_local_path): + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED) as zipf: + dockerfile_dir = os.path.dirname(dockerfile_local_path) + for root, dirs, files in os.walk(dockerfile_dir): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, dockerfile_dir) + zipf.write(file_path, arcname) + zip_buffer.seek(0) + return zip_buffer diff --git a/src/pipeline/deploy/build_and_push_image.py b/src/pipeline/deploy/build_and_push_image.py index 8854c816..60aafe52 100644 --- a/src/pipeline/deploy/build_and_push_image.py +++ b/src/pipeline/deploy/build_and_push_image.py @@ -330,3 +330,178 @@ def run( parameters = {"ecr_repo_uri": ecr_repo_uri} return parameters + +def run_custom( + region, + model_id, + model_tag, + backend_type, + service_type, + framework_type, + image_name, + image_tag, + model_s3_bucket, + instance_type, + extra_params, + dockerfile_local_path +): + model = Model.get_model(model_id) + + execute_model = model.convert_to_execute_model( + region=region, + instance_type=instance_type, + engine_type=backend_type, + service_type=service_type, + framework_type=framework_type, + model_s3_bucket=model_s3_bucket, + extra_params=extra_params, + model_tag=model_tag + ) + + # engine = execute_model.get_engine() + logger.info(f"Building and deploying {model_id} on {backend_type} backend") + execute_dir = os.path.dirname(dockerfile_local_path) + logger.info(f"docker build dir: {execute_dir}") + + # Build and push image + logger.info(f"Building and pushing {image_name} image") + + # docker build image + # get current aws account_id + push_image_account_id = execute_model.get_image_push_account_id() + build_image_account_id = ( + execute_model.executable_config.current_engine.base_image_account_id + ) + build_image_host = execute_model.executable_config.current_engine.base_image_host + + # get ecr repo uri + ecr_repo_uri = execute_model.get_image_uri( + account_id=push_image_account_id, + region=region, + image_name=image_name, + image_tag=image_tag, + ) + + print("build_image_account_id", build_image_account_id, push_image_account_id) + + if not build_image_host and build_image_account_id: + build_image_host = execute_model.get_image_host( + execute_model.get_image_uri( + account_id=build_image_account_id, + region=region, + image_name=image_name, + image_tag=image_tag, + ) + ) + + push_image_host = execute_model.get_image_host(ecr_repo_uri) + + # build image + use_public_ecr = execute_model.executable_config.current_engine.use_public_ecr + if use_public_ecr: + ecr_name = "ecr-public" + else: + ecr_name = "ecr" + + docker_login_region = ( + execute_model.executable_config.current_engine.docker_login_region + ) + + docker_login_region = docker_login_region or region + dockerfile_name = dockerfile_local_path.split("/")[-1] + + if build_image_host: + build_image_script_cn = ( + f"cd {execute_dir}" + f' && docker build --platform linux/amd64 -f {dockerfile_name} -t "{ecr_repo_uri}" .' 
+ ) + build_image_script_global = ( + f"cd {execute_dir}" + f" && aws {ecr_name} get-login-password --region {docker_login_region} | docker login --username AWS --password-stdin {build_image_host}" + f' && docker build --platform linux/amd64 -f {dockerfile_name} -t "{ecr_repo_uri}" .' + ) + if check_cn_region(region): + build_image_scripts = [build_image_script_cn] + else: + build_image_scripts = [build_image_script_global,build_image_script_cn] + + is_build_success = False + for build_image_script in build_image_scripts: + logger.info(f"building image: {build_image_script}") + try: + assert os.system(build_image_script) == 0 + is_build_success = True + break + except Exception as e: + logger.error(f"docker build error: {e}") + + if not is_build_success: + raise RuntimeError("docker build error") + + + # build_image_script = ( + # f"cd {execute_dir}" + # f" && aws {ecr_name} get-login-password --region {docker_login_region} | docker login --username AWS --password-stdin {build_image_host}" + # f' && docker build --platform linux/amd64 -f {dockerfile_name} -t "{ecr_repo_uri}" .' + # ) + else: + build_image_script = ( + f"cd {execute_dir}" + f' && docker build --platform linux/amd64 -f {dockerfile_name} -t "{ecr_repo_uri}" .' + ) + + logger.info(f"building image: {build_image_script}") + assert os.system(build_image_script) == 0 + + # push image + # It should not push the image to ecr when service_type is `local` + if service_type != ServiceType.LOCAL: + ecr_client = boto3.client("ecr", region_name=region) + try: + response = ecr_client.create_repository( + repositoryName=image_name, + ) + logger.info(f"ecr repo: {image_name} created.") + except ecr_client.exceptions.RepositoryAlreadyExistsException: + logger.info(f"ecr repo: {image_name} exists.") + + ## give ecr repo policy + ecr_repository_policy = { + "Version": "2008-10-17", + "Statement": [ + { + "Sid": "new statement", + "Effect": "Allow", + "Principal": "*", + "Action": [ + "ecr:CompleteLayerUpload", + "ecr:InitiateLayerUpload", + "ecr:ListImages", + "ecr:BatchCheckLayerAvailability", + "ecr:BatchGetImage", + "ecr:DescribeImages", + "ecr:DescribeRepositories", + "ecr:GetDownloadUrlForLayer", + ], + } + ], + } + response = ecr_client.set_repository_policy( + repositoryName=image_name, policyText=json.dumps(ecr_repository_policy) + ) + + ## push image + push_image_script = ( + f"cd {execute_dir}" + f" && aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {push_image_host}" + f' && docker push "{ecr_repo_uri}"' + ) + + logger.info(f"pushing image: {push_image_script}") + assert os.system(push_image_script) == 0 + + image_uri = ecr_repo_uri + logger.info(f"Image URI: {ecr_repo_uri}") + + parameters = {"ecr_repo_uri": ecr_repo_uri} + return parameters diff --git a/src/pipeline/parameters.json b/src/pipeline/parameters.json index e8dc6d2f..04cdf6ed 100644 --- a/src/pipeline/parameters.json +++ b/src/pipeline/parameters.json @@ -1 +1 @@ -{"Parameters": {"InstanceType": "g5.12xlarge", "ModelId": "DeepSeek-R1-Distill-Qwen-32B-GGUF", "ECRImageURI": "683638520402.dkr.ecr.us-west-2.amazonaws.com/deepseek-r1-distill-qwen-32b-gguf:latest", "ModelTag": "latest", "FrameWorkType": "fastapi", "ServiceType": "local", "EngineType": "llama.cpp", "Region": "us-west-2"}} +{"Parameters": {"ECRImageURI": "683638520402.dkr.ecr.us-west-2.amazonaws.com/model-in-docker:latest", "InstanceType": "ml.g5.4xlarge", "ModelId": "Model-In-Docker", "ModelTag": "latest", "FrameWorkType": "custom", "ServiceType":
"sagemaker_realtime", "EngineType": "custom", "Region": "us-west-2", "MaxCapacity": "1", "AutoScalingTargetValue": "10"}} diff --git a/src/pipeline/pipeline.py b/src/pipeline/pipeline.py index 1ed46ba7..980028db 100644 --- a/src/pipeline/pipeline.py +++ b/src/pipeline/pipeline.py @@ -7,11 +7,12 @@ import logging from concurrent.futures import as_completed,ProcessPoolExecutor +from utils.common import download_file_from_s3_by_s5cmd + from emd.models import Model from emd.constants import MODEL_DEFAULT_TAG,LOCAL_REGION from emd.models.utils.constants import FrameworkType,ServiceType,InstanceType -from utils.common import str2bool from emd.utils.aws_service_utils import check_cn_region from emd.models import Model, ExecutableConfig from emd.models.utils.serialize_utils import load_extra_params,dump_extra_params @@ -46,6 +47,7 @@ def parse_args(): parser.add_argument("--skip_image_build", action='store_true') parser.add_argument("--skip_deploy", action='store_true') parser.add_argument("--disable_parallel_prepare_and_build_image", action='store_true') + parser.add_argument("--dockerfile_local_path", type=str, default=None) parser.add_argument( "--extra_params", type=load_extra_params, @@ -82,19 +84,50 @@ def run_build_image(args): if not args.image_uri and not args.skip_image_build: logger.info("build image...") from deploy import build_and_push_image - build_and_push_output_params = build_and_push_image.run( - region=args.region, - model_id=args.model_id, - model_tag=args.model_tag, - backend_type=args.backend_type, - service_type=args.service_type, - framework_type=args.framework_type, - image_name=args.image_name, - image_tag="latest" if args.model_tag == MODEL_DEFAULT_TAG else args.model_tag, - model_s3_bucket=args.model_s3_bucket, - instance_type=args.instance_type, - extra_params=args.extra_params - ) + if args.extra_params.get("model_params", {}).get("custom_dockerfile_path", None): + dockerfile_s3_path = args.extra_params["model_params"]["custom_dockerfile_path"] + image_build_dir = os.path.join(os.getcwd(), "docker_build") + os.makedirs(image_build_dir, exist_ok=True) + local_zip_path = os.path.join(image_build_dir, "docker_build.zip") + # download dockerfile from s3 + download_file_from_s3_by_s5cmd( + dockerfile_s3_path, + local_zip_path + ) + os.system(f"unzip -o {local_zip_path} -d {image_build_dir}") + # build image with custom dockerfile + logger.info(f"custom_dockerfile_path: {dockerfile_s3_path}") + logger.info(f"image_build_dir: {image_build_dir}") + dockerfile_local_path = os.path.join(image_build_dir, "Dockerfile") + logger.info(f"dockerfile_local_path: {dockerfile_local_path}") + build_and_push_output_params = build_and_push_image.run_custom( + region=args.region, + model_id=args.model_id, + model_tag=args.model_tag, + backend_type=args.backend_type, + service_type=args.service_type, + framework_type=args.framework_type, + image_name=args.image_name, + image_tag="latest" if args.model_tag == MODEL_DEFAULT_TAG else args.model_tag, + model_s3_bucket=args.model_s3_bucket, + instance_type=args.instance_type, + extra_params=args.extra_params, + dockerfile_local_path=dockerfile_local_path + ) + else: + build_and_push_output_params = build_and_push_image.run( + region=args.region, + model_id=args.model_id, + model_tag=args.model_tag, + backend_type=args.backend_type, + service_type=args.service_type, + framework_type=args.framework_type, + image_name=args.image_name, + image_tag="latest" if args.model_tag == MODEL_DEFAULT_TAG else args.model_tag, + 
model_s3_bucket=args.model_s3_bucket, + instance_type=args.instance_type, + extra_params=args.extra_params + ) else: print("skip build image...") print(f"image build elapsed time: ", time.time() - t2) diff --git a/src/pipeline/utils/common.py b/src/pipeline/utils/common.py index 01d2c65d..3093f369 100644 --- a/src/pipeline/utils/common.py +++ b/src/pipeline/utils/common.py @@ -41,6 +41,17 @@ def download_dir_from_s3_by_s5cmd(local_dir,bucket_name=None, s3_key=None,model_ logger.info(f"Downloading model files from {model_files_s3_path}") assert os.system(f"./s5cmd cp {model_files_s3_path}/* {local_dir}") == 0 +def download_file_from_s3_by_s5cmd(s3_file_path, local_file_path): + """ + Download a file from S3 using s5cmd. + + Args: + s3_file_path (str): The S3 file path (e.g., s3://bucket/key). + local_file_path (str): The local file path to save the downloaded file. + """ + logger.info(f"Downloading {s3_file_path} to {local_file_path}") + assert os.system(f"./s5cmd cp {s3_file_path} {local_file_path}") == 0 + def upload_dir_to_s3(bucket_name, local_dir_name): logger.info(f"Uploading {local_dir_name} to {bucket_name} bucket") s3 = boto3.client('s3')
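
Usage note (illustrative, not part of the diff): the sketch below shows how the custom-Dockerfile flow added above can be driven from code. It mirrors the sdk_deploy call that emd deploy makes when --dockerfile-local-path is passed (src/emd/commands/deploy.py). The import path, the ./my-model/ directory, the tag, region, and instance type are assumptions for the example; what the diff itself requires is a file named Dockerfile whose build context lives in the same directory, since zipped_dockerfile zips that directory, deploy() uploads it to s3://<bucket>/emd_models/<model_id>-<model_tag>.zip, and the pipeline unpacks it and builds docker_build/Dockerfile.

# Minimal sketch, assuming the SDK entry point is emd.sdk.deploy.deploy (the module
# changed above) and a local build context ./my-model/ containing a file named
# "Dockerfile" plus everything it references. Values marked as assumptions are placeholders.
from emd.sdk.deploy import deploy as sdk_deploy

response = sdk_deploy(
    model_id="custom-docker",              # fixed id registered in src/emd/models/custom/custom_docker.py
    model_tag="my-model-dev",              # placeholder; the CLI builds this as f"{model_id}-{model_tag}"
    instance_type="ml.g5.4xlarge",         # assumption; custom-docker lists the g5 family plus local
    engine_type="custom",
    framework_type="custom",
    service_type="sagemaker_realtime",
    region="us-west-2",                    # assumption; the CLI resolves this via get_current_region()
    extra_params={},
    env_stack_on_failure="ROLLBACK",
    force_env_stack_update=False,
    waiting_until_deploy_complete=True,
    dockerfile_local_path="./my-model/Dockerfile",
)

The same path is reachable from the CLI with the new flag, e.g. emd deploy --dockerfile-local-path ./my-model/Dockerfile (remaining options per emd deploy --help).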