@@ -23,7 +23,7 @@ import json
from llama_recipes.model_checkpointing import save_model_checkpoint, save_model_and_optimizer_sharded, save_optimizer_checkpoint
from llama_recipes.policies import fpSixteen,bfSixteen, get_llama_wrapper
from llama_recipes.utils.memory_utils import MemoryTrace
+from accelerate.utils import is_xpu_available, is_ccl_available
def set_tokenizer_params(tokenizer: LlamaTokenizer):
tokenizer.pad_token_id = 0
@@ -89,9 +89,16 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
for step, batch in enumerate(train_dataloader):
for key in batch.keys():
if train_config.enable_fsdp:
- batch[key] = batch[key].to(local_rank)
+ if is_xpu_available():
+ batch[key] = batch[key].to(torch.device(f"xpu:{local_rank}"))
+ else:
+ batch[key] = batch[key].to(local_rank)
- batch[key] = batch[key].to('cuda:0')
+ if is_xpu_available():
+ batch[key] = batch[key].to('xpu:0')
+ else:
+ batch[key] = batch[key].to('cuda:0')
with autocast():
loss = model(**batch).loss
loss = loss / gradient_accumulation_steps
@@ -135,7 +142,9 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
epoch_end_time = time.perf_counter()-epoch_start_time
# Reducing total_loss across all devices if there's more than one CUDA device
- if torch.cuda.device_count() > 1 and train_config.enable_fsdp:
+ if is_xpu_available() and (torch.xpu.device_count() > 1 and train_config.enable_fsdp):
+ dist.all_reduce(total_loss, op=dist.ReduceOp.SUM)
+ elif torch.cuda.device_count() > 1 and train_config.enable_fsdp:
dist.all_reduce(total_loss, op=dist.ReduceOp.SUM)
train_epoch_loss = total_loss / len(train_dataloader)
if train_config.enable_fsdp:
@@ -147,16 +156,28 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
if train_config.enable_fsdp:
if rank==0:
+ if is_xpu_available():
+ print(f"Max XPU memory allocated was {memtrace.peak} GB")
+ print(f"Max XPU memory reserved was {memtrace.max_reserved} GB")
+ print(f"Peak active XPU memory was {memtrace.peak_active_gb} GB")
+ print(f"Xpu Malloc retires : {memtrace.xpu_malloc_retires}")
+ else:
+ print(f"Max CUDA memory allocated was {memtrace.peak} GB")
+ print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
+ print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
+ print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
+ print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
+ else:
+ if is_xpu_available():
+ print(f"Max XPU memory allocated was {memtrace.peak} GB")
+ print(f"Max XPU memory reserved was {memtrace.max_reserved} GB")
+ print(f"Peak active XPU memory was {memtrace.peak_active_gb} GB")
+ print(f"Xpu Malloc retires : {memtrace.xpu_malloc_retires}")
+ else:
print(f"Max CUDA memory allocated was {memtrace.peak} GB")
print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
- print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
- else:
- print(f"Max CUDA memory allocated was {memtrace.peak} GB")
- print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
- print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
- print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
# Update the learning rate as needed
@@ -279,7 +300,10 @@ def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
if train_config.enable_fsdp:
batch[key] = batch[key].to(local_rank)
- batch[key] = batch[key].to('cuda:0')
+ if is_xpu_available():
+ batch[key] = batch[key].to('xpu:0')
+ else:
+ batch[key] = batch[key].to('cuda:0')
# Ensure no gradients are computed for this scope to save memory
with torch.no_grad():
# Forward pass and compute loss
@@ -297,6 +321,8 @@ def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
# If there's more than one CUDA device, reduce evaluation loss across all devices
+ if is_xpu_available() and (torch.xpu.device_count() > 1 and train_config.enable_fsdp):
+ dist.all_reduce(eval_loss, op=dist.ReduceOp.SUM)
if torch.cuda.device_count() > 1 and train_config.enable_fsdp:
dist.all_reduce(eval_loss, op=dist.ReduceOp.SUM)
@@ -330,7 +356,11 @@ def check_frozen_layers_peft_model(model):
def setup():
"""Initialize the process group for distributed training"""
- dist.init_process_group("nccl")
+ if is_ccl_available():
+ # distributed training on xpus
+ dist.init_process_group("ccl")
+ else:
+ dist.init_process_group("nccl")
def setup_environ_flags(rank):
@@ -354,7 +384,10 @@ def clear_gpu_cache(rank=None):
"""Clear the GPU cache for all ranks"""
if rank == 0:
print(f"Clearing GPU cache for all ranks")
- torch.cuda.empty_cache()
+ if is_xpu_available():
+ torch.xpu_empty_cache()
+ else:
+ torch.cuda.empty_cache()
def get_parameter_dtypes(model):
@@ -386,13 +419,15 @@ def print_model_size(model, config, rank: int = 0) -> None:
def get_policies(cfg, rank):
"""Get the policies for mixed precision and fsdp wrapping"""
- verify_bfloat_support = (
+ verify_bfloat_support = ((
and torch.cuda.is_bf16_supported()
and packaging.version.parse(torch.version.cuda).release >= (11, 0)
and dist.is_nccl_available()
and nccl.version() >= (2, 10)
- )
+ ) or
+ (is_xpu_available()))
mixed_precision_policy = None