GPU & CUDA Fundamentals
Understand why GPUs dominate AI workloads, how CUDA enables parallel programming, and the NVIDIA software layers that maximize performance.
Why GPUs for AI?
A GPU has thousands of small cores (e.g., 16,896 CUDA cores on the H100 SXM). Deep learning is mostly matrix multiplication, which parallelizes almost perfectly. A GPU keeps 16,000+ threads in flight at once, versus roughly 64 hardware threads on a typical server CPU.
CUDA Threading Model
CUDA organizes threads into blocks, and blocks into grids. Each block shares fast shared memory. The key is launching enough threads to saturate all GPU cores.
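The grid/block arithmetic can be tried out on the CPU before writing a real kernel. The following is a minimal sketch that emulates a one-dimensional launch in plain Python; `launch_config` and `simulate_vector_add` are hypothetical helper names, and the block size of 256 is an assumed, commonly used value rather than a requirement.

```python
import math


def launch_config(n_elements, threads_per_block=256):
    """Return (blocks_per_grid, threads_per_block) covering n_elements."""
    blocks_per_grid = math.ceil(n_elements / threads_per_block)
    return blocks_per_grid, threads_per_block


def simulate_vector_add(a, b, threads_per_block=256):
    """Emulate a 1-D CUDA vector-add launch with nested CPU loops."""
    n = len(a)
    out = [0.0] * n
    blocks, threads = launch_config(n, threads_per_block)
    for block_idx in range(blocks):            # plays the role of blockIdx.x
        for thread_idx in range(threads):      # plays the role of threadIdx.x
            i = block_idx * threads + thread_idx   # global thread index
            if i < n:                          # guard: the last block may overshoot
                out[i] = a[i] + b[i]
    return out


blocks, threads = launch_config(1000)
result = simulate_vector_add([1.0] * 1000, [2.0] * 1000)
print(blocks, threads, result[:3])
```

On a GPU the two loops disappear: every (block, thread) pair runs concurrently, which is why the bounds guard matters whenever the element count is not a multiple of the block size.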
GPU Memory Hierarchy
HBM3 DRAM (~3.35 TB/s on H100 SXM) → L2 cache (50 MB, ~10 TB/s) → shared memory (up to 228 KB per SM) → registers. Each level is smaller and faster than the one before it. Moving data, not compute, is usually the bottleneck.
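Whether a given operation is limited by data movement or by arithmetic can be estimated with a back-of-the-envelope roofline calculation. The sketch below compares the FLOPs-per-byte ratio of a square matrix multiply and an element-wise add against a machine balance point. `PEAK_FLOPS` and `MEM_BW` are illustrative placeholder figures, not measured or official numbers, and the model assumes each matrix crosses DRAM exactly once.

```python
def matmul_intensity(n, bytes_per_element=4):
    """FLOPs per byte for an (n x n) @ (n x n) matmul, each matrix moved once."""
    flops = 2 * n ** 3                             # one multiply + one add per term
    bytes_moved = 3 * n * n * bytes_per_element    # read A, read B, write C
    return flops / bytes_moved


def add_intensity(bytes_per_element=4):
    """FLOPs per byte for c[i] = a[i] + b[i]: 1 FLOP per 3 elements moved."""
    return 1 / (3 * bytes_per_element)


def classify(intensity, peak_flops, mem_bandwidth):
    """Compare against the ridge point (FLOPs the chip can do per byte it can move)."""
    ridge = peak_flops / mem_bandwidth
    return "compute-bound" if intensity >= ridge else "memory-bound"


PEAK_FLOPS = 60e12   # placeholder: assumed FP32 peak in FLOP/s
MEM_BW = 3.35e12     # placeholder: assumed DRAM bandwidth in bytes/s

print("matmul N=8192:", classify(matmul_intensity(8192), PEAK_FLOPS, MEM_BW))
print("vector add   :", classify(add_intensity(), PEAK_FLOPS, MEM_BW))
```

Under these assumptions large matrix multiplies reuse each loaded value many times and land on the compute side, while element-wise operations such as activations are limited by memory bandwidth.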
cuDNN, cuBLAS, NCCL
cuDNN provides GPU-accelerated convolutions and activations. cuBLAS handles matrix multiplication. NCCL enables AllReduce across multiple GPUs for distributed training.
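AllReduce leaves every GPU holding the element-wise sum of all GPUs' gradients. One common way to implement it is a ring: a reduce-scatter pass followed by an all-gather pass. The sketch below simulates that pattern with Python lists standing in for GPUs. It illustrates the communication pattern only, it is not NCCL's implementation, and `ring_allreduce` is a hypothetical name.

```python
def ring_allreduce(rank_data):
    """Sum equal-length vectors across ranks; every rank ends with the full sum."""
    p = len(rank_data)
    n = len(rank_data[0])
    assert n % p == 0, "sketch assumes the vector splits evenly into p chunks"
    size = n // p
    # bufs[r][c] is rank r's local copy of chunk c
    bufs = [[list(vec[c * size:(c + 1) * size]) for c in range(p)]
            for vec in rank_data]

    # Phase 1, reduce-scatter: after p-1 steps rank r owns the full sum of chunk (r+1) % p
    for step in range(p - 1):
        sends = [(r, (r - step) % p) for r in range(p)]
        payloads = [list(bufs[r][c]) for r, c in sends]   # snapshot: sends are simultaneous
        for (r, c), payload in zip(sends, payloads):
            dst = (r + 1) % p
            bufs[dst][c] = [x + y for x, y in zip(bufs[dst][c], payload)]

    # Phase 2, all-gather: pass the finished chunks around the ring
    for step in range(p - 1):
        sends = [(r, (r + 1 - step) % p) for r in range(p)]
        payloads = [list(bufs[r][c]) for r, c in sends]
        for (r, c), payload in zip(sends, payloads):
            bufs[(r + 1) % p][c] = payload

    return [[x for chunk in b for x in chunk] for b in bufs]


grads = [[1, 2, 3, 4, 5, 6],
         [10, 20, 30, 40, 50, 60],
         [100, 200, 300, 400, 500, 600]]
reduced = ring_allreduce(grads)
print(reduced[0])
```

Each rank sends and receives only one chunk per step, so the per-link traffic stays roughly constant as more GPUs join the ring.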
Python: First CUDA Program with CuPy
Install with pip install cupy-cuda12x. CuPy mirrors the NumPy API on the GPU, and array operations are dispatched to CUDA kernels for you.

import time

import cupy as cp
import numpy as np

# Create large matrices directly on the GPU (no CPU-to-GPU copy needed)
N = 8192
A_gpu = cp.random.randn(N, N, dtype=cp.float32)
B_gpu = cp.random.randn(N, N, dtype=cp.float32)

# Warm up the GPU so one-time setup cost is not measured
_ = cp.matmul(A_gpu, B_gpu)
cp.cuda.Device().synchronize()

# Benchmark GPU matrix multiply (the core of every neural network layer)
start = time.perf_counter()
for _ in range(10):
    C_gpu = cp.matmul(A_gpu, B_gpu)
cp.cuda.Device().synchronize()  # kernels launch asynchronously; wait before stopping the clock
gpu_time = (time.perf_counter() - start) / 10

# Same multiply on the CPU for comparison
A_cpu = cp.asnumpy(A_gpu)
B_cpu = cp.asnumpy(B_gpu)
start = time.perf_counter()
C_cpu = np.matmul(A_cpu, B_cpu)
cpu_time = time.perf_counter() - start

print(f"GPU time: {gpu_time*1000:.1f} ms")
print(f"CPU time: {cpu_time*1000:.1f} ms")
print(f"Speedup: {cpu_time/gpu_time:.0f}x")
print(f"GPU compute capability: {cp.cuda.Device(0).compute_capability}")
print(f"Free VRAM: {cp.cuda.Device(0).mem_info[0]/1e9:.1f} GB")
Install & Verify CUDA Stack
# Verify the CUDA installation
nvidia-smi        # GPU driver status
nvcc --version    # CUDA compiler version

# Check GPU details
nvidia-smi --query-gpu=name,memory.total,compute_cap --format=csv

# Install the Python GPU stack.
# CuPy comes from PyPI, so install it separately from the PyTorch wheels:
# --index-url replaces the default index, and the PyTorch index does not host CuPy.
pip install cupy-cuda12x
pip install torch torchvision --index-url https://download.pytorch.org/whl/cu121

# Verify PyTorch sees the GPU
python -c "import torch; print(torch.cuda.get_device_name(0))"
RAPIDS: GPU Data Science
import cudf                       # GPU DataFrame (like pandas)
import numpy as np
from cuml.cluster import KMeans   # GPU ML (like scikit-learn)

# Create a GPU DataFrame; on large data this can be far faster than pandas
df = cudf.DataFrame({
    'vehicle_speed': np.random.normal(60, 15, 1_000_000),
    'vehicle_count': np.random.poisson(20, 1_000_000),
    'congestion_idx': np.random.uniform(0, 1, 1_000_000)
})

# GPU-accelerated aggregation over 1M rows, grouped by a boolean flag
df['congested'] = df['congestion_idx'] > 0.7
summary = df.groupby('congested').agg({
    'vehicle_speed': ['mean', 'std'],
    'vehicle_count': 'sum'
})
print(summary)

# GPU K-Means clustering (anomaly detection use case)
X = df[['vehicle_speed', 'congestion_idx']]
kmeans = KMeans(n_clusters=3, random_state=42)
labels = kmeans.fit_predict(X)
print(f"Clustered 1M traffic records into {kmeans.n_clusters} segments")
Deep Learning on GPU
Train neural networks end-to-end using PyTorch on NVIDIA GPUs. Covers CNNs for vision, Transformers for language, and distributed multi-GPU training.
Training an Image Classifier (PyTorch + CUDA)
import torch
import torch.nn as nn
import torchvision.models as models
from torch.cuda.amp import GradScaler  # newer PyTorch releases also expose torch.amp.GradScaler

# -- Device setup --
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device_name = torch.cuda.get_device_name(0) if device.type == "cuda" else "CPU"
print(f"Training on: {device_name}")

# -- Model: ResNet-50 fine-tuned for 10-class traffic classification --
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model.fc = nn.Linear(2048, 10)  # replace the final layer
model = model.to(device)

# -- Multi-GPU with DataParallel (simple, single process) --
# On DGX systems prefer torch.distributed (DistributedDataParallel).
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    model = nn.DataParallel(model)

# -- Mixed precision (AMP): usually faster and lighter on VRAM --
use_amp = device.type == "cuda"
scaler = GradScaler(enabled=use_amp)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

def train_epoch(loader):
    model.train()
    total_loss = 0.0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        # autocast runs eligible ops in reduced precision where it is safe
        with torch.autocast(device_type=device.type, enabled=use_amp):
            outputs = model(images)
            loss = criterion(outputs, labels)
        # The scaler multiplies the loss so small FP16 gradients do not underflow
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()
    return total_loss / len(loader)
Distributed Training: DGX Multi-GPU (DDP)
# Launch 8-GPU distributed training on a DGX H100.
# torchrun spawns one process per GPU; the training script selects the NCCL backend.
#   --nproc_per_node=8   8 GPUs per node
#   --nnodes=1           single DGX node
# (Comments cannot follow a trailing backslash, so they are listed here instead.)
torchrun \
    --nproc_per_node=8 \
    --nnodes=1 \
    --node_rank=0 \
    train_ddp.py \
    --batch-size 512 \
    --epochs 100 \
    --learning-rate 1e-3

# Multi-node (DGX SuperPOD): 4 DGX nodes = 32 GPUs.
# Run the same command on every node, changing --node_rank from 0 to 3.
torchrun \
    --nproc_per_node=8 \
    --nnodes=4 \
    --node_rank=0 \
    --master_addr="dgx-node-0" \
    --master_port=29500 \
    train_ddp.py
TensorRT Optimization
Convert trained models to TensorRT engines for production inference. Apply quantization and achieve up to 40x speedup over CPU inference.
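Quantization trades a little precision for smaller, faster arithmetic. The sketch below shows the basic idea with symmetric per-tensor INT8 quantization in plain Python. It uses simple max-abs scaling, which is an assumption made for illustration and not TensorRT's calibration procedure; `quantize_int8` and `dequantize` are hypothetical helper names.

```python
def quantize_int8(values):
    """Map floats to integers in [-127, 127] using one shared scale."""
    max_abs = max(abs(v) for v in values)
    scale = max_abs / 127 if max_abs > 0 else 1.0
    q = [max(-127, min(127, round(v / scale))) for v in values]
    return q, scale


def dequantize(q, scale):
    """Recover approximate floats from the INT8 codes."""
    return [x * scale for x in q]


weights = [0.02, -1.27, 0.64, 0.0]
q, scale = quantize_int8(weights)
restored = dequantize(q, scale)
max_err = max(abs(a - b) for a, b in zip(weights, restored))

print("int8 codes:", q)
print("max round-trip error:", max_err)
```

The round-trip error is bounded by half the scale, so tensors with a few large outliers lose the most precision. That is one reason INT8 engines need calibration data.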
Export PyTorch → ONNX → TensorRT
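import torch
import tensorrt as trt

# Step 1: Export the PyTorch model to ONNX
# Assumes the .pt file holds a pickled nn.Module (not a state_dict or checkpoint dict).
model = torch.load("yolov8_traffic.pt").cuda().eval()
dummy = torch.randn(1, 3, 640, 640, device="cuda")
torch.onnx.export(
    model, dummy, "yolov8_traffic.onnx",
    opset_version=17,
    input_names=["images"],
    output_names=["output0"],
    dynamic_axes={"images": {0: "batch"}}  # dynamic batch size
)

# Step 2: Build a TensorRT engine with FP16 precision
logger = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(logger)
# create_network takes a bitmask, so shift the flag into position
network = builder.create_network(
    1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
parser = trt.OnnxParser(network, logger)
with open("yolov8_traffic.onnx", "rb") as f:
    if not parser.parse(f.read()):
        for i in range(parser.num_errors):
            print(parser.get_error(i))
        raise RuntimeError("Failed to parse the ONNX model")

config = builder.create_builder_config()
config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 4 << 30)  # 4 GB

# A dynamic batch axis needs an optimization profile (min / opt / max shapes).
# The batch sizes 1 / 8 / 32 are example values; tune them for your workload.
profile = builder.create_optimization_profile()
profile.set_shape("images", (1, 3, 640, 640), (8, 3, 640, 640), (32, 3, 640, 640))
config.add_optimization_profile(profile)

# Enable FP16: often around 2x throughput with minimal accuracy loss
config.set_flag(trt.BuilderFlag.FP16)
# Enable INT8: higher throughput still, but requires calibration
# config.set_flag(trt.BuilderFlag.INT8)

engine_bytes = builder.build_serialized_network(network, config)
if engine_bytes is None:
    raise RuntimeError("TensorRT engine build failed")
with open("yolov8_traffic_fp16.trt", "wb") as f:
    f.write(engine_bytes)
# Engines are tied to the GPU and TensorRT version they were built with,
# so build on the deployment target (Jetson AGX Orin or DGX).
print("TensorRT engine built.")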
Run TensorRT Inference
import numpy as np
import pycuda.autoinit  # noqa: F401  (creates the CUDA context on import)
import pycuda.driver as cuda
import tensorrt as trt

# Load the engine from disk
logger = trt.Logger(trt.Logger.WARNING)
runtime = trt.Runtime(logger)
with open("yolov8_traffic_fp16.trt", "rb") as f:
    engine = runtime.deserialize_cuda_engine(f.read())
context = engine.create_execution_context()

# Allocate GPU memory for I/O
input_shape = (1, 3, 640, 640)
output_shape = (1, 84, 8400)  # YOLOv8 output
# mem_alloc expects a plain Python int, not a NumPy integer
d_input = cuda.mem_alloc(int(np.prod(input_shape)) * 4)    # FP32 = 4 bytes per element
d_output = cuda.mem_alloc(int(np.prod(output_shape)) * 4)

# Engines built with a dynamic batch axis need the concrete input shape set first
context.set_input_shape("images", input_shape)

# Inference (quoted targets: <3 ms on Jetson AGX Orin, <1 ms on H100; measure on your hardware)
frame = np.random.randn(*input_shape).astype(np.float32)
cuda.memcpy_htod(d_input, frame)
context.execute_v2([int(d_input), int(d_output)])
result = np.empty(output_shape, dtype=np.float32)
cuda.memcpy_dtoh(result, d_output)
print(f"Inference complete. Output shape: {result.shape}")
NIM & Triton Inference Serving
Deploy AI models at production scale using NVIDIA NIM microservices and Triton Inference Server. From docker pull to serving 1,000 requests/second.
Deploy Llama 3 with NIM in 3 Commands
# Step 1: Authenticate with NGC (NVIDIA GPU Cloud)
# The username is the literal string $oauthtoken, hence the single quotes.
docker login nvcr.io \
    --username='$oauthtoken' \
    --password="YOUR_NGC_API_KEY"

# Step 2: Pull and run the Llama 3 8B NIM
# NIM auto-selects TensorRT-LLM or vLLM based on your GPU
docker run -d \
    --gpus all \
    --name llama3-nim \
    -p 8000:8000 \
    -e NGC_API_KEY="YOUR_NGC_API_KEY" \
    -v ~/nim-cache:/opt/nim/.cache \
    nvcr.io/nim/meta/llama3-8b-instruct:latest

# Step 3: Health check (wait ~60s for the model to load)
curl http://localhost:8000/v1/models
# → {"data":[{"id":"meta/llama3-8b-instruct","object":"model",...}]}
Call NIM API: OpenAI Compatible
import time

from openai import OpenAI

# NIM exposes an OpenAI-compatible API, so the OpenAI client works once base_url is changed
client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed-for-local"
)

# Streaming chat completion
start = time.perf_counter()
stream = client.chat.completions.create(
    model="meta/llama3-8b-instruct",
    messages=[
        {"role": "system", "content": "You are an NVIDIA AI expert."},
        {"role": "user", "content": "Explain NVIDIA NIM in 3 sentences."}
    ],
    stream=True,
    max_tokens=512,
    temperature=0.7
)

full_response = ""
chunks = 0
for chunk in stream:
    if not chunk.choices:  # some chunks carry no choices (e.g. usage-only chunks)
        continue
    delta = chunk.choices[0].delta.content or ""
    if delta:
        print(delta, end="", flush=True)
        full_response += delta
        chunks += 1

elapsed = time.perf_counter() - start
# A streamed chunk usually carries about one token, so this rate is an approximation
print(f"\n\nApprox. tokens/sec: {chunks/elapsed:.0f}")
print(f"Total latency: {elapsed*1000:.0f} ms")
Triton Inference Server: Multi-Model Serving
# Create the model repository structure
mkdir -p model_repo/yolov8/1
mkdir -p model_repo/resnet50/1

# Triton's TensorRT backend looks for the engine as <model>/<version>/model.plan
cp yolov8_traffic_fp16.trt model_repo/yolov8/1/model.plan

# Config for YOLOv8 (TensorRT backend)
cat > model_repo/yolov8/config.pbtxt << 'EOF'
name: "yolov8"
backend: "tensorrt"
max_batch_size: 32
input [{ name: "images" data_type: TYPE_FP32 dims: [3, 640, 640] }]
output [{ name: "output0" data_type: TYPE_FP32 dims: [84, 8400] }]
instance_group [{ count: 2 kind: KIND_GPU }]  # 2 concurrent model instances
dynamic_batching { max_queue_delay_microseconds: 500 }
EOF

# Launch Triton with GPU support
# Ports: 8000 = HTTP, 8001 = gRPC, 8002 = metrics
docker run -d \
    --gpus all \
    -p 8000:8000 -p 8001:8001 -p 8002:8002 \
    -v $(pwd)/model_repo:/models \
    nvcr.io/nvidia/tritonserver:24.01-py3 \
    tritonserver --model-repository=/models

# Check that the yolov8 model is loaded and ready
curl http://localhost:8000/v2/models/yolov8/ready
NeMo & LLM Fine-Tuning
Train and fine-tune large language models end-to-end using NVIDIA NeMo, from dataset curation to LoRA fine-tuning to deployment as a NIM.
Data Curation (NeMo Curator)
Clean, deduplicate, and quality-filter your training corpus using GPU-accelerated pipelines that process trillions of tokens.
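The core of deduplication and filtering can be sketched on the CPU with the standard library. The example below does exact deduplication on normalized text plus a simple length filter. It does not use the NeMo Curator API, and `normalize`, `curate`, and the `min_words` threshold are hypothetical names and values chosen for illustration.

```python
import hashlib


def normalize(text):
    """Lowercase and collapse whitespace so trivial variants hash identically."""
    return " ".join(text.lower().split())


def curate(docs, min_words=5):
    """Drop very short documents and exact duplicates (after normalization)."""
    seen = set()
    kept = []
    for doc in docs:
        norm = normalize(doc)
        if len(norm.split()) < min_words:      # quality filter: too short to be useful
            continue
        digest = hashlib.sha256(norm.encode("utf-8")).hexdigest()
        if digest in seen:                     # exact duplicate of an earlier document
            continue
        seen.add(digest)
        kept.append(doc)
    return kept


corpus = [
    "The GPU runs many threads in parallel.",
    "the  GPU runs many threads   in parallel.",
    "Too short.",
    "NCCL enables AllReduce across multiple GPUs.",
]
cleaned = curate(corpus)
print(len(cleaned), "of", len(corpus), "documents kept")
```

Production pipelines typically add fuzzy (near-duplicate) matching and model-based quality scoring on top of this, which is where GPU acceleration pays off.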
Fine-Tuning with LoRA
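Adapt a pretrained Llama 3 model to your domain using Low-Rank Adaptation (LoRA): the base weights stay frozen and only small adapter matrices are trained, typically well under 1% of the model's parameters (around 0.1% in many configurations).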
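The parameter savings follow from simple arithmetic: a frozen weight of shape d_out x d_in gets a trainable update B·A, where B is d_out x r and A is r x d_in. The sketch below counts trainable parameters for a single adapted matrix. The hidden size of 4096 is an assumed, illustrative value, and the helper names are hypothetical.

```python
def lora_param_count(d_out, d_in, rank):
    """Trainable parameters for one adapted weight: B (d_out x r) plus A (r x d_in)."""
    return rank * (d_out + d_in)


def lora_fraction(d_out, d_in, rank):
    """Adapter parameters as a fraction of the frozen weight they modify."""
    return lora_param_count(d_out, d_in, rank) / (d_out * d_in)


hidden = 4096  # assumed hidden size, for illustration only
for rank in (4, 32, 64):
    count = lora_param_count(hidden, hidden, rank)
    frac = lora_fraction(hidden, hidden, rank)
    print(f"rank={rank:>2}: {count:>7} trainable params, {frac:.2%} of the matrix")
```

For a square matrix the fraction is 2r/d, so it grows linearly with rank. The model-wide fraction is lower than the per-matrix figure because adapters are usually attached to only some of the weight matrices.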
Alignment (RLHF / DPO)
Use NeMo Aligner to align the model to human preferences using PPO or Direct Preference Optimization.
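For intuition, the DPO objective for a single preference pair fits in a few lines. The sketch below computes -log σ(β · [(log π(chosen) - log π_ref(chosen)) - (log π(rejected) - log π_ref(rejected))]) from sequence log-probabilities. It is a plain-Python illustration and does not reflect NeMo Aligner's code; the log-probability values and β = 0.1 are made-up inputs.

```python
import math


def dpo_loss(pi_chosen, pi_rejected, ref_chosen, ref_rejected, beta=0.1):
    """DPO loss for one (chosen, rejected) pair given sequence log-probabilities."""
    margin = beta * ((pi_chosen - ref_chosen) - (pi_rejected - ref_rejected))
    # -log(sigmoid(margin)), written to avoid overflow for large |margin|
    if margin >= 0:
        return math.log1p(math.exp(-margin))
    return -margin + math.log1p(math.exp(margin))


# Policy identical to the reference: no preference signal yet
neutral = dpo_loss(-15.0, -15.0, -15.0, -15.0)

# Policy raised the chosen answer and lowered the rejected one relative to the reference
improved = dpo_loss(-10.0, -20.0, -15.0, -15.0)

# Policy moved the wrong way
worse = dpo_loss(-20.0, -10.0, -15.0, -15.0)

print(f"neutral={neutral:.4f} improved={improved:.4f} worse={worse:.4f}")
```

Unlike PPO, this needs no separate reward model or sampling loop: the loss depends only on log-probabilities from the policy and a frozen reference model.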
Package & Deploy as NIM
Export the fine-tuned model with TensorRT-LLM and wrap it as a deployable NIM container.
LoRA Fine-Tuning with NeMo
# Pull the NeMo Framework container from NGC
docker run -it --gpus all \
    -v $(pwd):/workspace \
    nvcr.io/nvidia/nemo:24.01 bash

# Inside the container: LoRA fine-tuning of Llama 3 8B
#   adapter_dim=32        LoRA rank (typically 4-64)
#   alpha=64              LoRA alpha = 2x rank
#   trainer.devices=8     8 GPUs on a DGX H100
#   trainer.precision     BFloat16 on Hopper GPUs
# (Comments cannot follow a trailing backslash, so they are listed here instead.)
python /opt/NeMo/examples/nlp/language_modeling/tuning/megatron_gpt_peft_tuning.py \
    model.restore_from_path="/workspace/Llama-3-8B.nemo" \
    model.peft.peft_scheme="lora" \
    model.peft.lora_tuning.adapter_dim=32 \
    model.peft.lora_tuning.alpha=64 \
    'model.data.train_ds.file_names=["/workspace/train.jsonl"]' \
    'model.data.validation_ds.file_names=["/workspace/val.jsonl"]' \
    trainer.devices=8 \
    trainer.max_epochs=3 \
    trainer.precision="bf16" \
    model.global_batch_size=128 \
    model.micro_batch_size=4 \
    exp_manager.exp_dir="/workspace/checkpoints"
Build a RAG Pipeline with NeMo Retriever
import numpy as np
import requests
from openai import OpenAI

# -- Embedding with the NV-Embed NIM --
EMBED_URL = "http://localhost:9080/v1/embeddings"
NIM_URL = "http://localhost:8000/v1"

def embed(texts):
    """Get GPU-accelerated embeddings from the NV-Embed NIM."""
    r = requests.post(EMBED_URL,
                      json={"input": texts, "model": "nvidia/nv-embed-v1"},
                      timeout=30)
    r.raise_for_status()
    vecs = np.array([d["embedding"] for d in r.json()["data"]], dtype=np.float32)
    # L2-normalize so that a dot product equals cosine similarity
    return vecs / np.linalg.norm(vecs, axis=1, keepdims=True)

# -- Simple vector store (use Milvus/pgvector in production) --
class VectorStore:
    def __init__(self):
        self.docs, self.vecs = [], None

    def add(self, docs):
        """Replace the index with the given documents."""
        self.docs = list(docs)
        self.vecs = embed(self.docs)  # embedded in one batched request

    def search(self, query, k=3):
        q = embed([query])[0]
        scores = self.vecs @ q  # cosine similarity (vectors are unit length)
        top = np.argsort(scores)[::-1][:k]
        return [self.docs[i] for i in top]

# -- RAG query function --
client = OpenAI(base_url=NIM_URL, api_key="not-needed")
store = VectorStore()

# Index NVIDIA AI Enterprise documentation
store.add([
    "NVIDIA NIM provides pre-built containers for AI inference...",
    "NeMo is a framework for building, training, and fine-tuning LLMs...",
    "Jetson AGX Orin supports up to 275 TOPS at 15-60W power envelope...",
])

def rag_query(question):
    context = "\n".join(store.search(question))
    response = client.chat.completions.create(
        model="meta/llama3-8b-instruct",
        messages=[{"role": "user",
                   "content": f"Context:\n{context}\n\nQuestion: {question}"}]
    )
    return response.choices[0].message.content

print(rag_query("What is NVIDIA NIM?"))
Edge-to-Cloud Pipeline
Build complete end-to-end AI systems: Jetson edge inference → Kafka streaming → DGX central AI → NIM serving. Inspired by real public sector deployments.
Edge: Jetson AGX Orin Video Analytics
import json
import time

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst  # used by the full GStreamer pipeline (not shown here)
from kafka import KafkaProducer
import numpy as np

# -- Kafka producer: send metadata to the central cluster --
producer = KafkaProducer(
    bootstrap_servers=['central-ai:9092'],
    value_serializer=lambda v: json.dumps(v).encode()
)

# -- DeepStream-style pipeline (simplified) --
# Real DeepStream uses GStreamer + NVDEC + TensorRT in one pipeline
def process_frame(frame_id, camera_id, detections):
    """Called for each video frame after TensorRT inference."""
    # PII handling: drop face detections and send metadata only.
    # No image data leaves the edge node, so nothing needs blurring downstream.
    anonymized = [{
        'class': d['class'],
        'confidence': d['conf'],
        'bbox': d['bbox']
    } for d in detections if d['class'] != 'person_face']

    # Aggregate stats for traffic monitoring
    vehicle_count = sum(1 for d in anonymized
                        if d['class'] in ['car', 'truck', 'bus'])

    event = {
        'timestamp': time.time(),
        'camera_id': camera_id,
        'frame_id': frame_id,
        'vehicle_count': vehicle_count,
        'detections': anonymized,
        # Placeholder: simulated latency; replace with the measured inference time
        'inference_ms': float(np.random.uniform(1.5, 2.8))
    }

    # Publish to the Kafka topic consumed by the central DGX cluster
    producer.send('traffic.events', value=event)
    return event
Central: Kafka Consumer + Triton AI
import json
from collections import defaultdict, deque

import numpy as np
import tritonclient.http as httpclient
from kafka import KafkaConsumer

# -- Triton client for central AI models --
# The HTTP client talks to Triton's HTTP port (8000); 8001 is gRPC.
triton = httpclient.InferenceServerClient("localhost:8000")

consumer = KafkaConsumer(
    'traffic.events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode())
)

# Rolling window per camera for congestion detection
windows = defaultdict(lambda: deque(maxlen=30))  # 30 frames = ~1 second at 30 FPS

def predict_congestion(camera_id, vehicle_counts):
    """ST-GCN model on Triton: spatial-temporal traffic forecasting."""
    x = np.array(vehicle_counts, dtype=np.float32).reshape(1, 30, 1)
    inp = httpclient.InferInput("input", x.shape, "FP32")
    inp.set_data_from_numpy(x)
    out = httpclient.InferRequestedOutput("congestion_score")
    result = triton.infer("traffic_stgcn", [inp], outputs=[out])
    score = result.as_numpy("congestion_score")[0][0]
    return float(score)

for msg in consumer:
    event = msg.value
    cam = event['camera_id']
    windows[cam].append(event['vehicle_count'])

    # Once the window is full, score it on every new frame
    if len(windows[cam]) == 30:
        score = predict_congestion(cam, list(windows[cam]))
        if score > 0.8:
            print(f"HIGH CONGESTION at camera {cam}: score={score:.2f}")
            # Trigger the NIM LLM to generate a human-readable incident report
AWS Lambda + S3: Serverless Event Handler
import json
import os
import time

import boto3
from openai import OpenAI

# NVIDIA NIM endpoint on a GPU EC2 instance
# (H100 GPUs are in the P5 family; G5 instances carry A10G GPUs)
NIM_ENDPOINT = os.environ['NIM_ENDPOINT']  # e.g. https://nim.your-domain.com
s3 = boto3.client('s3')
BUCKET = 'nvidiaviswanext'
nim_client = OpenAI(base_url=f"{NIM_ENDPOINT}/v1", api_key="not-needed")

def lambda_handler(event, context):
    """
    Triggered by API Gateway POST /analyze-traffic
    Body: { camera_id, vehicle_count, congestion_score, timestamp }
    """
    started = time.perf_counter()
    body = json.loads(event['body'])
    cam = body['camera_id']
    score = body['congestion_score']
    count = body['vehicle_count']

    # Generate an AI incident report using NIM Llama 3
    prompt = (f"Camera {cam} shows congestion score {score:.2f} with "
              f"{count} vehicles. Write a 2-sentence traffic advisory.")
    resp = nim_client.chat.completions.create(
        model="meta/llama3-8b-instruct",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=100
    )
    advisory = resp.choices[0].message.content

    # Save event + report to S3 (nvidiaviswanext bucket)
    key = f"events/{cam}/{int(time.time())}.json"
    s3.put_object(
        Bucket=BUCKET,
        Key=key,
        Body=json.dumps({**body, "ai_advisory": advisory}),
        ContentType='application/json'
    )

    return {
        'statusCode': 200,
        'headers': {'Access-Control-Allow-Origin': '*'},
        'body': json.dumps({
            'advisory': advisory,
            's3_key': key,
            # Elapsed handler time; get_remaining_time_in_millis() reports time left, not time used
            'processing_ms': round((time.perf_counter() - started) * 1000)
        })
    }