AS
AgSkills.dev
MARKETPLACE

vector-index-tuning

Optimize vector index performance for latency, recall, and memory. Use when tuning HNSW parameters, selecting quantization strategies, or scaling vector search infrastructure.

28.3k
3.1k

Preview

SKILL.md
name
vector-index-tuning
description
Optimize vector index performance for latency, recall, and memory. Use when tuning HNSW parameters, selecting quantization strategies, or scaling vector search infrastructure.

Vector Index Tuning

Guide to optimizing vector indexes for production performance.

When to Use This Skill

  • Tuning HNSW parameters
  • Implementing quantization
  • Optimizing memory usage
  • Reducing search latency
  • Balancing recall vs speed
  • Scaling to billions of vectors

Core Concepts

1. Index Type Selection

Data Size           Recommended Index
────────────────────────────────────────
< 10K vectors  β†’    Flat (exact search)
10K - 1M       β†’    HNSW
1M - 100M      β†’    HNSW + Quantization
> 100M         β†’    IVF + PQ or DiskANN

2. HNSW Parameters

ParameterDefaultEffect
M16Connections per node, ↑ = better recall, more memory
efConstruction100Build quality, ↑ = better index, slower build
efSearch50Search quality, ↑ = better recall, slower search

3. Quantization Types

Full Precision (FP32): 4 bytes Γ— dimensions
Half Precision (FP16): 2 bytes Γ— dimensions
INT8 Scalar:           1 byte Γ— dimensions
Product Quantization:  ~32-64 bytes total
Binary:                dimensions/8 bytes

Templates

Template 1: HNSW Parameter Tuning

import numpy as np from typing import List, Tuple import time def benchmark_hnsw_parameters( vectors: np.ndarray, queries: np.ndarray, ground_truth: np.ndarray, m_values: List[int] = [8, 16, 32, 64], ef_construction_values: List[int] = [64, 128, 256], ef_search_values: List[int] = [32, 64, 128, 256] ) -> List[dict]: """Benchmark different HNSW configurations.""" import hnswlib results = [] dim = vectors.shape[1] n = vectors.shape[0] for m in m_values: for ef_construction in ef_construction_values: # Build index index = hnswlib.Index(space='cosine', dim=dim) index.init_index(max_elements=n, M=m, ef_construction=ef_construction) build_start = time.time() index.add_items(vectors) build_time = time.time() - build_start # Get memory usage memory_bytes = index.element_count * ( dim * 4 + # Vector storage m * 2 * 4 # Graph edges (approximate) ) for ef_search in ef_search_values: index.set_ef(ef_search) # Measure search search_start = time.time() labels, distances = index.knn_query(queries, k=10) search_time = time.time() - search_start # Calculate recall recall = calculate_recall(labels, ground_truth, k=10) results.append({ "M": m, "ef_construction": ef_construction, "ef_search": ef_search, "build_time_s": build_time, "search_time_ms": search_time * 1000 / len(queries), "recall@10": recall, "memory_mb": memory_bytes / 1024 / 1024 }) return results def calculate_recall(predictions: np.ndarray, ground_truth: np.ndarray, k: int) -> float: """Calculate recall@k.""" correct = 0 for pred, truth in zip(predictions, ground_truth): correct += len(set(pred[:k]) & set(truth[:k])) return correct / (len(predictions) * k) def recommend_hnsw_params( num_vectors: int, target_recall: float = 0.95, max_latency_ms: float = 10, available_memory_gb: float = 8 ) -> dict: """Recommend HNSW parameters based on requirements.""" # Base recommendations if num_vectors < 100_000: m = 16 ef_construction = 100 elif num_vectors < 1_000_000: m = 32 ef_construction = 200 else: m = 48 ef_construction = 256 # Adjust ef_search based on recall target if target_recall >= 0.99: ef_search = 256 elif target_recall >= 0.95: ef_search = 128 else: ef_search = 64 return { "M": m, "ef_construction": ef_construction, "ef_search": ef_search, "notes": f"Estimated for {num_vectors:,} vectors, {target_recall:.0%} recall" }

Template 2: Quantization Strategies

import numpy as np from typing import Optional class VectorQuantizer: """Quantization strategies for vector compression.""" @staticmethod def scalar_quantize_int8( vectors: np.ndarray, min_val: Optional[float] = None, max_val: Optional[float] = None ) -> Tuple[np.ndarray, dict]: """Scalar quantization to INT8.""" if min_val is None: min_val = vectors.min() if max_val is None: max_val = vectors.max() # Scale to 0-255 range scale = 255.0 / (max_val - min_val) quantized = np.clip( np.round((vectors - min_val) * scale), 0, 255 ).astype(np.uint8) params = {"min_val": min_val, "max_val": max_val, "scale": scale} return quantized, params @staticmethod def dequantize_int8( quantized: np.ndarray, params: dict ) -> np.ndarray: """Dequantize INT8 vectors.""" return quantized.astype(np.float32) / params["scale"] + params["min_val"] @staticmethod def product_quantize( vectors: np.ndarray, n_subvectors: int = 8, n_centroids: int = 256 ) -> Tuple[np.ndarray, dict]: """Product quantization for aggressive compression.""" from sklearn.cluster import KMeans n, dim = vectors.shape assert dim % n_subvectors == 0 subvector_dim = dim // n_subvectors codebooks = [] codes = np.zeros((n, n_subvectors), dtype=np.uint8) for i in range(n_subvectors): start = i * subvector_dim end = (i + 1) * subvector_dim subvectors = vectors[:, start:end] kmeans = KMeans(n_clusters=n_centroids, random_state=42) codes[:, i] = kmeans.fit_predict(subvectors) codebooks.append(kmeans.cluster_centers_) params = { "codebooks": codebooks, "n_subvectors": n_subvectors, "subvector_dim": subvector_dim } return codes, params @staticmethod def binary_quantize(vectors: np.ndarray) -> np.ndarray: """Binary quantization (sign of each dimension).""" # Convert to binary: positive = 1, negative = 0 binary = (vectors > 0).astype(np.uint8) # Pack bits into bytes n, dim = vectors.shape packed_dim = (dim + 7) // 8 packed = np.zeros((n, packed_dim), dtype=np.uint8) for i in range(dim): byte_idx = i // 8 bit_idx = i % 8 packed[:, byte_idx] |= (binary[:, i] << bit_idx) return packed def estimate_memory_usage( num_vectors: int, dimensions: int, quantization: str = "fp32", index_type: str = "hnsw", hnsw_m: int = 16 ) -> dict: """Estimate memory usage for different configurations.""" # Vector storage bytes_per_dimension = { "fp32": 4, "fp16": 2, "int8": 1, "pq": 0.05, # Approximate "binary": 0.125 } vector_bytes = num_vectors * dimensions * bytes_per_dimension[quantization] # Index overhead if index_type == "hnsw": # Each node has ~M*2 edges, each edge is 4 bytes (int32) index_bytes = num_vectors * hnsw_m * 2 * 4 elif index_type == "ivf": # Inverted lists + centroids index_bytes = num_vectors * 8 + 65536 * dimensions * 4 else: index_bytes = 0 total_bytes = vector_bytes + index_bytes return { "vector_storage_mb": vector_bytes / 1024 / 1024, "index_overhead_mb": index_bytes / 1024 / 1024, "total_mb": total_bytes / 1024 / 1024, "total_gb": total_bytes / 1024 / 1024 / 1024 }

Template 3: Qdrant Index Configuration

from qdrant_client import QdrantClient from qdrant_client.http import models def create_optimized_collection( client: QdrantClient, collection_name: str, vector_size: int, num_vectors: int, optimize_for: str = "balanced" # "recall", "speed", "memory" ) -> None: """Create collection with optimized settings.""" # HNSW configuration based on optimization target hnsw_configs = { "recall": models.HnswConfigDiff(m=32, ef_construct=256), "speed": models.HnswConfigDiff(m=16, ef_construct=64), "balanced": models.HnswConfigDiff(m=16, ef_construct=128), "memory": models.HnswConfigDiff(m=8, ef_construct=64) } # Quantization configuration quantization_configs = { "recall": None, # No quantization for max recall "speed": models.ScalarQuantization( scalar=models.ScalarQuantizationConfig( type=models.ScalarType.INT8, quantile=0.99, always_ram=True ) ), "balanced": models.ScalarQuantization( scalar=models.ScalarQuantizationConfig( type=models.ScalarType.INT8, quantile=0.99, always_ram=False ) ), "memory": models.ProductQuantization( product=models.ProductQuantizationConfig( compression=models.CompressionRatio.X16, always_ram=False ) ) } # Optimizer configuration optimizer_configs = { "recall": models.OptimizersConfigDiff( indexing_threshold=10000, memmap_threshold=50000 ), "speed": models.OptimizersConfigDiff( indexing_threshold=5000, memmap_threshold=20000 ), "balanced": models.OptimizersConfigDiff( indexing_threshold=20000, memmap_threshold=50000 ), "memory": models.OptimizersConfigDiff( indexing_threshold=50000, memmap_threshold=10000 # Use disk sooner ) } client.create_collection( collection_name=collection_name, vectors_config=models.VectorParams( size=vector_size, distance=models.Distance.COSINE ), hnsw_config=hnsw_configs[optimize_for], quantization_config=quantization_configs[optimize_for], optimizers_config=optimizer_configs[optimize_for] ) def tune_search_parameters( client: QdrantClient, collection_name: str, target_recall: float = 0.95 ) -> dict: """Tune search parameters for target recall.""" # Search parameter recommendations if target_recall >= 0.99: search_params = models.SearchParams( hnsw_ef=256, exact=False, quantization=models.QuantizationSearchParams( ignore=True, # Don't use quantization for search rescore=True ) ) elif target_recall >= 0.95: search_params = models.SearchParams( hnsw_ef=128, exact=False, quantization=models.QuantizationSearchParams( ignore=False, rescore=True, oversampling=2.0 ) ) else: search_params = models.SearchParams( hnsw_ef=64, exact=False, quantization=models.QuantizationSearchParams( ignore=False, rescore=False ) ) return search_params

Template 4: Performance Monitoring

import time from dataclasses import dataclass from typing import List import numpy as np @dataclass class SearchMetrics: latency_p50_ms: float latency_p95_ms: float latency_p99_ms: float recall: float qps: float class VectorSearchMonitor: """Monitor vector search performance.""" def __init__(self, ground_truth_fn=None): self.latencies = [] self.recalls = [] self.ground_truth_fn = ground_truth_fn def measure_search( self, search_fn, query_vectors: np.ndarray, k: int = 10, num_iterations: int = 100 ) -> SearchMetrics: """Benchmark search performance.""" latencies = [] for _ in range(num_iterations): for query in query_vectors: start = time.perf_counter() results = search_fn(query, k=k) latency = (time.perf_counter() - start) * 1000 latencies.append(latency) latencies = np.array(latencies) total_queries = num_iterations * len(query_vectors) total_time = sum(latencies) / 1000 # seconds return SearchMetrics( latency_p50_ms=np.percentile(latencies, 50), latency_p95_ms=np.percentile(latencies, 95), latency_p99_ms=np.percentile(latencies, 99), recall=self._calculate_recall(search_fn, query_vectors, k) if self.ground_truth_fn else 0, qps=total_queries / total_time ) def _calculate_recall(self, search_fn, queries: np.ndarray, k: int) -> float: """Calculate recall against ground truth.""" if not self.ground_truth_fn: return 0 correct = 0 total = 0 for query in queries: predicted = set(search_fn(query, k=k)) actual = set(self.ground_truth_fn(query, k=k)) correct += len(predicted & actual) total += k return correct / total def profile_index_build( build_fn, vectors: np.ndarray, batch_sizes: List[int] = [1000, 10000, 50000] ) -> dict: """Profile index build performance.""" results = {} for batch_size in batch_sizes: times = [] for i in range(0, len(vectors), batch_size): batch = vectors[i:i + batch_size] start = time.perf_counter() build_fn(batch) times.append(time.perf_counter() - start) results[batch_size] = { "avg_batch_time_s": np.mean(times), "vectors_per_second": batch_size / np.mean(times) } return results

Best Practices

Do's

  • Benchmark with real queries - Synthetic may not represent production
  • Monitor recall continuously - Can degrade with data drift
  • Start with defaults - Tune only when needed
  • Use quantization - Significant memory savings
  • Consider tiered storage - Hot/cold data separation

Don'ts

  • Don't over-optimize early - Profile first
  • Don't ignore build time - Index updates have cost
  • Don't forget reindexing - Plan for maintenance
  • Don't skip warming - Cold indexes are slow

Resources

GitHub Repository
wshobson/agents
Stars
28,364
Forks
3,118
Open Repository
Install Skill
Download ZIP1 files