Antarys

|

Antarys

Python Client

Examples & Samples

Practical examples and code samples for using the Antarys Python client with real-world datasets and use cases.

Examples & Samples

Learn how to use Antarys with real-world examples including DBpedia semantic search and image similarity detection.

DBpedia Semantic Search: This example demonstrates how to work with the DBpedia dataset, using pre-computed OpenAI embeddings to power semantic search.

Prerequisites

Install required dependencies:

pip install antarys datasets numpy tqdm

Dataset: We'll use the KShivendu/dbpedia-entities-openai-1M dataset, which contains 1M DBpedia entities with pre-computed 1536-dimensional OpenAI embeddings.

Complete Example

Setup and Data Loading

import asyncio
import uuid
from typing import List, Dict, Any
from datasets import load_dataset
from tqdm.asyncio import tqdm
import antarys

async def load_dbpedia_sample(limit: int = 10000) -> List[Dict[str, Any]]:
    """Fetch up to ``limit`` DBpedia entities and shape them as upsert records.

    Every record gets a freshly generated UUID as ``id``, the entity's
    pre-computed OpenAI embedding as ``values``, and a ``metadata`` dict with
    the title, text, a constant ``"dbpedia"`` source tag and the row's
    position in the dataset.
    """
    print(f"Loading {limit} samples from DBpedia dataset...")
    dataset = load_dataset("KShivendu/dbpedia-entities-openai-1M", split='train')

    # Pairing rows with range(limit) stops the iteration after `limit` rows
    # (and yields nothing at all when limit <= 0).
    numbered_rows = zip(range(limit), dataset)

    return [
        {
            "id": str(uuid.uuid4()),
            "values": row["openai"],  # Pre-computed OpenAI embeddings
            "metadata": {
                "title": row["title"],
                "text": row["text"],
                "source": "dbpedia",
                "sample_id": position,
            },
        }
        for position, row in tqdm(numbered_rows, total=limit, desc="Processing")
    ]

Set Up the Collection

async def setup_collection(client, collection_name: str):
    """Recreate ``collection_name`` from scratch and return its vector-operations handle.

    If the name appears in the client's collection listing, that collection
    is deleted first. A new 1536-dimensional collection is then created with
    HNSW enabled.
    """
    # Start from a clean slate: remove a same-named collection if one is listed
    existing = await client.list_collections()
    if collection_name in existing:
        await client.delete_collection(collection_name)

    # Index and sharding settings used for the OpenAI embeddings
    index_settings = {
        "enable_hnsw": True,
        "shards": 16,
        "m": 16,
        "ef_construction": 100,
    }
    await client.create_collection(
        name=collection_name,
        dimensions=1536,  # OpenAI embedding dimensions
        **index_settings,
    )

    return client.vector_operations(collection_name)

Insert Data

async def insert_dbpedia_data(vector_ops, samples: List[Dict[str, Any]]):
    """Upsert ``samples`` in batches of 1000 and report how many were written.

    The reported count is the response's ``upserted_count`` when present,
    otherwise the number of samples passed in. The upsert response is
    returned unchanged.
    """
    print(f"Inserting {len(samples)} vectors...")

    upsert_options = {
        "batch_size": 1000,
        "show_progress": True,
        "validate_dimensions": True,
    }
    outcome = await vector_ops.upsert(samples, **upsert_options)

    inserted = outcome.get('upserted_count', len(samples))
    print(f"Successfully inserted {inserted} vectors")
    return outcome
async def search_dbpedia(vector_ops, query_vector: List[float], query_text: str = ""):
    """Run a top-5 ANN query and print each match's title, score and text preview.

    Args:
        vector_ops: Vector-operations handle exposing an awaitable ``query``.
        query_vector: Embedding to search with.
        query_text: Label printed alongside the search; not sent to the server.

    Returns:
        The query response, unchanged. Its ``"matches"`` entries are expected
        to carry ``"metadata"`` (with ``"title"`` and ``"text"``) and ``"score"``.
    """
    print(f"Searching for: {query_text}")

    results = await vector_ops.query(
        vector=query_vector,
        top_k=5,
        include_metadata=True,
        include_values=False,
        use_ann=True,
        ef_search=200
    )

    print("\n🔍 Search Results:")
    print("=" * 50)

    preview_chars = 200
    for i, match in enumerate(results["matches"], 1):
        title = match["metadata"]["title"]
        text = match["metadata"]["text"]
        score = match["score"]

        # Only add an ellipsis when the text was actually cut short; a short
        # text is shown in full without a misleading "..." suffix.
        if len(text) > preview_chars:
            text = text[:preview_chars] + "..."

        print(f"{i}. {title} (Score: {score:.4f})")
        print(f"   {text}")
        print()

    return results

Main Function

async def main():
    """Run the DBpedia walkthrough: set up the collection, ingest, then search.

    Three of the stored embeddings (positions 0, 100 and 500) are reused as
    query vectors. The client is closed even if a step fails.
    """
    collection_name = "dbpedia_semantic_search"

    # Connect to the local Antarys server
    client = await antarys.create_client(
        host="http://localhost:8080",
        use_http2=True,
        cache_size=1000,
        timeout=120
    )

    try:
        vector_ops = await setup_collection(client, collection_name)

        # Ingest the sample
        samples = await load_dbpedia_sample(limit=10000)
        await insert_dbpedia_data(vector_ops, samples)

        # Pick the query (embedding, title) pairs up front from stored samples
        sample_queries = [
            (samples[position]["values"], samples[position]["metadata"]["title"])
            for position in (0, 100, 500)
        ]

        for embedding, title in sample_queries:
            await search_dbpedia(vector_ops, embedding, title)
            print("\n" + "="*70 + "\n")

    finally:
        await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Filtering

# Search with metadata filtering
results = await vector_ops.query(
    vector=query_vector,
    top_k=10,
    include_metadata=True,
    filter={"metadata.source": "dbpedia"},  # Filter by source
    threshold=0.7  # Minimum similarity threshold
)

# Search with multiple filters
results = await vector_ops.query(
    vector=query_vector,
    top_k=5,
    include_metadata=True,
    filter={
        "metadata.source": "dbpedia",
        "metadata.sample_id": {"$gte": 1000}  # Sample ID >= 1000
    }
)

Image Similarity Search: This example shows how to build an image similarity search system using ResNet features and Antarys.

Prerequisites

pip install antarys torch torchvision timm pillow matplotlib scikit-learn

Complete Image Search Example

Feature Extraction Setup

import torch
from PIL import Image
import timm
from sklearn.preprocessing import normalize
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform
import asyncio
import os
import uuid
from pathlib import Path
import matplotlib.pyplot as plt
import antarys

class ImageFeatureExtractor:
    """Extract L2-normalized feature vectors from images with a pre-trained timm model."""

    def __init__(self, model_name="resnet34"):
        """Load ``model_name`` as a headless feature extractor and build its preprocessing.

        Args:
            model_name: Name of the timm model to load with pre-trained weights.
        """
        self.model = timm.create_model(
            model_name,
            pretrained=True,
            num_classes=0,  # Remove classification head
            global_pool="avg"
        )
        self.model.eval()

        # Pass the model object, not its name: resolve_data_config reads the
        # preprocessing settings from the model's pretrained config. A plain
        # string carries no such config, so generic defaults would be used
        # instead of the settings the weights were trained with.
        config = resolve_data_config({}, model=self.model)
        self.preprocess = create_transform(**config)

    def extract_features(self, image_path: str):
        """Return the L2-normalized feature vector of the image at ``image_path``.

        Args:
            image_path: Path to an image file readable by PIL.

        Returns:
            A flattened NumPy array holding the normalized features.
        """
        # Load as RGB and add a batch axis for the model
        image = Image.open(image_path).convert("RGB")
        input_tensor = self.preprocess(image).unsqueeze(0)

        # Inference only: no gradients needed
        with torch.no_grad():
            features = self.model(input_tensor)

        # sklearn's normalize expects a 2-D array, hence the reshape/flatten
        feature_vector = features.squeeze().numpy()
        return normalize(feature_vector.reshape(1, -1), norm="l2").flatten()

Image Dataset Processing

async def process_image_dataset(vector_ops, dataset_path: str, extractor: ImageFeatureExtractor):
    """Walk ``dataset_path``, embed every supported image and upsert the results.

    Files are selected by extension (case-insensitive). An image that fails
    at any step is reported and skipped. Each record stores the file path,
    the name of its containing folder as the category, and the file size.

    Returns the number of images that were embedded successfully.
    """
    supported_formats = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
    image_records = []

    print(f"Processing images from {dataset_path}...")

    for folder, _subdirs, filenames in os.walk(dataset_path):
        for filename in filenames:
            # Skip anything that is not a recognised image extension
            if Path(filename).suffix.lower() not in supported_formats:
                continue

            file_path = os.path.join(folder, filename)
            try:
                embedding = extractor.extract_features(file_path)
                image_records.append({
                    "id": str(uuid.uuid4()),
                    "values": embedding.tolist(),
                    "metadata": {
                        "filename": file_path,
                        "category": os.path.basename(folder),
                        "file_size": os.path.getsize(file_path),
                    },
                })
            except Exception as e:
                # Best effort: one unreadable image must not abort the run
                print(f"Error processing {file_path}: {e}")

    # Send everything in one batched upsert, if there is anything to send
    if image_records:
        print(f"Inserting {len(image_records)} image vectors...")
        await vector_ops.upsert(image_records, batch_size=100, show_progress=True)

    return len(image_records)

Image Search Function

async def search_similar_images(vector_ops, query_image_path: str, extractor: ImageFeatureExtractor, top_k: int = 5):
    """Embed the query image and return the ``top_k`` ANN matches with metadata.

    Stored vector values are not requested in the response.
    """
    # Turn the query image into a plain list of floats for the request
    query_vector = extractor.extract_features(query_image_path).tolist()

    return await vector_ops.query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True,
        include_values=False,
        use_ann=True,
    )

def display_image_results(query_image_path: str, results, save_path: str = None):
    """Show the query image next to its top matches in a 2x3 grid.

    The first panel holds the query image; the remaining five panels hold up
    to five entries of ``results["matches"]``, each titled with its rank and
    score. A match whose image cannot be loaded gets an error label instead.

    Args:
        query_image_path: Path to the query image.
        results: Query response whose ``"matches"`` carry ``"metadata"``
            (with ``"filename"``) and ``"score"``.
        save_path: If given, the figure is also written to this path.
    """
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()

    # Hide ticks and frames on every panel up front, so panels left without a
    # match (fewer than five results) or showing an error label stay clean.
    for ax in axes:
        ax.axis('off')

    # Display query image; the context manager closes the file once the
    # resized copy has been made.
    with Image.open(query_image_path) as source:
        query_img = source.resize((200, 200))
    axes[0].imshow(query_img)
    axes[0].set_title("Query Image")

    # Display similar images
    for i, match in enumerate(results["matches"][:5], 1):
        img_path = match["metadata"]["filename"]
        score = match["score"]

        try:
            with Image.open(img_path) as source:
                img = source.resize((200, 200))
            axes[i].imshow(img)
            axes[i].set_title(f"Match {i}\nScore: {score:.3f}")
        except Exception:
            # Best effort: a missing or unreadable file must not abort the plot
            axes[i].text(0.5, 0.5, f"Error loading\n{img_path}",
                         ha='center', va='center', transform=axes[i].transAxes)
            axes[i].set_title(f"Match {i} (Error)")

    fig.tight_layout()

    if save_path:
        fig.savefig(save_path, dpi=150, bbox_inches='tight')

    plt.show()

Complete Image Search Pipeline

async def image_search_demo():
    """Run the image-similarity walkthrough end to end.

    Creates the ``image_similarity`` collection, embeds and inserts every
    image under ``./images``, then searches with ``./query.jpg`` and shows
    the matches. The client is closed even if a step fails.
    """

    # Initialize Antarys client
    client = await antarys.create_client(
        host="http://localhost:8080",
        use_http2=True,
        cache_size=500
    )

    collection_name = "image_similarity"

    try:
        # Drop a collection left over from a previous run. Records get fresh
        # UUIDs on every run, so re-using the old collection would store the
        # same images again as duplicates.
        # NOTE(review): assumes list_collections() yields collection names —
        # confirm against the client API.
        collections = await client.list_collections()
        if collection_name in collections:
            await client.delete_collection(collection_name)

        # Create collection for image features
        await client.create_collection(
            name=collection_name,
            dimensions=512,  # ResNet34 feature dimensions
            enable_hnsw=True,
            m=16,
            ef_construction=200
        )

        vector_ops = client.vector_operations(collection_name)
        extractor = ImageFeatureExtractor("resnet34")

        # Process dataset (replace with your image directory)
        dataset_path = "./images"  # Your image directory
        if os.path.exists(dataset_path):
            num_processed = await process_image_dataset(vector_ops, dataset_path, extractor)
            print(f"Processed {num_processed} images")
        else:
            print(f"Dataset path {dataset_path} not found")
            return

        # Example query
        query_image = "./query.jpg"  # Your query image
        if os.path.exists(query_image):
            results = await search_similar_images(vector_ops, query_image, extractor)

            print(f"\nFound {len(results['matches'])} similar images:")
            for i, match in enumerate(results["matches"], 1):
                filename = os.path.basename(match["metadata"]["filename"])
                score = match["score"]
                print(f"{i}. {filename} (similarity: {score:.3f})")

            # Display results
            display_image_results(query_image, results, "similarity_results.png")
        else:
            # Report the skipped search, mirroring the missing-dataset message
            print(f"Query image {query_image} not found")

    finally:
        await client.close()

if __name__ == "__main__":
    asyncio.run(image_search_demo())

Quick Samples

Minimal Working Example

import asyncio
import antarys

async def quick_start():
    """Smallest end-to-end run: create a collection, insert two vectors, query once."""
    client = await antarys.create_client("http://localhost:8080")

    # try/finally so the connection is released even when a call fails
    try:
        await client.create_collection("quickstart", dimensions=3)
        vectors = client.vector_operations("quickstart")

        # Insert vectors
        await vectors.upsert([
            {"id": "a", "values": [1, 2, 3], "metadata": {"type": "sample"}},
            {"id": "b", "values": [4, 5, 6], "metadata": {"type": "sample"}}
        ])

        # Search
        results = await vectors.query(vector=[1, 2, 3], top_k=5)
        print(f"Found {len(results['matches'])} matches")
    finally:
        await client.close()

asyncio.run(quick_start())

Real-time Recommendations

async def recommendation_system():
    """Return the metadata of up to ten ``electronics`` items closest to a user profile.

    Queries the ``recommendations`` collection with a 128-dimensional
    preference vector, keeping only matches in the ``electronics`` category
    that meet the 0.7 similarity threshold.

    Returns:
        A list with the ``metadata`` of every match.
    """
    client = await antarys.create_client("http://localhost:8080")

    # try/finally so the connection is released even when a call fails
    try:
        await client.create_collection("recommendations", dimensions=128)
        vectors = client.vector_operations("recommendations")

        # User embeddings and item embeddings
        user_profile = [0.1] * 128  # User preference vector

        # Find similar items. Metadata is requested explicitly because it is
        # read from every match below.
        results = await vectors.query(
            vector=user_profile,
            top_k=10,
            include_metadata=True,
            filter={"metadata.category": "electronics"},
            threshold=0.7
        )

        recommended_items = [match["metadata"] for match in results["matches"]]
    finally:
        await client.close()

    return recommended_items

Production Tips: Remember to properly handle exceptions, implement connection pooling, and monitor performance in production environments.