Get Started with OpenAI with Your Custom Data

Build a simple Retrieval-Augmented Generation (RAG) system that combines OpenAI's embedding and chat models with the Antarys vector database to answer questions about your own documents.

Overview

This cookbook walks through a complete RAG implementation that demonstrates the core pattern: embed the documents, store the embeddings in a vector database, retrieve the most relevant documents for a question, and generate a response grounded in them.

Complete Implementation

Class Initialization

import asyncio
from openai import OpenAI
from antarys import Client

class SimpleRAG:
    def __init__(self):
        self.openai = OpenAI()
        self.antarys = None
        self.vectors = None

Creates the OpenAI client, which reads its API key from the OPENAI_API_KEY environment variable by default. The antarys and vectors attributes start as None and are set later in the async init() method, since __init__ cannot await.

Database Connection Setup

async def init(self):
    self.antarys = Client(host="http://localhost:8080")
    await self.antarys.create_collection("docs", dimensions=1536)
    self.vectors = self.antarys.vector_operations("docs")

Establishes a connection to the Antarys server and creates a collection with 1536 dimensions, matching the default output size of OpenAI's text-embedding-3-small. The vector_operations interface provides CRUD operations on the collection.
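Because the collection's dimension is fixed when it is created, it can help to check a vector's length before sending it to the database. The following is a minimal sketch; check_dimensions and EXPECTED_DIMENSIONS are hypothetical names that are not part of the Antarys client, and how the server itself reacts to a mismatched vector is not covered here.

```python
EXPECTED_DIMENSIONS = 1536  # assumed to match the value passed to create_collection


def check_dimensions(vector, expected=EXPECTED_DIMENSIONS):
    """Return the vector unchanged, or raise if its length is wrong."""
    if len(vector) != expected:
        raise ValueError(f"expected {expected} dimensions, got {len(vector)}")
    return vector
```

A call such as check_dimensions(self.embed(content)) inside add() would fail fast if the embedding model were swapped for one with a different output size.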

Text Embedding

def embed(self, text):
    return self.openai.embeddings.create(
        model="text-embedding-3-small",
        input=text
    ).data[0].embedding

Converts text to a vector embedding using OpenAI's embedding model. Returns the embedding, a list of floats, directly from the API response.

Document Storage

async def add(self, doc_id, content):
    await self.vectors.upsert([{
        "id": doc_id,
        "values": self.embed(content),
        "metadata": {"content": content}
    }])

Stores documents by embedding the content and upserting to the vector database. The original content is preserved in metadata for later retrieval and context generation.

Document Retrieval

async def search(self, query, top_k=3):
    results = await self.vectors.query(
        vector=self.embed(query),
        top_k=top_k,
        include_metadata=True
    )
    return results["matches"]

Performs semantic search by embedding the query and finding the most similar vectors. Returns the matches array containing IDs, similarity scores, and metadata.
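To make "most similar vectors" concrete, the sketch below ranks a few toy vectors against a query by cosine similarity. It is a brute-force stand-in for illustration only: the metric and index that Antarys actually uses are not specified in this cookbook, and cosine_similarity, brute_force_search, and the two-dimensional sample records are all assumptions.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_search(query_vector, records, top_k=3):
    """Score every record against the query and keep the best top_k."""
    scored = [
        {
            "id": r["id"],
            "score": cosine_similarity(query_vector, r["values"]),
            "metadata": r["metadata"],
        }
        for r in records
    ]
    scored.sort(key=lambda m: m["score"], reverse=True)
    return scored[:top_k]


# Toy records in the same shape that add() upserts.
records = [
    {"id": "a", "values": [1.0, 0.0], "metadata": {"content": "first"}},
    {"id": "b", "values": [0.0, 1.0], "metadata": {"content": "second"}},
    {"id": "c", "values": [1.0, 1.0], "metadata": {"content": "third"}},
]
matches = brute_force_search([1.0, 0.1], records, top_k=2)
print([m["id"] for m in matches])  # ranked order: ['a', 'c']
```

A vector database avoids this full scan by using an index, but the returned shape (ID, score, metadata) is the same idea.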

AI Response Generation

def generate(self, query, docs):
    context = "\n".join([doc["metadata"]["content"] for doc in docs])
    return self.openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"}]
    ).choices[0].message.content

Combines retrieved document contents into a single context string and prompts GPT-4 to answer the query based on that context. Returns the generated response directly.
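A common variation is to move the instructions into a system message and keep the context and question in the user message. The sketch below only builds the message list, so it runs without an API key; build_messages and the wording of SYSTEM_PROMPT are assumptions, not part of the cookbook's code.

```python
SYSTEM_PROMPT = (
    "Answer the question using only the provided context. "
    "If the context does not contain the answer, say so."
)


def build_messages(query, docs):
    """Build a chat message list from a question and retrieved matches."""
    context = "\n".join(doc["metadata"]["content"] for doc in docs)
    return [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
    ]
```

The returned list can be passed as the messages argument of chat.completions.create in place of the single user message used in generate().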

Complete Query Pipeline

async def query(self, question, verbose=False):
    docs = await self.search(question)
    answer = self.generate(question, docs)

    if verbose:
        print(f"Q: {question}")
        print(f"A: {answer}")
        for doc in docs:
            print(f"Source: {doc['id']} ({doc['score']:.3f})")

    return answer, docs

Orchestrates the full RAG pipeline: search for relevant documents, generate response, and optionally display debug information. Returns both the answer and source documents.

Usage Example

async def main():
    rag = SimpleRAG()
    await rag.init()

    await rag.add("AHNSW",
        "Unlike traditional sequential HNSW, we are using a different asynchronous approach to HNSW and eliminating thread locks with the help of architectural fine tuning. We will soon release more technical details on the Async HNSW algorithmic approach.")
    await rag.add("Antarys",
        "Antarys is a multi-modal vector database and it uses the AHNSW algorithm to enhance its performance to perform semantic searching based on similarity")
    await rag.query("what is Antarys?", verbose=True)

asyncio.run(main())

Demonstrates the complete workflow: initialize the system, add two documents, and query them. The verbose=True flag prints the question, the answer, and each source's ID and similarity score.

Key Design Patterns

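Async/Await Pattern: All Antarys database operations use async/await for non-blocking I/O, which matters for production systems handling many concurrent requests. The OpenAI calls in embed() and generate() use the synchronous client, so they block the event loop while they run.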
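The embed() and generate() methods above call the synchronous OpenAI client from inside async code. One way to keep the event loop responsive during such calls is to run them in a worker thread with asyncio.to_thread (Python 3.9+). The sketch below uses a hypothetical blocking_embed function that sleeps briefly in place of a real API call; the names are illustrative and not part of the cookbook's code.

```python
import asyncio
import time


def blocking_embed(text):
    """Hypothetical stand-in for a synchronous call such as embed()."""
    time.sleep(0.1)  # simulates network latency
    return [float(len(text))]


async def embed_async(text):
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_embed, text)


async def embed_many(texts):
    # The calls overlap in separate threads instead of running back to back.
    return await asyncio.gather(*(embed_async(t) for t in texts))


vectors = asyncio.run(embed_many(["a", "bb", "ccc"]))
print(vectors)  # [[1.0], [2.0], [3.0]]
```

The OpenAI Python library also ships an AsyncOpenAI client, which is an alternative to wrapping the synchronous one in threads.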

Embedding Strategy

  • Uses OpenAI's text-embedding-3-small for cost-effective embeddings
  • Stores original content in metadata for context generation
  • Single embedding call per document/query (no chunking in this simple implementation)
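For longer documents, a usual next step is to split the text into overlapping chunks and embed each chunk separately. The following is a minimal word-based sketch under that assumption; chunk_text, chunk_ids, the default sizes, and the "#" ID scheme are all hypothetical choices rather than anything prescribed by OpenAI or Antarys.

```python
def chunk_text(text, max_words=200, overlap=20):
    """Split text into word-based chunks that overlap by a few words."""
    if overlap >= max_words:
        raise ValueError("overlap must be smaller than max_words")
    words = text.split()
    chunks = []
    step = max_words - overlap
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # the last chunk already reaches the end of the text
    return chunks


def chunk_ids(doc_id, chunks):
    """Derive one ID per chunk, for example 'guide#0', 'guide#1'."""
    return [f"{doc_id}#{i}" for i in range(len(chunks))]
```

Each chunk could then be stored with add(chunk_id, chunk), so that retrieval returns the relevant passage rather than a whole document.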

Vector Operations

  • upsert() allows document updates by reusing the same ID
  • query() with include_metadata=True returns each match's metadata, which holds the original content
  • Similarity scores help evaluate retrieval quality
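One simple use of the scores is to drop weak matches before they reach the prompt. The sketch below assumes that a higher score means a closer match and that matches have the id/score/metadata shape shown earlier; filter_matches and the 0.3 threshold are hypothetical and would need tuning against real data.

```python
def filter_matches(matches, min_score=0.3):
    """Keep only matches whose similarity score reaches min_score."""
    return [m for m in matches if m["score"] >= min_score]


sample = [
    {"id": "Antarys", "score": 0.62, "metadata": {"content": "..."}},
    {"id": "AHNSW", "score": 0.18, "metadata": {"content": "..."}},
]
kept = filter_matches(sample)
print([m["id"] for m in kept])  # ['Antarys']
```

If no match survives the filter, the application can answer that nothing relevant was found instead of calling the chat model with an empty context.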

Context Generation

  • Simple concatenation of document contents
  • Basic prompt format: "Context: ... Question: ..."
  • Returns raw GPT-4 response without additional processing
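Plain concatenation works for a handful of short documents, but the context can outgrow the model's input limit as the collection grows. The sketch below labels each entry with its source ID and stops once a character budget is reached; build_context and the max_chars default are assumptions, and a character count is only a rough proxy for tokens.

```python
def build_context(docs, max_chars=4000):
    """Join match contents, labeled by ID, without exceeding max_chars."""
    parts = []
    used = 0
    for doc in docs:
        entry = f"[{doc['id']}] {doc['metadata']['content']}"
        cost = len(entry) + (1 if parts else 0)  # +1 for the joining newline
        if used + cost > max_chars:
            break  # matches arrive best-first, so the weakest are dropped
        parts.append(entry)
        used += cost
    return "\n".join(parts)
```

Labeling entries with their IDs also lets the prompt ask the model to cite which source it used.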

Full code

import asyncio
from openai import OpenAI
from antarys import Client


class SimpleRAG:
    def __init__(self):
        self.openai = OpenAI()
        self.antarys = None
        self.vectors = None

    async def init(self):
        self.antarys = Client(host="http://localhost:8080")
        await self.antarys.create_collection("docs", dimensions=1536)
        self.vectors = self.antarys.vector_operations("docs")

    def embed(self, text):
        return self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        ).data[0].embedding

    async def add(self, doc_id, content):
        await self.vectors.upsert([{
            "id": doc_id,
            "values": self.embed(content),
            "metadata": {"content": content}
        }])

    async def search(self, query, top_k=3):
        results = await self.vectors.query(
            vector=self.embed(query),
            top_k=top_k,
            include_metadata=True
        )
        return results["matches"]

    def generate(self, query, docs):
        context = "\n".join([doc["metadata"]["content"] for doc in docs])
        return self.openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"}]
        ).choices[0].message.content

    async def query(self, question, verbose=False):
        docs = await self.search(question)
        answer = self.generate(question, docs)

        if verbose:
            print(f"Q: {question}")
            print(f"A: {answer}")
            for doc in docs:
                print(f"Source: {doc['id']} ({doc['score']:.3f})")

        return answer, docs


async def main():
    rag = SimpleRAG()
    await rag.init()
    
    await rag.add("AHNSW",
        "Unlike traditional sequential HNSW, we are using a different asynchronous approach to HNSW and eliminating thread locks with the help of architectural fine tuning. We will soon release more technical details on the Async HNSW algorithmic approach.")
    await rag.add("Antarys",
        "Antarys is a multi-modal vector database and it uses the AHNSW algorithm to enhance its performance to perform semantic searching based on similarity")
    await rag.query("what is Antarys?", verbose=True)


asyncio.run(main())