Examples & Samples
Practical examples and code samples for using the Antarys Python client with real-world datasets and use cases.
Examples & Samples
Learn how to use Antarys with real-world examples including DBpedia semantic search and image similarity detection.
🔍 DBpedia Semantic Search
Search through knowledge graph entities with OpenAI embeddings
🖼️ Image Similarity Search
Find similar images using ResNet feature extraction
DBpedia Semantic Search
This example demonstrates how to work with the DBpedia dataset using pre-computed OpenAI embeddings for semantic search.
Prerequisites
Install required dependencies:
pip install antarys datasets numpy tqdm
Dataset: We'll use the KShivendu/dbpedia-entities-openai-1M dataset, which contains 1M DBpedia entities with pre-computed 1536-dimensional OpenAI embeddings.
Complete Example
Setup and Data Loading
import asyncio
import uuid
from typing import List, Dict, Any
from datasets import load_dataset
from tqdm.asyncio import tqdm
import antarys
async def load_dbpedia_sample(limit: int = 10000) -> List[Dict[str, Any]]:
    """Load up to ``limit`` DBpedia rows with their pre-computed OpenAI embeddings.

    Args:
        limit: Maximum number of rows to read from the ``train`` split.
            Zero or a negative value produces an empty list.

    Returns:
        One dict per row with a random UUID string under ``id``, the row's
        ``openai`` embedding under ``values``, and a ``metadata`` dict holding
        the row's title, text, a fixed ``"dbpedia"`` source tag and the row
        index (``sample_id``).
    """
    from itertools import islice

    print(f"Loading {limit} samples from DBpedia dataset...")
    dataset = load_dataset("KShivendu/dbpedia-entities-openai-1M", split='train')

    # islice stops after exactly `count` rows, so no extra row is pulled from
    # the dataset just to discover that the cap was reached.
    count = max(limit, 0)
    rows = tqdm(islice(dataset, count), total=count, desc="Processing")

    return [
        {
            "id": str(uuid.uuid4()),
            "values": item["openai"],  # Pre-computed OpenAI embeddings
            "metadata": {
                "title": item["title"],
                "text": item["text"],
                "source": "dbpedia",
                "sample_id": i,
            },
        }
        for i, item in enumerate(rows)
    ]
Initialize Antarys Client
async def setup_collection(client, collection_name: str):
    """Recreate ``collection_name`` from scratch and return its vector operations.

    An existing collection with the same name is deleted first, so each run
    starts from an empty collection.

    Args:
        client: Client object providing ``list_collections``,
            ``delete_collection``, ``create_collection`` and
            ``vector_operations``.
        collection_name: Name of the collection to (re)create.

    Returns:
        The result of ``client.vector_operations(collection_name)``.
    """
    existing = await client.list_collections()

    # NOTE(review): the element type returned by list_collections() is not
    # visible here. Bare names and dict records with a "name" key are both
    # matched so an existing collection is detected either way — confirm
    # against the client API.
    def _is_target(entry) -> bool:
        if isinstance(entry, dict):
            return entry.get("name") == collection_name
        return entry == collection_name

    if any(_is_target(entry) for entry in existing or ()):
        await client.delete_collection(collection_name)

    # Create new collection optimized for OpenAI embeddings
    await client.create_collection(
        name=collection_name,
        dimensions=1536,  # OpenAI embedding dimensions
        enable_hnsw=True,
        shards=16,
        m=16,
        ef_construction=100,
    )
    return client.vector_operations(collection_name)
Insert Data
async def insert_dbpedia_data(vector_ops, samples: List[Dict[str, Any]]):
    """Upsert ``samples`` in batches and print how many vectors were stored.

    Args:
        vector_ops: Object providing an awaitable ``upsert`` method.
        samples: Records to upsert.

    Returns:
        The value returned by ``vector_ops.upsert``.
    """
    upsert_options = {
        "batch_size": 1000,
        "show_progress": True,
        "validate_dimensions": True,
    }

    print(f"Inserting {len(samples)} vectors...")
    result = await vector_ops.upsert(samples, **upsert_options)

    # Falls back to the input size when the response carries no count
    print(f"Successfully inserted {result.get('upserted_count', len(samples))} vectors")
    return result
Semantic Search
async def search_dbpedia(vector_ops, query_vector: List[float], query_text: str = ""):
    """Query the collection with ``query_vector`` and print the top five matches.

    Args:
        vector_ops: Object providing an awaitable ``query`` method.
        query_vector: Embedding to search with.
        query_text: Label printed alongside the search; not sent to the query.

    Returns:
        The value returned by ``vector_ops.query``. Each entry of its
        ``"matches"`` list is read for ``metadata.title``, ``metadata.text``
        and ``score``.
    """
    print(f"Searching for: {query_text}")
    results = await vector_ops.query(
        vector=query_vector,
        top_k=5,
        include_metadata=True,
        include_values=False,
        use_ann=True,
        ef_search=200,
    )

    print("\n🔍 Search Results:")
    print("=" * 50)
    for i, match in enumerate(results["matches"], 1):
        title = match["metadata"]["title"]
        full_text = match["metadata"]["text"]
        # Only mark the preview with an ellipsis when text was actually cut
        text = full_text if len(full_text) <= 200 else full_text[:200] + "..."
        score = match["score"]
        print(f"{i}. {title} (Score: {score:.4f})")
        print(f" {text}")
        print()
    return results
Main Function
async def main():
    """Run the DBpedia demo: build the collection, insert samples, run searches.

    The client is always closed, even when a step fails.
    """
    # Initialize client
    client = await antarys.create_client(
        host="http://localhost:8080",
        use_http2=True,
        cache_size=1000,
        timeout=120,
    )
    collection_name = "dbpedia_semantic_search"

    try:
        # Setup collection
        vector_ops = await setup_collection(client, collection_name)

        # Load and insert data
        samples = await load_dbpedia_sample(limit=10000)
        await insert_dbpedia_data(vector_ops, samples)

        # Example searches reuse stored embeddings as query vectors. Indices
        # past the end of the loaded samples are skipped rather than raising
        # IndexError when fewer rows were loaded than expected.
        query_indices = (0, 100, 500)
        sample_queries = [
            (samples[idx]["values"], samples[idx]["metadata"]["title"])
            for idx in query_indices
            if idx < len(samples)
        ]

        for query_vector, query_title in sample_queries:
            await search_dbpedia(vector_ops, query_vector, query_title)
            print("\n" + "="*70 + "\n")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
Advanced Filtering
# Search with metadata filtering
results = await vector_ops.query(
vector=query_vector,
top_k=10,
include_metadata=True,
filter={"metadata.source": "dbpedia"}, # Filter by source
threshold=0.7 # Minimum similarity threshold
)
# Search with multiple filters
results = await vector_ops.query(
vector=query_vector,
top_k=5,
include_metadata=True,
filter={
"metadata.source": "dbpedia",
"metadata.sample_id": {"$gte": 1000} # Sample ID >= 1000
}
)
Image Similarity Search
This example shows how to build an image similarity search system using ResNet features and Antarys.
Prerequisites
pip install antarys torch torchvision timm pillow matplotlib scikit-learn
Complete Image Search Example
Feature Extraction Setup
import torch
from PIL import Image
import timm
from sklearn.preprocessing import normalize
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform
import asyncio
import os
import uuid
from pathlib import Path
import matplotlib.pyplot as plt
import antarys
class ImageFeatureExtractor:
    """Extract L2-normalized feature vectors from images with a pre-trained timm model."""

    def __init__(self, model_name="resnet34"):
        """Load the pre-trained model and build its preprocessing transform.

        Args:
            model_name: Model name passed to ``timm.create_model``.
        """
        self.model = timm.create_model(
            model_name,
            pretrained=True,
            num_classes=0,  # Remove classification head
            global_pool="avg",
        )
        self.model.eval()

        # Pass the model instance, not its name: timm reads the preprocessing
        # settings from the model's pretrained config, which a plain string
        # does not carry, so a name would silently fall back to generic
        # defaults.
        config = resolve_data_config({}, model=self.model)
        self.preprocess = create_transform(**config)

    def extract_features(self, image_path: str):
        """Return the L2-normalized feature vector for the image at ``image_path``.

        Args:
            image_path: Path of the image file to read.

        Returns:
            A flattened array produced by ``sklearn.preprocessing.normalize``
            with ``norm="l2"``.
        """
        # Load and preprocess image; the context manager releases the file
        # handle once the pixels have been converted into a new RGB image.
        with Image.open(image_path) as source:
            image = source.convert("RGB")
        input_tensor = self.preprocess(image).unsqueeze(0)

        # Extract features
        with torch.no_grad():
            features = self.model(input_tensor)

        # Normalize features
        feature_vector = features.squeeze().numpy()
        return normalize(feature_vector.reshape(1, -1), norm="l2").flatten()
Image Dataset Processing
async def process_image_dataset(vector_ops, dataset_path: str, extractor: ImageFeatureExtractor):
    """Walk ``dataset_path``, extract features per image, and upsert them in one call.

    Files whose extension is not in the supported set are ignored. A file
    that fails during processing is reported on stdout and skipped.

    Args:
        vector_ops: Object providing an awaitable ``upsert`` method.
        dataset_path: Root directory to walk recursively.
        extractor: Object whose ``extract_features(path)`` result has ``tolist()``.

    Returns:
        The number of image records built (and upserted, when non-zero).
    """
    supported_formats = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
    image_records = []

    print(f"Processing images from {dataset_path}...")
    for root, _dirs, files in os.walk(dataset_path):
        # The containing directory name doubles as the category label
        category = os.path.basename(root)
        for file in files:
            if Path(file).suffix.lower() not in supported_formats:
                continue

            file_path = os.path.join(root, file)
            try:
                features = extractor.extract_features(file_path)
                image_records.append({
                    "id": str(uuid.uuid4()),
                    "values": features.tolist(),
                    "metadata": {
                        "filename": file_path,
                        "category": category,
                        "file_size": os.path.getsize(file_path),
                    },
                })
            except Exception as e:
                # Best effort: one unreadable file must not abort the run
                print(f"Error processing {file_path}: {e}")

    # Batch insert
    if image_records:
        print(f"Inserting {len(image_records)} image vectors...")
        await vector_ops.upsert(
            image_records,
            batch_size=100,
            show_progress=True,
        )

    return len(image_records)
Image Search Function
async def search_similar_images(vector_ops, query_image_path: str, extractor: ImageFeatureExtractor, top_k: int = 5):
    """Query the collection for the images closest to ``query_image_path``.

    Args:
        vector_ops: Object providing an awaitable ``query`` method.
        query_image_path: Path of the image to search with.
        extractor: Object whose ``extract_features(path)`` result has ``tolist()``.
        top_k: Number of matches to request.

    Returns:
        The value returned by ``vector_ops.query``.
    """
    # Turn the query image into a plain list for the query call
    query_vector = extractor.extract_features(query_image_path).tolist()

    return await vector_ops.query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True,
        include_values=False,
        use_ann=True,
    )
def display_image_results(query_image_path: str, results, save_path: str = None):
    """Plot the query image next to up to five matches on a 2x3 grid.

    Args:
        query_image_path: Path of the query image, shown in the first panel.
        results: Mapping whose ``"matches"`` entries provide
            ``metadata.filename`` and ``score``.
        save_path: When given, the figure is also written to this path
            before being shown.
    """
    _fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()

    # Turn axes off on every panel up front so panels left unused (fewer than
    # five matches) and panels showing a load error carry no ticks or frame.
    for ax in axes:
        ax.axis('off')

    # Display query image; the file handle is released once resized
    with Image.open(query_image_path) as query_file:
        query_img = query_file.resize((200, 200))
    axes[0].imshow(query_img)
    axes[0].set_title("Query Image")

    # Display similar images
    for i, match in enumerate(results["matches"][:5], 1):
        img_path = match["metadata"]["filename"]
        score = match["score"]
        try:
            with Image.open(img_path) as match_file:
                img = match_file.resize((200, 200))
            axes[i].imshow(img)
            axes[i].set_title(f"Match {i}\nScore: {score:.3f}")
        except Exception:
            # Best effort: show a placeholder instead of aborting the plot
            axes[i].text(0.5, 0.5, f"Error loading\n{img_path}",
                         ha='center', va='center', transform=axes[i].transAxes)
            axes[i].set_title(f"Match {i} (Error)")

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.show()
Complete Image Search Pipeline
async def image_search_demo():
    """Run the image similarity demo end to end.

    Creates the ``image_similarity`` collection, indexes every supported image
    under ``./images``, searches with ``./query.jpg``, prints the matches and
    plots them. The client is always closed, even when a step fails.
    """
    # Initialize Antarys client
    client = await antarys.create_client(
        host="http://localhost:8080",
        use_http2=True,
        cache_size=500,
    )
    collection_name = "image_similarity"

    try:
        # Create collection for image features
        await client.create_collection(
            name=collection_name,
            dimensions=512,  # ResNet34 feature dimensions
            enable_hnsw=True,
            m=16,
            ef_construction=200,
        )
        vector_ops = client.vector_operations(collection_name)
        extractor = ImageFeatureExtractor("resnet34")

        # Process dataset (replace with your image directory)
        dataset_path = "./images"  # Your image directory
        if os.path.exists(dataset_path):
            num_processed = await process_image_dataset(vector_ops, dataset_path, extractor)
            print(f"Processed {num_processed} images")
        else:
            print(f"Dataset path {dataset_path} not found")
            return

        # Example query
        query_image = "./query.jpg"  # Your query image
        if os.path.exists(query_image):
            results = await search_similar_images(vector_ops, query_image, extractor)
            print(f"\nFound {len(results['matches'])} similar images:")
            for i, match in enumerate(results["matches"], 1):
                filename = os.path.basename(match["metadata"]["filename"])
                score = match["score"]
                print(f"{i}. {filename} (similarity: {score:.3f})")

            # Display results
            display_image_results(query_image, results, "similarity_results.png")
        else:
            # Report the skipped search instead of ending silently, matching
            # the message printed for a missing dataset directory.
            print(f"Query image {query_image} not found")
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(image_search_demo())
Quick Samples
Minimal Working Example
import asyncio
import antarys
async def quick_start():
    """Create a 3-dimensional collection, insert two vectors, and run one query.

    The client is closed in a ``finally`` block so the connection is released
    even if a call fails.
    """
    client = await antarys.create_client("http://localhost:8080")
    try:
        await client.create_collection("quickstart", dimensions=3)
        vectors = client.vector_operations("quickstart")

        # Insert vectors
        await vectors.upsert([
            {"id": "a", "values": [1, 2, 3], "metadata": {"type": "sample"}},
            {"id": "b", "values": [4, 5, 6], "metadata": {"type": "sample"}},
        ])

        # Search
        results = await vectors.query(vector=[1, 2, 3], top_k=5)
        print(f"Found {len(results['matches'])} matches")
    finally:
        await client.close()


asyncio.run(quick_start())
Real-time Recommendations
async def recommendation_system():
    """Query the ``recommendations`` collection with a user preference vector.

    Returns:
        A list with the ``metadata`` of each match returned for the
        ``electronics`` category filter at a 0.7 threshold.

    The client is closed in a ``finally`` block so the connection is released
    even if a call fails.
    """
    client = await antarys.create_client("http://localhost:8080")
    try:
        await client.create_collection("recommendations", dimensions=128)
        vectors = client.vector_operations("recommendations")

        # User embeddings and item embeddings
        user_profile = [0.1] * 128  # User preference vector

        # Find similar items
        results = await vectors.query(
            vector=user_profile,
            top_k=10,
            filter={"metadata.category": "electronics"},
            threshold=0.7,
        )
        return [match["metadata"] for match in results["matches"]]
    finally:
        await client.close()
Production Tips: In production environments, handle exceptions explicitly, close the client when you are done (for example in a try/finally block), implement connection pooling, and monitor performance.