Vector Operations
Complete guide to vector CRUD operations in Antarys - Create, Read, Update, and Delete vectors with advanced querying capabilities.
Getting Started: Make sure you have a collection created and a client initialized before performing vector operations.
Overview
The VectorOperations class provides high-performance interfaces for all vector operations, with built-in optimizations including:
- Automatic dimension validation
- Batch processing with parallel workers
- Client-side caching for queries
- Connection pooling and retry logic
- Memory-efficient processing
📥 Upsert Operations
Insert or update vectors efficiently
🔍 Query Operations
Search and retrieve similar vectors
🗑️ Delete Operations
Remove vectors by ID or criteria
📊 Utility Operations
Count, validate, and monitor vectors
Getting Vector Operations Interface
import asyncio
import antarys
# Initialize client and get vector operations interface
client = await antarys.create_client("http://localhost:8080")
vector_ops = client.vector_operations("my_collection")
# Alternative method
vector_ops = client.get_query_interface("my_collection")
Upsert Operations
Single Vector Upsert
# Insert or update a single vector
await vector_ops.upsert([
{
"id": "vector_1",
"values": [0.1, 0.2, 0.3, 0.4], # Your vector data
"metadata": {"category": "example"}
}
])
# Upsert with rich metadata
await vector_ops.upsert([
{
"id": "doc_123",
"values": [0.1] * 1536, # OpenAI embedding dimensions
"metadata": {
"title": "Machine Learning Basics",
"author": "Data Scientist",
"category": "education",
"tags": ["ml", "ai", "tutorial"],
"created_at": "2025-01-15",
"source_url": "https://example.com/ml-basics"
}
}
])
# Upsert with automatic dimension validation
result = await vector_ops.upsert(
vectors=[
{
"id": "validated_vector",
"values": [0.1] * 512, # Must match collection dimensions
"metadata": {"validated": True}
}
],
validate_dimensions=True # Enable validation
)
print(f"Upserted {result['upserted_count']} vectors")
Batch Upsert Operations
Performance Tip: Use batch operations for inserting large amounts of data to maximize throughput and minimize network overhead.
Prepare Batch Data
import time
import numpy as np
# Prepare a large batch of vectors
batch_vectors = []
for i in range(10000):
batch_vectors.append({
"id": f"batch_vector_{i}",
"values": np.random.random(512).tolist(),
"metadata": {
"batch_id": i // 1000, # Group by thousands
"created_at": int(time.time()),
"category": f"category_{i % 5}"
}
})
Optimized Batch Upsert
# High-performance batch upsert
result = await vector_ops.upsert(
vectors=batch_vectors,
batch_size=1000, # Optimal batch size
show_progress=True, # Show progress bar
parallel_workers=8, # Parallel processing
validate_dimensions=True # Ensure data quality
)
print(f"Successfully upserted {result['upserted_count']} vectors")
Memory Management: For very large datasets (100k+ vectors), consider processing in chunks to manage memory usage effectively.
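One way to keep memory bounded is to slice the dataset into fixed-size chunks and upsert each chunk with the same call shown above. The upsert_in_chunks helper below is an illustrative sketch, not part of the client API:
async def upsert_in_chunks(vector_ops, all_vectors, chunk_size=50_000):
    """Upsert a large list of vectors one chunk at a time to bound memory usage"""
    total = 0
    for start in range(0, len(all_vectors), chunk_size):
        chunk = all_vectors[start:start + chunk_size]
        result = await vector_ops.upsert(
            chunk,
            batch_size=1000,
            parallel_workers=8,
            validate_dimensions=True
        )
        total += result["upserted_count"]
    return total
# Usage with the batch prepared above
total = await upsert_in_chunks(vector_ops, batch_vectors)
print(f"Upserted {total} vectors in chunks")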
Advanced Batch Configuration
# Fine-tuned batch upsert for maximum performance
start_time = time.time()
result = await vector_ops.upsert(
vectors=large_dataset,
batch_size=5000, # Larger batches for network efficiency
parallel_workers=16, # More workers for CPU-bound tasks
validate_dimensions=True,
show_progress=True
)
# Monitor performance
elapsed_time = time.time() - start_time
print(f"Upsert rate: {result['upserted_count'] / elapsed_time:.2f} vectors/sec")
Vector Format Requirements
Data Format: Vectors can be provided with either the values or the vector field name for compatibility.
# Both formats are supported
vector_with_values = {
"id": "vec1",
"values": [0.1, 0.2, 0.3], # Standard format
"metadata": {"type": "standard"}
}
vector_with_vector = {
"id": "vec2",
"vector": [0.1, 0.2, 0.3], # Alternative format
"metadata": {"type": "alternative"}
}
# Both work with the same upsert call
await vector_ops.upsert([vector_with_values, vector_with_vector])
Query Operations
Basic Vector Search
# Basic similarity search
results = await vector_ops.query(
vector=[0.1, 0.2, 0.3, 0.4], # Query vector
top_k=5, # Return top 5 matches
include_metadata=True # Include metadata in results
)
# Process results
for match in results["matches"]:
print(f"ID: {match['id']}, Score: {match['score']:.4f}")
if "metadata" in match:
print(f" Metadata: {match['metadata']}")
# Query with metadata filtering
results = await vector_ops.query(
vector=query_vector,
top_k=10,
include_metadata=True,
filter={
"metadata.category": "education", # Filter by category
"metadata.author": "Data Scientist" # Multiple filters
},
threshold=0.7 # Only results above 70% similarity
)
print(f"Found {len(results['matches'])} filtered matches")
# Advanced query with HNSW parameters
results = await vector_ops.query(
vector=query_vector,
top_k=20,
include_values=False, # Exclude vectors for faster response
include_metadata=True,
use_ann=True, # Use approximate nearest neighbors
ef_search=200, # Higher accuracy (vs speed)
threshold=0.5, # Similarity threshold
validate_dimensions=True, # Validate query vector
skip_cache=False # Use cache if available
)
# Results include similarity scores
for match in results["matches"]:
similarity = match["score"]
print(f"Vector {match['id']}: {similarity:.3f} similarity")
Batch Query Operations
Batch Queries: Process multiple query vectors in parallel for maximum efficiency.
# Prepare multiple query vectors
query_vectors = [
[0.1] * 512, # Query 1
[0.2] * 512, # Query 2
[0.3] * 512, # Query 3
]
# Batch query for parallel processing
batch_results = await vector_ops.batch_query(
vectors=query_vectors,
top_k=5,
include_metadata=True,
validate_dimensions=True
)
# Process batch results
for i, result in enumerate(batch_results["results"]):
print(f"\nQuery {i + 1} results:")
for match in result["matches"]:
print(f" {match['id']}: {match['score']:.3f}")
Query Performance Optimization
Approximate Nearest Neighbors
results = await vector_ops.query(
vector=query_vector,
use_ann=True, # Enable HNSW
ef_search=200 # Quality vs speed
)
Client-side Result Caching
# First query - cache miss
results1 = await vector_ops.query(vector, top_k=5)
# Second identical query - cache hit
results2 = await vector_ops.query(vector, top_k=5)
# Check cache performance
stats = vector_ops.get_cache_stats()
print(f"Cache hit rate: {stats['hit_rate']:.2%}")
Metadata Filtering
# Efficient pre-filtering
results = await vector_ops.query(
vector=query_vector,
filter={"metadata.category": "active"},
top_k=10
)
Query Result Format
# Query results structure
{
"matches": [
{
"id": "vector_123",
"score": 0.8567, # Similarity score
"values": [0.1, 0.2, ...], # Optional: vector values
"metadata": { # Optional: metadata
"title": "Document Title",
"category": "education"
}
}
]
}
Delete Operations
Delete by IDs
# Delete a single vector
result = await vector_ops.delete(["vector_123"])
print(f"Deleted: {result.get('deleted', [])}")
print(f"Failed: {result.get('failed', [])}")
# Delete multiple vectors
vector_ids = [f"vector_{i}" for i in range(100, 200)]
result = await vector_ops.delete(vector_ids)
print(f"Successfully deleted {len(result.get('deleted', []))} vectors")
if result.get('failed'):
print(f"Failed to delete {len(result['failed'])} vectors")
# Delete vectors based on query results
# First, find vectors to delete
results = await vector_ops.query(
vector=reference_vector,
filter={"metadata.status": "deprecated"},
top_k=1000 # Get up to 1000 deprecated vectors
)
# Extract IDs and delete
ids_to_delete = [match["id"] for match in results["matches"]]
if ids_to_delete:
delete_result = await vector_ops.delete(ids_to_delete)
print(f"Deleted {len(delete_result.get('deleted', []))} deprecated vectors")
Delete Performance
Cache Invalidation: Deleting vectors automatically invalidates relevant cache entries to maintain consistency.
# Efficient bulk deletion
large_delete_batch = [f"temp_vector_{i}" for i in range(10000)]
# Monitor deletion performance
start_time = time.time()
result = await vector_ops.delete(large_delete_batch)
end_time = time.time()
delete_rate = len(result.get('deleted', [])) / (end_time - start_time)
print(f"Deletion rate: {delete_rate:.2f} vectors/sec")
Utility Operations
Vector Retrieval
# Get specific vector by ID
vector_data = await vector_ops.get_vector("vector_123")
if vector_data:
print(f"Vector ID: {vector_data['id']}")
print(f"Vector values: {vector_data['vector'][:5]}...") # First 5 values
print(f"Metadata: {vector_data.get('metadata', {})}")
else:
print("Vector not found")
Collection Statistics
# Get vector count
total_vectors = await vector_ops.count_vectors()
print(f"Total vectors in collection: {total_vectors}")
# Get collection dimensions
dimensions = await vector_ops.get_collection_dimensions()
print(f"Collection dimensions: {dimensions}")
Dimension Validation
Validate Single Vector
# Validate vector dimensions
test_vector = [0.1] * 512
is_valid = await vector_ops.validate_vector_dimensions(test_vector)
if is_valid:
print("Vector dimensions are correct")
else:
expected = await vector_ops.get_collection_dimensions()
print(f"Invalid dimensions. Expected: {expected}, Got: {len(test_vector)}")
Batch Validation
# Validate batch of vectors before upsert
vectors_to_validate = [
{"id": "v1", "values": [0.1] * 512},
{"id": "v2", "values": [0.2] * 512},
{"id": "v3", "values": [0.3] * 256}, # Wrong dimensions
]
try:
await vector_ops.upsert(
vectors_to_validate,
validate_dimensions=True # Will catch dimension errors
)
except ValueError as e:
print(f"Validation error: {e}")
Cache Management
# Get cache performance statistics
cache_stats = vector_ops.get_cache_stats()
if cache_stats["cache_enabled"]:
print(f"Cache hit rate: {cache_stats['hit_rate']:.2%}")
print(f"Cache size: {cache_stats['cache_size']} entries")
print(f"Total hits: {cache_stats['cache_hits']}")
print(f"Total misses: {cache_stats['cache_misses']}")
else:
print("Caching is disabled")
# Clear cache if needed
await vector_ops.clear_cache()
print("Cache cleared")
Advanced Patterns
Streaming Upsert
For very large datasets, implement streaming upsert:
async def stream_upsert(vector_ops, data_generator, batch_size=1000):
"""Stream large datasets into Antarys"""
batch = []
total_processed = 0
async for vector_data in data_generator():
batch.append(vector_data)
if len(batch) >= batch_size:
result = await vector_ops.upsert(
batch,
validate_dimensions=True,
show_progress=True
)
total_processed += result["upserted_count"]
batch = [] # Reset batch
# Optional: yield control to prevent blocking
await asyncio.sleep(0.01)
# Process final batch
if batch:
result = await vector_ops.upsert(batch, validate_dimensions=True)
total_processed += result["upserted_count"]
return total_processed
# Usage
async def data_generator():
# Your data source here
for i in range(100000):
yield {
"id": f"stream_vector_{i}",
"values": np.random.random(512).tolist(),
"metadata": {"batch": i // 1000}
}
total = await stream_upsert(vector_ops, data_generator)
print(f"Streamed {total} vectors")
Similarity Search with Re-ranking
async def hybrid_search(vector_ops, query_vector, text_query, top_k=10):
"""Combine vector similarity with text matching"""
# First stage: Vector similarity search
initial_results = await vector_ops.query(
vector=query_vector,
top_k=top_k * 3, # Get more candidates
include_metadata=True,
use_ann=True
)
# Second stage: Re-rank based on text similarity
candidates = []
for match in initial_results["matches"]:
text_score = calculate_text_similarity(
text_query,
match["metadata"].get("text", "")
)
# Combine scores
combined_score = 0.7 * match["score"] + 0.3 * text_score
candidates.append({
**match,
"combined_score": combined_score
})
# Sort by combined score and return top_k
candidates.sort(key=lambda x: x["combined_score"], reverse=True)
return {"matches": candidates[:top_k]}
def calculate_text_similarity(query, text):
    """Naive token-overlap (Jaccard) similarity as a simple placeholder"""
    query_tokens = set(query.lower().split())
    text_tokens = set(text.lower().split())
    if not query_tokens or not text_tokens:
        return 0.0
    return len(query_tokens & text_tokens) / len(query_tokens | text_tokens)
Error Handling and Retry Logic
import asyncio
from typing import List, Dict, Any
async def robust_upsert(vector_ops, vectors: List[Dict[str, Any]],
max_retries=3, backoff_factor=2):
"""Upsert with robust error handling"""
for attempt in range(max_retries):
try:
result = await vector_ops.upsert(
vectors,
validate_dimensions=True,
show_progress=True
)
return result
except ValueError as e:
# Dimension validation errors - don't retry
print(f"Validation error: {e}")
raise
except Exception as e:
if attempt < max_retries - 1:
wait_time = backoff_factor ** attempt
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
else:
print(f"All {max_retries} attempts failed")
raise
# Usage
try:
result = await robust_upsert(vector_ops, my_vectors)
print(f"Successfully upserted {result['upserted_count']} vectors")
except Exception as e:
print(f"Upsert failed permanently: {e}")
Performance Best Practices
🚀 Batch Operations
Optimize Throughput
- Use batch sizes of 1000-5000 for optimal performance
- Enable parallel workers (4-16 depending on CPU)
- Process large datasets in chunks to manage memory
💾 Memory Management
Efficient Resource Usage
- Convert NumPy arrays to plain Python lists for JSON serialization (see the sketch after this list)
- Use streaming upsert for very large datasets
- Monitor memory usage during batch operations
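For example, a NumPy matrix of embeddings can be converted row by row with .tolist() before upserting. This is a minimal sketch; the IDs, dimensions, and metadata are placeholders:
import numpy as np
embeddings = np.random.random((1000, 512)).astype(np.float32)  # e.g. model output
vectors = [
    {
        "id": f"np_vector_{i}",
        "values": row.tolist(),  # plain lists serialize cleanly to JSON
        "metadata": {"source": "numpy"}
    }
    for i, row in enumerate(embeddings)
]
await vector_ops.upsert(vectors, batch_size=1000, validate_dimensions=True)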
🔍 Query Optimization
Fast Search Performance
- Enable HNSW indexing with use_ann=True
- Tune the ef_search parameter to trade accuracy against speed
- Use metadata filtering to reduce the search space
- Leverage client-side caching for repeated queries
🛡️ Error Handling
Robust Operations
- Always enable dimension validation for data quality
- Implement retry logic with exponential backoff
- Handle network errors gracefully
- Monitor cache hit rates and performance metrics
Resource Cleanup
Always properly clean up resources:
# Clean up vector operations
await vector_ops.clear_cache()
await vector_ops.close()
# Clean up client
await client.close()
Best Practice: Use context managers or try/finally blocks to ensure proper cleanup even if errors occur.
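For example, a try/finally block guarantees cleanup even when an operation raises; my_vectors and query_vector are placeholders for your own data:
import antarys
client = await antarys.create_client("http://localhost:8080")
vector_ops = client.vector_operations("my_collection")
try:
    await vector_ops.upsert(my_vectors, validate_dimensions=True)
    results = await vector_ops.query(vector=query_vector, top_k=5)
finally:
    # Runs even if the upsert or query above raises
    await vector_ops.clear_cache()
    await vector_ops.close()
    await client.close()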