Run verl on SageMaker Using 4x8 L40S GPUs
Background: Volcano Engine Reinforcement Learning for LLMs (verl) is a flexible, efficient, and production-ready RL training library for large language models (LLMs). Amazon SageMaker AI provides machine learning (ML) capabilities that are purpose-built for data scientists and developers to prepare, build, train, and deploy high-quality ML models efficiently.
Jun 23, 2025
Code: https://github.com/yang0110/verl-sagemaker-project.git
Run GRPO/PPO to Fine-tune an LLM on SageMaker
- Write the Ray helper functions
- Write the GRPO/PPO training script
- Build the Docker container
- Write a notebook to launch the SageMaker training job
File structure
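The layout below is reconstructed from the files shown in this post (the original repository may differ slightly). The notebook and Dockerfile sit at the project root, and everything under src/ is copied into the container:
- Dockerfile
- notebook.ipynb
- src/
  - entrypoint.py
  - get_node_ip.py
  - ray_helper_fn.py
  - qwen-3b-grpo-4-node.sh
  - verl/ (local clone of the verl repository, installed with pip install -e)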
Dockerfile
# Start from the AWS Deep Learning Container (DLC) for PyTorch training
FROM 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.6.0-gpu-py312-cu126-ubuntu22.04-sagemaker
# Copy the training code (including the local verl clone) into the container
COPY src /opt/ml/code
WORKDIR /opt/ml/code
RUN python -m pip install --upgrade pip
# Install verl in editable mode plus the rollout backends and utilities
RUN pip install -e ./verl
RUN pip install vllm==0.8.4
RUN pip install sglang==0.4.6.post5
RUN pip install wandb
RUN pip install boto3
RUN pip install --upgrade huggingface_hub
RUN pip install hf_xet
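Build and push the image to Amazon ECR with the shell script below (the post does not name the file; something like build_and_push.sh works):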
# The name of our Docker image
docker_name=verl-image
account=$(aws sts get-caller-identity --query Account --output text)
# Region of the ECR repositories (must match the region of the base image above)
region=us-east-1
fullname="${account}.dkr.ecr.${region}.amazonaws.com/${docker_name}:latest"
# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${docker_name}" > /dev/null 2>&1
if [ $? -ne 0 ]
then
aws ecr create-repository --repository-name "${docker_name}" > /dev/null
fi
# Log in to the AWS DLC registry (to pull the base image) and to our own registry.
# `aws ecr get-login` is deprecated in AWS CLI v2; use get-login-password instead.
aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin 763104351884.dkr.ecr.${region}.amazonaws.com
aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin ${account}.dkr.ecr.${region}.amazonaws.com
# Build the docker image locally with the image name and then push it to ECR
# with the full name.
# docker build -t ${docker_name} --no-cache --progress plain . --build-arg REGION=${region} --file Dockerfile
docker build -t ${docker_name} --progress plain . --build-arg REGION=${region} --file Dockerfile
docker tag ${docker_name} ${fullname}
docker push ${fullname}
entrypoint.py
import os
from ray_helper_fn import RayHelper
from get_node_ip import get_master_host_flag

if __name__ == "__main__":
    current_host, current_ip, is_master_host_flag, master_host, master_ip = get_master_host_flag()
    print('--- Launch the Ray cluster')
    ray_port = "6379"
    ray_helper = RayHelper(ray_port=ray_port)
    ray_helper.start_ray()
    if is_master_host_flag:
        # Only the master host inspects the cluster and launches the training script
        os.system('ray status')
        train_script = os.environ['TRAIN_SCRIPT']
        print('--- train script', train_script)
        os.system(f"chmod +x {train_script}")
        os.system(f"/bin/bash {train_script}")
    else:
        # Worker nodes only join the cluster; start_ray() blocks until the head node shuts down
        print('--- This is a worker node, so do nothing')
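SageMaker runs entrypoint.py on every instance: each node joins the Ray cluster started by RayHelper, but only the master host (the Ray head node) runs `ray status` and launches the training script.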
get_node_ip.py
import os
import time
import socket
import json

def get_resource_config():
    # SageMaker exposes the current host name and the full host list as environment variables
    return dict(current_host=os.environ.get("SM_CURRENT_HOST"),
                hosts=json.loads(os.environ.get("SM_HOSTS")))

def get_master_host():
    # Take the first host of the 'gpu_group' instance group as the master (Ray head) host
    master_host = None
    config = json.loads(os.environ.get("SM_RESOURCE_CONFIG"))
    for group in config['instance_groups']:
        if group['instance_group_name'] == 'gpu_group':
            master_host = group['hosts'][0]
    return master_host

def get_ip_from_host(host):
    # Retry DNS resolution until the host is reachable or the wait time is exceeded
    ip_wait_time = 200
    counter = 0
    ip = ""
    while counter < ip_wait_time and ip == "":
        try:
            ip = socket.gethostbyname(host)
            break
        except socket.gaierror:
            counter += 1
            time.sleep(1)
    if counter == ip_wait_time and ip == "":
        raise Exception(
            "Exceeded max wait time of {}s for hostname resolution".format(ip_wait_time)
        )
    return ip

def get_master_host_flag():
    master_host = get_master_host()
    resource_config = get_resource_config()
    current_host = resource_config["current_host"]
    master_ip = get_ip_from_host(master_host)
    current_ip = get_ip_from_host(current_host)
    is_master_host_flag = current_host == master_host
    return current_host, current_ip, is_master_host_flag, master_host, master_ip
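For reference, the helpers above only read a few fields from SageMaker's environment variables. Below is an illustrative, trimmed example of what they might contain for this 4-node gpu_group (the real variables carry additional keys):

import json

# Illustrative values only, limited to the fields read by get_node_ip.py
SM_CURRENT_HOST = "algo-1"
SM_HOSTS = json.dumps(["algo-1", "algo-2", "algo-3", "algo-4"])
SM_RESOURCE_CONFIG = json.dumps({
    "current_host": "algo-1",
    "instance_groups": [
        {
            "instance_group_name": "gpu_group",
            "instance_type": "ml.g6e.48xlarge",
            "hosts": ["algo-1", "algo-2", "algo-3", "algo-4"],
        }
    ],
})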
ray_helper_fn.py
import subprocess
import os
import time
import ray
import socket
import json
import sys

class RayHelper():
    def __init__(self, ray_port: str = "6379", redis_pass: str = "redis_password"):
        self.ray_port = ray_port
        self.redis_pass = redis_pass
        self.resource_config = self.get_resource_config()
        self.master_host = self.get_gpu_host()
        self.n_hosts = len(self.resource_config["hosts"])

    @staticmethod
    def get_gpu_host():
        # Take the first host of the 'gpu_group' instance group as the Ray head node
        master_host = None
        config = json.loads(os.environ.get("SM_RESOURCE_CONFIG"))
        for group in config['instance_groups']:
            if group['instance_group_name'] == 'gpu_group':
                master_host = group['hosts'][0]
        return master_host

    @staticmethod
    def get_resource_config():
        return dict(current_host=os.environ.get("SM_CURRENT_HOST"),
                    hosts=json.loads(os.environ.get("SM_HOSTS")))

    def _get_head_port(self):
        return self.ray_port

    def _get_master_ip_from_host(self):
        # Retry DNS resolution until the master host is reachable or the wait time is exceeded
        ip_wait_time = 200
        counter = 0
        ip = ""
        while counter < ip_wait_time and ip == "":
            try:
                ip = socket.gethostbyname(self.master_host)
                break
            except socket.gaierror:
                counter += 1
                time.sleep(1)
        if counter == ip_wait_time and ip == "":
            raise Exception(
                "Exceeded max wait time of {}s for hostname resolution".format(ip_wait_time)
            )
        return ip

    def start_ray(self):
        self.master_ip = self._get_master_ip_from_host()
        print('--- self.master_ip', self.master_ip)
        if self.resource_config["current_host"] == self.master_host:
            if ray.is_initialized():
                print("There is a Ray cluster already running. Shutting it down.")
                ray.shutdown()
                time.sleep(5)
            print('--- ...start the head node')
            output = subprocess.run(
                ['ray', 'start', '--head', '--port', self.ray_port,
                 '--redis-password', self.redis_pass,
                 '--dashboard-host', '0.0.0.0', '--dashboard-port', '8265'],
                stdout=subprocess.PIPE)
            time.sleep(120)  # wait for all worker nodes to join the cluster
        else:
            time.sleep(20)  # wait for the master node to be ready
            print('--- ...add worker node')
            # --block keeps the worker process alive until the head node shuts down
            output = subprocess.run(
                ['ray', 'start', f'--address={self.master_ip}:{self.ray_port}',
                 '--redis-password', self.redis_pass, '--block'],
                stdout=subprocess.PIPE)
            sys.exit(0)

    def _wait_for_workers(self, timeout=120):
        print(f"Waiting {timeout} seconds for {self.n_hosts} nodes to join")
        while len(ray.nodes()) < self.n_hosts:
            print(f"{len(ray.nodes())} nodes connected to cluster")
            time.sleep(5)
            timeout -= 5
            if timeout == 0:
                raise Exception("Max timeout for nodes to join exceeded")
        time.sleep(5)
        print(f"{len(ray.nodes())} nodes connected to cluster")
qwen-3b-grpo-4-node.sh
set -x
# If you are using vllm<=0.6.3, you might need to set the following environment variable to avoid bugs:
# export VLLM_ATTENTION_BACKEND=XFORMERS
python3 -m verl.trainer.main_ppo \
algorithm.adv_estimator=grpo \
data.train_files=/opt/ml/code/verl/data/gsm8k/train.parquet \
data.val_files=/opt/ml/code/verl/data/gsm8k/test.parquet \
data.train_batch_size=1024 \
data.max_prompt_length=512 \
data.max_response_length=1024 \
data.filter_overlong_prompts=True \
data.truncation='error' \
actor_rollout_ref.model.path=/opt/ml/code/verl/models/Qwen/Qwen2.5-3B-Instruct \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.model.use_remove_padding=True \
actor_rollout_ref.actor.ppo_mini_batch_size=256 \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=40 \
actor_rollout_ref.actor.use_kl_loss=True \
actor_rollout_ref.actor.kl_loss_coef=0.001 \
actor_rollout_ref.actor.kl_loss_type=low_var_kl \
actor_rollout_ref.actor.entropy_coeff=0 \
actor_rollout_ref.model.enable_gradient_checkpointing=True \
actor_rollout_ref.actor.fsdp_config.param_offload=False \
actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=40 \
actor_rollout_ref.rollout.tensor_model_parallel_size=2 \
actor_rollout_ref.rollout.name=vllm \
actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
actor_rollout_ref.rollout.n=5 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=40 \
actor_rollout_ref.ref.fsdp_config.param_offload=True \
algorithm.use_kl_in_reward=False \
trainer.critic_warmup=0 \
trainer.logger=['console'] \
trainer.project_name='verl_grpo_example_gsm8k' \
trainer.experiment_name='qwen2_3b_function_rm' \
trainer.n_gpus_per_node=8 \
trainer.nnodes=4 \
trainer.save_freq=20 \
trainer.test_freq=5 \
trainer.total_epochs=15 "$@"
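The script points at /opt/ml/code/verl/data/gsm8k/*.parquet and /opt/ml/code/verl/models/Qwen/Qwen2.5-3B-Instruct, so the GSM8K parquet files and the model weights need to sit inside src/verl/ before the image is built. A minimal sketch of preparing them locally, assuming verl's examples/data_preprocess/gsm8k.py script and a Hugging Face download (the helper file name prepare_assets.py and the local paths are illustrative):

# prepare_assets.py (hypothetical helper, run locally before `docker build`)
import subprocess
from huggingface_hub import snapshot_download

# Write the GSM8K train/test parquet files where qwen-3b-grpo-4-node.sh expects them
subprocess.run(
    ["python3", "src/verl/examples/data_preprocess/gsm8k.py",
     "--local_dir", "src/verl/data/gsm8k"],
    check=True,
)

# Download the Qwen2.5-3B-Instruct weights into the path used by actor_rollout_ref.model.path
snapshot_download(
    repo_id="Qwen/Qwen2.5-3B-Instruct",
    local_dir="src/verl/models/Qwen/Qwen2.5-3B-Instruct",
)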
notebook.ipynb
import sagemaker
from sagemaker.pytorch import PyTorch
from sagemaker.instance_group import InstanceGroup
role = sagemaker.get_execution_role() # execution role for the training job
session = sagemaker.session.Session() # sagemaker session for interacting with different AWS APIs
default_bucket = session.default_bucket() # bucket to store model
print('role', role)
print('session', session)
print('default_bucket', default_bucket)
# Replace 'aws-account' with your AWS account ID (this is the image pushed to ECR above)
train_image_uri = 'aws-account.dkr.ecr.us-east-1.amazonaws.com/verl-image:latest'
print('train_image_uri', train_image_uri)
# ml.g6e.48xlarge provides 8 NVIDIA L40S GPUs, so 4 instances give the 4x8 GPU setup
instance_type = 'ml.g6e.48xlarge'
instance_count = 4
# The group name must match the 'gpu_group' looked up in get_node_ip.py and ray_helper_fn.py
gpu_group = InstanceGroup("gpu_group", instance_type, instance_count)
# The training script baked into the image; passed to entrypoint.py via the TRAIN_SCRIPT variable
train_script = '/opt/ml/code/qwen-3b-grpo-{}-node.sh'.format(instance_count)
base_job_name = 'xxx'  # choose a name prefix for the training job
print('train_script', train_script)
estimator = PyTorch(entry_point='entrypoint.py',
source_dir='src/',
base_job_name=base_job_name,
role=role,
image_uri=train_image_uri,
environment={'TRAIN_SCRIPT': train_script},
instance_groups=[gpu_group],
volume_size=500,
max_run=3*24*60*60,
)
estimator.fit()
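estimator.fit() launches the 4-node training job. With the default settings it blocks and streams the container logs (including verl's console metrics from the master node) back to the notebook; the same logs are also available in CloudWatch.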
Enjoy training…