import csv
import json
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Tuple

import numpy as np
import requests
import torch
import transformers
from azure.ai.contentsafety import ContentSafetyClient
from azure.ai.contentsafety.models import AnalyzeTextOptions
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
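# Third-party dependencies (assumed pip package names for the imports above):
#   pip install numpy requests torch transformers azure-ai-contentsafety azure-core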
# Load the prompt file (not consumed below: this script benchmarks with a
# randomly generated prompt) and the benchmark parameters.
with open('input.jsonl') as input_file:
    prompt_data = json.load(input_file)

with open('parameters.json') as parameters:
    params = json.load(parameters)

MAX_NEW_TOKENS = params["MAX_NEW_TOKENS"]
CONCURRENT_LEVELS = params["CONCURRENT_LEVELS"]
MODEL_PATH = params["MODEL_PATH"]
MODEL_HEADERS = params["MODEL_HEADERS"]
SAFE_CHECK = params["SAFE_CHECK"]
THRESHOLD_TPS = params["THRESHOLD_TPS"]  # requests below this output tokens/sec are counted as under-threshold
TOKENIZER_PATH = params["TOKENIZER_PATH"]
RANDOM_PROMPT_LENGTH = params["RANDOM_PROMPT_LENGTH"]
TEMPERATURE = params["TEMPERATURE"]
TOP_P = params["TOP_P"]
MODEL_ENDPOINTS = params["MODEL_ENDPOINTS"]
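# A hypothetical parameters.json with the keys read above; every value is an
# illustrative placeholder, not a recommendation:
# {
#     "MAX_NEW_TOKENS": 256,
#     "CONCURRENT_LEVELS": [1, 2, 4, 8, 16, 32],
#     "MODEL_PATH": "your-model-name",
#     "MODEL_HEADERS": {"Content-Type": "application/json"},
#     "SAFE_CHECK": false,
#     "THRESHOLD_TPS": 7,
#     "TOKENIZER_PATH": "your-tokenizer-or-model-path",
#     "RANDOM_PROMPT_LENGTH": 1000,
#     "TEMPERATURE": 0.6,
#     "TOP_P": 0.9,
#     "MODEL_ENDPOINTS": ["http://localhost:8000/v1/chat/completions"]
# }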
# Per-GPU throughput metrics are derived from the local GPU count.
if torch.cuda.is_available():
    NUM_GPU = torch.cuda.device_count()
else:
    print("No available GPUs")
    NUM_GPU = 1  # fall back to 1 so per-GPU metrics equal the overall metrics
# Build a random prompt of roughly RANDOM_PROMPT_LENGTH tokens from the
# tokenizer's printable-ASCII vocabulary entries.
tokenizer = transformers.AutoTokenizer.from_pretrained(TOKENIZER_PATH)
vocab = [token for token in tokenizer.get_vocab().keys() if len(token) > 2 and all(ord(c) < 128 for c in token)]

def generate_random_prompt(num_tokens):
    generated_tokens_count = 0
    selected_tokens = ""
    while generated_tokens_count < num_tokens:
        selected_tokens += random.choice(vocab)
        selected_tokens += " "
        generated_tokens_count = len(tokenizer.encode(selected_tokens))
    return selected_tokens

PROMPT = generate_random_prompt(RANDOM_PROMPT_LENGTH)
num_token_input_prompt = len(tokenizer.encode(PROMPT))
print(f"Number of tokens in the input prompt: {num_token_input_prompt}")
def analyze_prompt(prompt):
    """Run the prompt through Azure AI Content Safety and measure the call latency."""
    start_time = time.time()

    # Fill in your Content Safety resource key and endpoint.
    key = ""
    endpoint = ""

    client = ContentSafetyClient(endpoint, AzureKeyCredential(key))

    request = AnalyzeTextOptions(text=prompt)

    try:
        response = client.analyze_text(request)
    except HttpResponseError as e:
        print("Content safety analysis of the prompt failed.")
        if e.error:
            print(f"Error code: {e.error.code}")
            print(f"Error message: {e.error.message}")
            raise
        print(e)
        raise

    analyze_end_time = time.time()
    # Latency of the safety check in milliseconds (measured but not otherwise used).
    analyze_latency = (analyze_end_time - start_time) * 1000
# Round-robin endpoint selection state, shared across worker threads.
executor_id = 0
lock = threading.Lock()
def generate_text() -> Tuple[float, int]:
    """Send one chat completion request and return (latency_ms, output_token_count)."""
    headers = MODEL_HEADERS
    payload = {
        "model": MODEL_PATH,
        "messages": [
            {
                "role": "user",
                "content": PROMPT
            }
        ],
        "stream": False,
        "temperature": TEMPERATURE,
        "top_p": TOP_P,
        "max_tokens": MAX_NEW_TOKENS
    }

    start_time = time.time()

    # Optional content safety check on the prompt; its latency is included in
    # the measured end-to-end latency.
    if SAFE_CHECK:
        analyze_prompt(PROMPT)

    # Pick the next endpoint in round-robin order.
    global executor_id
    with lock:
        if executor_id != len(MODEL_ENDPOINTS) - 1:
            executor_id += 1
        else:
            executor_id = 0
        endpoint_id = executor_id

    response = requests.post(MODEL_ENDPOINTS[endpoint_id], headers=headers, json=payload)

    # Optional second safety check (it re-analyzes the prompt) after the
    # response returns; this is also counted toward latency.
    if SAFE_CHECK:
        analyze_prompt(PROMPT)

    end_time = time.time()
    # Convert the latency to milliseconds.
    latency = (end_time - start_time) * 1000

    if response.status_code != 200:
        raise ValueError(f"Error: {response.content}")

    output = json.loads(response.content)["choices"][0]["message"]["content"]
    token_count = len(tokenizer.encode(output))
    return latency, token_count
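# Optional smoke test (hypothetical usage, not part of the benchmark sweep):
# issue a single request to confirm the endpoint and tokenizer work before
# sweeping concurrency levels.
# latency_ms, tokens = generate_text()
# print(f"single request: {latency_ms:.1f} ms, {tokens} output tokens")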
def evaluate_performance(concurrent_requests: int) -> Tuple[float, float, float, float, float, float, float, float, int]:
    """Fire `concurrent_requests` requests in parallel and aggregate latency and throughput metrics."""
    latencies = []
    total_output_tokens = 0
    output_tokens_per_second_each_request = []
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=concurrent_requests) as executor:
        future_to_req = {executor.submit(generate_text): i for i in range(concurrent_requests)}
        for future in as_completed(future_to_req):
            latency, token_count = future.result()
            latencies.append(latency)
            total_output_tokens += token_count
            # Per-request decode throughput (output tokens per second).
            tokens_per_sec = token_count / (latency / 1000)
            output_tokens_per_second_each_request.append(tokens_per_sec)

    end_time = time.time()
    total_time = end_time - start_time
    # Requests per second completed across the whole batch.
    rps = concurrent_requests / total_time
    # Overall and per-GPU token throughput.
    output_tokens_per_second_overall = total_output_tokens / total_time
    input_tokens_per_second_overall = (num_token_input_prompt * concurrent_requests) / total_time
    output_tokens_per_second_per_gpu = output_tokens_per_second_overall / NUM_GPU
    input_tokens_per_second_per_gpu = input_tokens_per_second_overall / NUM_GPU

    p50_latency = np.percentile(latencies, 50)
    p99_latency = np.percentile(latencies, 99)

    # Count requests whose output throughput fell below THRESHOLD_TPS.
    below_threshold_count = sum(1 for tps in output_tokens_per_second_each_request if tps < THRESHOLD_TPS)
    output_tokens_per_second_per_request = sum(output_tokens_per_second_each_request) / len(output_tokens_per_second_each_request)

    return p50_latency, p99_latency, rps, output_tokens_per_second_overall, output_tokens_per_second_per_gpu, input_tokens_per_second_overall, input_tokens_per_second_per_gpu, output_tokens_per_second_per_request, below_threshold_count
- print("| Number of Concurrent Requests | P50 Latency (ms) | P99 Latency (ms) | RPS | Output Tokens per Second | Output Tokens per Second per GPU | Input Tokens per Second | Input Tokens per Second per GPU |Average Output Tokens per Second per Request | Number of Requests Below Threshold |")
- print("|-------------------------------|------------------|------------------|------------------|-------------------|---------------------------|---------------------|------------------------|-------------------------------------- | ---------------------------------- |")
- csv_file = "performance_metrics.csv"
- with open(csv_file, "w", newline='') as f:
- writer = csv.writer(f)
- writer.writerow(["Number of Concurrent Requests", "P50 Latency (ms)", "P99 Latency (ms)", "RPS", "Output Tokens per Second", "Output Tokens per Second per GPU", "Input Tokens per Second", "Input Tokens per Second per GPU", "Average Output Tokens per Second per Request"])
- for level in CONCURRENT_LEVELS:
- p50_latency, p99_latency, rps, output_tokens_per_second_overall, output_tokens_per_second_per_gpu, input_tokens_per_second_overall, input_tokens_per_second_per_gpu, output_tokens_per_second_per_request, below_threshold_count = evaluate_performance(level)
- print(f"| {level} | {p50_latency:.2f} | {p99_latency:.2f} | {rps:.2f} | {output_tokens_per_second_overall:.2f} | {output_tokens_per_second_per_gpu:.2f} | {input_tokens_per_second_overall:.2f} | {input_tokens_per_second_per_gpu:.2f} | {output_tokens_per_second_per_request:.2f} | {below_threshold_count:.2f} |")
- writer.writerow([level, round(p50_latency, 2), round(p99_latency, 2), round(rps, 2), round(output_tokens_per_second_overall, 2), round(output_tokens_per_second_per_gpu, 2), round(input_tokens_per_second_overall, 2), round(input_tokens_per_second_per_gpu, 2), round(output_tokens_per_second_per_request, 2)])