Vector Operations

Comprehensive guide to performing Create, Read, Update, and Delete (CRUD) operations on vectors in Antarys.

Getting Started: Make sure you have a collection created and a client initialized before performing vector operations.
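
If you are starting from scratch, the setup looks roughly like this (a minimal sketch; the exact create_collection parameters are covered in the collections guide, and the 512-dimension figure is only an assumption for the examples below):

import antarys

# Sketch: connect and create a collection (see the collections guide for all options)
client = await antarys.create_client("http://localhost:8080")

# Assumes a 512-dimensional collection named "my_collection"
await client.create_collection(name="my_collection", dimensions=512)

vector_ops = client.vector_operations("my_collection")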

Overview

The VectorOperations class provides high-performance interfaces for all vector operations with built-in optimizations including:

  • Automatic dimension validation
  • Batch processing with parallel workers
  • Client-side caching for queries
  • Connection pooling and retry logic
  • Memory-efficient processing

Getting Vector Operations Interface

import asyncio
import antarys

# Initialize client and get vector operations interface
client = await antarys.create_client("http://localhost:8080")
vector_ops = client.vector_operations("my_collection")

# Alternative method
vector_ops = client.get_query_interface("my_collection")

Upsert Operations

Single Vector Upsert

# Insert or update a single vector
await vector_ops.upsert([
    {
        "id": "vector_1",
        "values": [0.1, 0.2, 0.3, 0.4],  # Your vector data
        "metadata": {"category": "example"}
    }
])
# Upsert with rich metadata
await vector_ops.upsert([
    {
        "id": "doc_123",
        "values": [0.1] * 1536,  # OpenAI embedding dimensions
        "metadata": {
            "title": "Machine Learning Basics",
            "author": "Data Scientist",
            "category": "education",
            "tags": ["ml", "ai", "tutorial"],
            "created_at": "2025-01-15",
            "source_url": "https://example.com/ml-basics"
        }
    }
])
# Upsert with automatic dimension validation
result = await vector_ops.upsert(
    vectors=[
        {
            "id": "validated_vector",
            "values": [0.1] * 512,  # Must match collection dimensions
            "metadata": {"validated": True}
        }
    ],
    validate_dimensions=True  # Enable validation
)

print(f"Upserted {result['upserted_count']} vectors")

Batch Upsert Operations

Performance Tip: Use batch operations for inserting large amounts of data to maximize throughput and minimize network overhead.

Prepare Batch Data

import time
import numpy as np

# Prepare a large batch of vectors
batch_vectors = []
for i in range(10000):
    batch_vectors.append({
        "id": f"batch_vector_{i}",
        "values": np.random.random(512).tolist(),
        "metadata": {
            "batch_id": i // 1000,  # Group by thousands
            "created_at": int(time.time()),
            "category": f"category_{i % 5}"
        }
    })

Optimized Batch Upsert

# High-performance batch upsert
result = await vector_ops.upsert(
    vectors=batch_vectors,
    batch_size=1000,          # Optimal batch size
    show_progress=True,       # Show progress bar
    parallel_workers=8,       # Parallel processing
    validate_dimensions=True  # Ensure data quality
)

print(f"Successfully upserted {result['upserted_count']} vectors")

Memory Management: For very large datasets (100k+ vectors), consider processing in chunks to manage memory usage effectively.
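
One way to follow this advice is to slice the dataset into fixed-size chunks and upsert them one chunk at a time, so only a single chunk sits in memory alongside the client buffers. A minimal sketch (chunk_size is illustrative, and large_dataset refers to the same in-memory list used in the example below):

# Memory-friendly chunked upsert (sketch)
chunk_size = 25000
total_upserted = 0

for start in range(0, len(large_dataset), chunk_size):
    chunk = large_dataset[start:start + chunk_size]
    result = await vector_ops.upsert(
        chunk,
        batch_size=1000,
        parallel_workers=8,
        validate_dimensions=True
    )
    total_upserted += result["upserted_count"]

print(f"Upserted {total_upserted} vectors in chunks of {chunk_size}")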

Advanced Batch Configuration

import time

# Fine-tuned batch upsert for maximum performance
start_time = time.time()
result = await vector_ops.upsert(
    vectors=large_dataset,
    batch_size=5000,          # Larger batches for network efficiency
    parallel_workers=16,      # More workers for CPU-bound tasks
    validate_dimensions=True,
    show_progress=True
)
elapsed_time = time.time() - start_time

# Monitor performance
print(f"Upsert rate: {result['upserted_count'] / elapsed_time:.2f} vectors/sec")

Vector Format Requirements

Data Format: Vectors can be provided with either values or vector field names for compatibility.

# Both formats are supported
vector_with_values = {
    "id": "vec1",
    "values": [0.1, 0.2, 0.3],  # Standard format
    "metadata": {"type": "standard"}
}

vector_with_vector = {
    "id": "vec2", 
    "vector": [0.1, 0.2, 0.3],  # Alternative format
    "metadata": {"type": "alternative"}
}

# Both work with the same upsert call
await vector_ops.upsert([vector_with_values, vector_with_vector])

Query Operations

# Basic similarity search
results = await vector_ops.query(
    vector=[0.1, 0.2, 0.3, 0.4],  # Query vector
    top_k=5,                      # Return top 5 matches
    include_metadata=True         # Include metadata in results
)

# Process results
for match in results["matches"]:
    print(f"ID: {match['id']}, Score: {match['score']:.4f}")
    if "metadata" in match:
        print(f"  Metadata: {match['metadata']}")
# Query with metadata filtering
results = await vector_ops.query(
    vector=query_vector,
    top_k=10,
    include_metadata=True,
    filter={
        "metadata.category": "education",  # Filter by category
        "metadata.author": "Data Scientist"  # Multiple filters
    },
    threshold=0.7  # Only results above 70% similarity
)

print(f"Found {len(results['matches'])} filtered matches")
# Advanced query with HNSW parameters
results = await vector_ops.query(
    vector=query_vector,
    top_k=20,
    include_values=False,     # Exclude vectors for faster response
    include_metadata=True,
    use_ann=True,            # Use approximate nearest neighbors
    ef_search=200,           # Higher accuracy (vs speed)
    threshold=0.5,           # Similarity threshold
    validate_dimensions=True, # Validate query vector
    skip_cache=False         # Use cache if available
)

# Results include similarity scores
for match in results["matches"]:
    similarity = match["score"]
    print(f"Vector {match['id']}: {similarity:.3f} similarity")

Batch Query Operations

Batch Queries: Process multiple query vectors in parallel for maximum efficiency.

# Prepare multiple query vectors
query_vectors = [
    [0.1] * 512,  # Query 1
    [0.2] * 512,  # Query 2
    [0.3] * 512,  # Query 3
]

# Batch query for parallel processing
batch_results = await vector_ops.batch_query(
    vectors=query_vectors,
    top_k=5,
    include_metadata=True,
    validate_dimensions=True
)

# Process batch results
for i, result in enumerate(batch_results["results"]):
    print(f"\nQuery {i + 1} results:")
    for match in result["matches"]:
        print(f"  {match['id']}: {match['score']:.3f}")

Query Performance Optimization

Approximate Nearest Neighbors

results = await vector_ops.query(
    vector=query_vector,
    use_ann=True,        # Enable HNSW
    ef_search=200        # Quality vs speed
)

Client-side Result Caching

# First query - cache miss
results1 = await vector_ops.query(vector=query_vector, top_k=5)

# Second identical query - cache hit
results2 = await vector_ops.query(vector=query_vector, top_k=5)

# Check cache performance
stats = vector_ops.get_cache_stats()
print(f"Cache hit rate: {stats['hit_rate']:.2%}")

Metadata Filtering

# Efficient pre-filtering
results = await vector_ops.query(
    vector=query_vector,
    filter={"metadata.category": "active"},
    top_k=10
)

Query Result Format

# Query results structure
{
    "matches": [
        {
            "id": "vector_123",
            "score": 0.8567,                    # Similarity score
            "values": [0.1, 0.2, ...],          # Optional: vector values
            "metadata": {                       # Optional: metadata
                "title": "Document Title",
                "category": "education"
            }
        }
    ]
}

Delete Operations

Delete by IDs

# Delete a single vector
result = await vector_ops.delete(["vector_123"])

print(f"Deleted: {result.get('deleted', [])}")
print(f"Failed: {result.get('failed', [])}")
# Delete multiple vectors
vector_ids = [f"vector_{i}" for i in range(100, 200)]

result = await vector_ops.delete(vector_ids)

print(f"Successfully deleted {len(result.get('deleted', []))} vectors")
if result.get('failed'):
    print(f"Failed to delete {len(result['failed'])} vectors")
# Delete vectors based on query results
# First, find vectors to delete
results = await vector_ops.query(
    vector=reference_vector,
    filter={"metadata.status": "deprecated"},
    top_k=1000  # Get up to 1000 deprecated vectors
)

# Extract IDs and delete
ids_to_delete = [match["id"] for match in results["matches"]]
if ids_to_delete:
    delete_result = await vector_ops.delete(ids_to_delete)
    print(f"Deleted {len(delete_result.get('deleted', []))} deprecated vectors")

Delete Performance

Cache Invalidation: Deleting vectors automatically invalidates relevant cache entries to maintain consistency.
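
To see this in practice, run the same query before and after a delete; the repeated query is not served from a stale cache entry, so the deleted ID no longer appears. A short sketch (assumes query_vector is defined as in the query examples above):

# Populate the cache, then delete one of the returned vectors
before = await vector_ops.query(vector=query_vector, top_k=5)
victim_id = before["matches"][0]["id"]

await vector_ops.delete([victim_id])

# The repeated query is re-evaluated rather than served from the stale cache entry
after = await vector_ops.query(vector=query_vector, top_k=5)
print(victim_id in [m["id"] for m in after["matches"]])  # Expected: False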

import time

# Efficient bulk deletion
large_delete_batch = [f"temp_vector_{i}" for i in range(10000)]

# Monitor deletion performance
start_time = time.time()
result = await vector_ops.delete(large_delete_batch)
end_time = time.time()

delete_rate = len(result.get('deleted', [])) / (end_time - start_time)
print(f"Deletion rate: {delete_rate:.2f} vectors/sec")

Utility Operations

Vector Retrieval

# Get specific vector by ID
vector_data = await vector_ops.get_vector("vector_123")

if vector_data:
    print(f"Vector ID: {vector_data['id']}")
    print(f"Vector values: {vector_data['vector'][:5]}...")  # First 5 values
    print(f"Metadata: {vector_data.get('metadata', {})}")
else:
    print("Vector not found")

Collection Statistics

# Get vector count
total_vectors = await vector_ops.count_vectors()
print(f"Total vectors in collection: {total_vectors}")

# Get collection dimensions
dimensions = await vector_ops.get_collection_dimensions()
print(f"Collection dimensions: {dimensions}")

Dimension Validation

Validate Single Vector

# Validate vector dimensions
test_vector = [0.1] * 512
is_valid = await vector_ops.validate_vector_dimensions(test_vector)

if is_valid:
    print("Vector dimensions are correct")
else:
    expected = await vector_ops.get_collection_dimensions()
    print(f"Invalid dimensions. Expected: {expected}, Got: {len(test_vector)}")

Batch Validation

# Validate batch of vectors before upsert
vectors_to_validate = [
    {"id": "v1", "values": [0.1] * 512},
    {"id": "v2", "values": [0.2] * 512},
    {"id": "v3", "values": [0.3] * 256},  # Wrong dimensions
]

try:
    await vector_ops.upsert(
        vectors_to_validate,
        validate_dimensions=True  # Will catch dimension errors
    )
except ValueError as e:
    print(f"Validation error: {e}")

Cache Management

# Get cache performance statistics
cache_stats = vector_ops.get_cache_stats()

if cache_stats["cache_enabled"]:
    print(f"Cache hit rate: {cache_stats['hit_rate']:.2%}")
    print(f"Cache size: {cache_stats['cache_size']} entries")
    print(f"Total hits: {cache_stats['cache_hits']}")
    print(f"Total misses: {cache_stats['cache_misses']}")
else:
    print("Caching is disabled")

# Clear cache if needed
await vector_ops.clear_cache()
print("Cache cleared")

Advanced Patterns

Streaming Upsert

For very large datasets, implement streaming upsert:

import asyncio
import numpy as np

async def stream_upsert(vector_ops, data_generator, batch_size=1000):
    """Stream large datasets into Antarys in fixed-size batches"""
    batch = []
    total_processed = 0
    
    async for vector_data in data_generator():
        batch.append(vector_data)
        
        if len(batch) >= batch_size:
            result = await vector_ops.upsert(
                batch,
                validate_dimensions=True,
                show_progress=True
            )
            total_processed += result["upserted_count"]
            batch = []  # Reset batch
            
            # Optional: yield control to prevent blocking
            await asyncio.sleep(0.01)
    
    # Process final batch
    if batch:
        result = await vector_ops.upsert(batch, validate_dimensions=True)
        total_processed += result["upserted_count"]
    
    return total_processed

# Usage
async def data_generator():
    # Your data source here
    for i in range(100000):
        yield {
            "id": f"stream_vector_{i}",
            "values": np.random.random(512).tolist(),
            "metadata": {"batch": i // 1000}
        }

total = await stream_upsert(vector_ops, data_generator)
print(f"Streamed {total} vectors")

Similarity Search with Re-ranking

async def hybrid_search(vector_ops, query_vector, text_query, top_k=10):
    """Combine vector similarity with text matching"""
    
    # First stage: Vector similarity search
    initial_results = await vector_ops.query(
        vector=query_vector,
        top_k=top_k * 3,  # Get more candidates
        include_metadata=True,
        use_ann=True
    )
    
    # Second stage: Re-rank based on text similarity
    candidates = []
    for match in initial_results["matches"]:
        text_score = calculate_text_similarity(
            text_query, 
            match["metadata"].get("text", "")
        )
        
        # Combine scores
        combined_score = 0.7 * match["score"] + 0.3 * text_score
        candidates.append({
            **match,
            "combined_score": combined_score
        })
    
    # Sort by combined score and return top_k
    candidates.sort(key=lambda x: x["combined_score"], reverse=True)
    return {"matches": candidates[:top_k]}

def calculate_text_similarity(query, text):
    """Simple token-overlap (Jaccard) similarity - replace with your own text matching logic"""
    query_tokens = set(query.lower().split())
    text_tokens = set(text.lower().split())
    if not query_tokens or not text_tokens:
        return 0.0
    return len(query_tokens & text_tokens) / len(query_tokens | text_tokens)

Error Handling and Retry Logic

import asyncio
from typing import List, Dict, Any

async def robust_upsert(vector_ops, vectors: List[Dict[str, Any]], 
                       max_retries=3, backoff_factor=2):
    """Upsert with robust error handling"""
    
    for attempt in range(max_retries):
        try:
            result = await vector_ops.upsert(
                vectors,
                validate_dimensions=True,
                show_progress=True
            )
            return result
            
        except ValueError as e:
            # Dimension validation errors - don't retry
            print(f"Validation error: {e}")
            raise
            
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = backoff_factor ** attempt
                print(f"Attempt {attempt + 1} failed: {e}")
                print(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
            else:
                print(f"All {max_retries} attempts failed")
                raise

# Usage
try:
    result = await robust_upsert(vector_ops, my_vectors)
    print(f"Successfully upserted {result['upserted_count']} vectors")
except Exception as e:
    print(f"Upsert failed permanently: {e}")

Performance Best Practices

🚀 Batch Operations

Optimize Throughput

  • Use batch sizes of 1000-5000 for optimal performance
  • Enable parallel workers (4-16 depending on CPU)
  • Process large datasets in chunks to manage memory

💾 Memory Management

Efficient Resource Usage

  • Convert NumPy arrays to lists for JSON serialization (see the sketch after this list)
  • Use streaming upsert for very large datasets
  • Monitor memory usage during batch operations
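
For the NumPy point, the conversion is just a tolist() call before building the vector records; a minimal sketch (the array shape is illustrative):

import numpy as np

# NumPy arrays are not JSON serializable - convert rows to plain lists before upsert
embeddings = np.random.random((100, 512)).astype(np.float32)

vectors = [
    {"id": f"np_vector_{i}", "values": embeddings[i].tolist()}
    for i in range(len(embeddings))
]

await vector_ops.upsert(vectors, validate_dimensions=True)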

🔍 Query Optimization

Fast Search Performance

  • Enable HNSW indexing with use_ann=True
  • Adjust ef_search parameter for accuracy vs speed
  • Use metadata filtering to reduce search space
  • Leverage client-side caching for repeated queries

🛡️ Error Handling

Robust Operations

  • Always enable dimension validation for data quality
  • Implement retry logic with exponential backoff
  • Handle network errors gracefully
  • Monitor cache hit rates and performance metrics

Resource Cleanup

Always properly clean up resources:

# Clean up vector operations
await vector_ops.clear_cache()
await vector_ops.close()

# Clean up client
await client.close()

Best Practice: Use context managers or try/finally blocks to ensure proper cleanup even if errors occur.
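
A minimal try/finally sketch along those lines (my_vectors stands in for your own data):

client = await antarys.create_client("http://localhost:8080")
vector_ops = client.vector_operations("my_collection")

try:
    await vector_ops.upsert(my_vectors, validate_dimensions=True)
finally:
    # Runs even if the upsert raises
    await vector_ops.clear_cache()
    await vector_ops.close()
    await client.close()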