@@ -67,7 +67,8 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
scaler = ShardedGradScaler()
elif train_config.use_fp16 and not train_config.enable_fsdp:
scaler = torch.cuda.amp.GradScaler()
+ if train_config.enable_fsdp:
+ world_size = int(os.environ["WORLD_SIZE"])
train_prep = []
train_loss = []
val_prep = []
@@ -78,7 +79,6 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
with MemoryTrace() as memtrace: # track the memory usage
total_loss = 0.0
- data_set_len = 0
for step, batch in enumerate(tqdm(train_dataloader,colour="blue", desc=f"Training Epoch{epoch}")):
for key in batch.keys():
if train_config.enable_fsdp:
@@ -88,8 +88,6 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
loss = model(**batch).loss
loss = loss / gradient_accumulation_steps
total_loss += loss.detach().float()
- first_key = next(iter(batch))
- data_set_len += len(batch[first_key])
if train_config.use_fp16:
# if fp16 is enabled, use gradient scaler to handle gradient update
@@ -103,22 +101,35 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
if (step + 1) % gradient_accumulation_steps == 0 or step == len(train_dataloader) - 1:
- print(f"\n step {step} is completed and loss is {loss.detach().float()}")
+ if train_config.enable_fsdp:
+ if rank==0:
+ print(f"\n step {step} is completed and loss is {loss.detach().float()}")
+ else:
+ print(f"\n step {step} is completed and loss is {loss.detach().float()}")
# Reducing total_loss across all devices if there's more than one CUDA device
if torch.cuda.device_count() > 1 and train_config.enable_fsdp:
dist.all_reduce(total_loss, op=dist.ReduceOp.SUM)
- train_epoch_loss = total_loss / data_set_len
+ train_epoch_loss = total_loss / len(train_dataloader)
+ if train_config.enable_fsdp:
+ train_epoch_loss = train_epoch_loss/world_size
train_perplexity = torch.exp(train_epoch_loss)
- print(f"Max CUDA memory allocated was {memtrace.peak} GB")
- print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
- print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
- print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
- print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
+ if train_config.enable_fsdp:
+ if rank==0:
+ print(f"Max CUDA memory allocated was {memtrace.peak} GB")
+ print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
+ print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
+ print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
+ print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
+ else:
+ print(f"Max CUDA memory allocated was {memtrace.peak} GB")
+ print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
+ print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
+ print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
+ print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
# Update the learning rate as needed
@@ -126,42 +137,62 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
if train_config.run_validation:
eval_ppl, eval_epoch_loss = evaluation(model, train_config, eval_dataloader, rank, tokenizer)
if train_config.save_model and eval_epoch_loss < best_val_loss:
- if train_config.use_peft:
- print(f"we are in the saving the PEFT modules")
- model.save_pretrained(train_config.output_dir)
- print(f"PEFT modules are saved in {train_config.output_dir} directory")
+ if train_config.enable_fsdp:
+ dist.barrier()
+ if train_config.use_peft:
+ if train_config.enable_fsdp:
+ if rank==0:
+ print(f"we are about to save the PEFT modules")
+ else:
+ print(f"we are about to save the PEFT modules")
+ model.save_pretrained(train_config.output_dir)
+ if train_config.enable_fsdp:
+ if rank==0:
+ print(f"PEFT modules are saved in {train_config.output_dir} directory")
+ else:
+ print(f"PEFT modules are saved in {train_config.output_dir} directory")
if not train_config.use_peft and fsdp_config.checkpoint_type == StateDictType.FULL_STATE_DICT:
- model, optimizer, rank, train_config, epoch=1
+ model, optimizer, rank, train_config, epoch=epoch
elif not train_config.use_peft and fsdp_config.checkpoint_type == StateDictType.SHARDED_STATE_DICT:
- print(" we are about to save the models *******")
+ print(" Saving the FSDP model checkpoints using SHARDED_STATE_DICT")
+ print("=====================================================")
model_checkpointing.save_model_and_optimizer_sharded(model, rank, train_config)
if train_config.save_optimizer:
model_checkpointing.save_model_and_optimizer_sharded(model, rank, train_config, optim=optimizer)
+ print(" Saving the FSDP model checkpoints qnd optimizer using SHARDED_STATE_DICT")
+ print("=====================================================")
if not train_config.use_peft and train_config.save_optimizer:
- model, optimizer, rank, train_config, epoch=1
- )
+ model, optimizer, rank, train_config, epoch=epoch
+ )
+ print(" Saving the FSDP model checkpoints qnd optimizer using FULL_STATE_DICT")
+ print("=====================================================")
+ if train_config.enable_fsdp:
+ dist.barrier()
- if local_rank == 0 and eval_epoch_loss < best_val_loss:
+ if eval_epoch_loss < best_val_loss:
best_val_loss = eval_epoch_loss
- print(f"best eval loss on epoch {epoch} is {best_val_loss}")
+ if train_config.enable_fsdp:
+ if rank==0:
+ print(f"best eval loss on epoch {epoch} is {best_val_loss}")
+ else:
+ print(f"best eval loss on epoch {epoch} is {best_val_loss}")
- print(f"Epoch {epoch+1}: train_perplexity={train_perplexity:.4f}, train_epoch_loss={train_epoch_loss:.4f}")
- lr_scheduler.step()
+ if train_config.enable_fsdp:
+ if rank==0:
+ print(f"Epoch {epoch+1}: train_perplexity={train_perplexity:.4f}, train_epoch_loss={train_epoch_loss:.4f}")
+ else:
+ print(f"Epoch {epoch+1}: train_perplexity={train_perplexity:.4f}, train_epoch_loss={train_epoch_loss:.4f}")
avg_train_prep = sum(train_prep)/len(train_prep)
avg_train_loss = sum(train_loss)/len(train_loss)
if train_config.run_validation:
@@ -175,7 +206,7 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
results['avg_eval_loss'] = avg_eval_loss
#saving the training params including fsdp setting for reference.
- if train_config.enable_fsdp and fsdp_config:
+ if train_config.enable_fsdp and not train_config.use_peft:
save_train_params(train_config, fsdp_config, rank)
return results
@@ -192,10 +223,11 @@ def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
Returns: eval_ppl, eval_epoch_loss
+ if train_config.enable_fsdp:
+ world_size = int(os.environ["WORLD_SIZE"])
eval_preds = []
eval_loss = 0.0 # Initialize evaluation loss
- eval_dataset_len = 0
with MemoryTrace() as memtrace:
for step, batch in enumerate(tqdm(eval_dataloader,colour="green", desc="evaluating Epoch")):
for key in batch.keys():
@@ -209,9 +241,6 @@ def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
outputs = model(**batch)
loss = outputs.loss
eval_loss += loss.detach().float()
- first_key = next(iter(batch))
- eval_dataset_len+= len(batch[first_key])
# Decode predictions and add to evaluation predictions list
preds = torch.argmax(outputs.logits, -1)
@@ -223,11 +252,18 @@ def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
dist.all_reduce(eval_loss, op=dist.ReduceOp.SUM)
# Compute average loss and perplexity
- eval_epoch_loss = eval_loss / eval_dataset_len
+ eval_epoch_loss = eval_loss / len(eval_dataloader)
+ if train_config.enable_fsdp:
+ eval_epoch_loss = eval_epoch_loss/world_size
eval_ppl = torch.exp(eval_epoch_loss)
# Print evaluation metrics
- print(f" {eval_ppl=} {eval_epoch_loss=}")
+ if train_config.enable_fsdp:
+ if local_rank==0:
+ print(f" {eval_ppl=} {eval_epoch_loss=}")
+ else:
+ print(f" {eval_ppl=} {eval_epoch_loss=}")
return eval_ppl, eval_epoch_loss
def freeze_transformer_layers(model, num_layer):
@@ -252,7 +288,10 @@ def setup_environ_flags(rank):
"""Set environment flags for debugging purposes"""
os.environ["TORCH_SHOW_CPP_STACKTRACES"] = str(1)
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = str(1)
+ # This flag will help with CUDA memory fragmentations that can lead into OOM in some cases.
+ # Note this is only availble in PyTorch Nighlies (as of July 30 2023)
+ # os.environ['PYTORCH_CUDA_ALLOC_CONF']='expandable_segments:True'
if rank == 0:
print(f"--> Running with torch dist debug set to detail")