Get Started with OpenAI Using Your Custom Data

Build a simple yet effective Retrieval-Augmented Generation (RAG) system that combines OpenAI's embedding and chat models with the Antarys vector database for intelligent document querying.
Overview
This cookbook walks through a complete RAG implementation that demonstrates the core pattern: embed documents, store them in a vector database, retrieve relevant context, and generate informed responses.
- 🏗️ Class Setup: initialize the OpenAI and Antarys clients
- 📊 Vector Storage: embed documents and store them in the vector database
- 🔍 Semantic Search: find relevant documents using vector similarity
- 🤖 Response Generation: generate contextual responses with retrieved documents
Complete Implementation
Class Initialization
```python
import asyncio

from openai import OpenAI
from antarys import Client


class SimpleRAG:
    def __init__(self):
        self.openai = OpenAI()
        self.antarys = None
        self.vectors = None
```
Basic setup with OpenAI client initialization. The `antarys` and `vectors` attributes are set to `None` and initialized later in the async `init()` method.
Database Connection Setup
```python
async def init(self):
    self.antarys = Client(host="http://localhost:8080")
    await self.antarys.create_collection("docs", dimensions=1536)
    self.vectors = self.antarys.vector_operations("docs")
```
Establishes the connection to the Antarys server and creates a collection with 1536 dimensions, matching the output of OpenAI's `text-embedding-3-small` model. The `vector_operations` interface provides CRUD operations on the collection.
Text Embedding
```python
def embed(self, text):
    return self.openai.embeddings.create(
        model="text-embedding-3-small",
        input=text
    ).data[0].embedding
```
Converts text to vector embeddings using OpenAI's embedding model. Returns the embedding array directly from the API response.
Document Storage
```python
async def add(self, doc_id, content):
    await self.vectors.upsert([{
        "id": doc_id,
        "values": self.embed(content),
        "metadata": {"content": content}
    }])
```
Stores documents by embedding the content and upserting to the vector database. The original content is preserved in metadata for later retrieval and context generation.
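Since `upsert` already accepts a list, several documents can be written with a single call. A minimal sketch of a hypothetical `add_many` helper, built only from the methods shown above:

```python
async def add_many(self, docs: dict):
    # docs maps doc_id -> content; one upsert call covers the whole batch
    await self.vectors.upsert([
        {
            "id": doc_id,
            "values": self.embed(content),
            "metadata": {"content": content},
        }
        for doc_id, content in docs.items()
    ])
```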
Document Retrieval
```python
async def search(self, query, top_k=3):
    results = await self.vectors.query(
        vector=self.embed(query),
        top_k=top_k,
        include_metadata=True
    )
    return results["matches"]
```
Performs semantic search by embedding the query and finding the most similar vectors. Returns the matches array containing IDs, similarity scores, and metadata.
AI Response Generation
```python
def generate(self, query, docs):
    context = "\n".join([doc["metadata"]["content"] for doc in docs])
    return self.openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"}]
    ).choices[0].message.content
```
Combines retrieved document contents into a single context string and prompts GPT-4 to answer the query based on that context. Returns the generated response directly.
Complete Query Pipeline
```python
async def query(self, question, verbose=False):
    docs = await self.search(question)
    answer = self.generate(question, docs)

    if verbose:
        print(f"Q: {question}")
        print(f"A: {answer}")
        for doc in docs:
            print(f"Source: {doc['id']} ({doc['score']:.3f})")

    return answer, docs
```
Orchestrates the full RAG pipeline: search for relevant documents, generate response, and optionally display debug information. Returns both the answer and source documents.
Usage Example
```python
async def main():
    rag = SimpleRAG()
    await rag.init()

    await rag.add("AHNSW",
                  "Unlike traditional sequential HNSW, we are using a different asynchronous approach to HNSW and eliminating thread locks with the help of architectural fine tuning. We will soon release more technical details on the Async HNSW algorithmic approach.")

    await rag.add("Antarys",
                  "Antarys is a multi-modal vector database and it uses the AHNSW algorithm to enhance its performance to perform semantic searching based on similarity")

    await rag.query("what is Antarys?", verbose=True)


asyncio.run(main())
```
Demonstrates the complete workflow: initialize the system, add two documents, and query them. The `verbose=True` flag prints the question, answer, and source attribution.
Key Design Patterns
Async/Await Pattern: All database operations use async/await for non-blocking I/O, essential for production systems handling multiple concurrent requests.
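To make that concrete, here is a hedged sketch of answering several questions concurrently with `asyncio.gather`. `answer_batch` is a hypothetical helper built on the `SimpleRAG` class above:

```python
import asyncio

async def answer_batch(rag, questions):
    # Run the full RAG pipeline for every question concurrently; the awaited
    # Antarys calls overlap, though the synchronous OpenAI calls in embed()
    # and generate() still block the event loop while they run.
    results = await asyncio.gather(*(rag.query(q) for q in questions))
    return {q: answer for q, (answer, _docs) in zip(questions, results)}

# Usage (inside an async context):
#     answers = await answer_batch(rag, ["what is Antarys?", "what is AHNSW?"])
```

Swapping `OpenAI` for the library's `AsyncOpenAI` client would make the embedding and generation steps non-blocking as well.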
Embedding Strategy
- Uses OpenAI's `text-embedding-3-small` for cost-effective embeddings
- Stores original content in metadata for context generation
- Single embedding call per document/query (no chunking in this simple implementation; a chunking sketch follows this list)
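For documents longer than the embedding model's context window, chunking becomes necessary. A minimal sketch of one possible approach; the character-based splitting, chunk size, and overlap are illustrative choices, not tuned values:

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50):
    # Naive character-based chunking; the overlap lets sentences that
    # straddle a boundary appear in two consecutive chunks.
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

async def add_chunked(rag, doc_id, content):
    # Store each chunk under a derived ID so matches point back to the source.
    for i, chunk in enumerate(chunk_text(content)):
        await rag.add(f"{doc_id}-{i}", chunk)
```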
Vector Operations
- `upsert()` allows document updates by reusing the same ID, as shown below
- `query()` with `include_metadata=True` retrieves both vectors and original content
- Similarity scores help evaluate retrieval quality
Context Generation
- Simple concatenation of document contents
- Basic prompt format: "Context: ... Question: ..." (a more defensive variant is sketched after this list)
- Returns the raw GPT-4 response without additional processing
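A common next step is to constrain the model to the retrieved context via a system message and to label each source. A hedged sketch of an alternative generator; `generate_grounded` is a hypothetical name, and the chat API usage mirrors `generate` above:

```python
def generate_grounded(self, query, docs):
    # Label each source with its ID so the answer can cite it.
    context = "\n\n".join(
        f"[{doc['id']}]\n{doc['metadata']['content']}" for doc in docs
    )
    return self.openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system",
             "content": "Answer using only the provided context. "
                        "If the context does not contain the answer, say so."},
            {"role": "user",
             "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ],
    ).choices[0].message.content
```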
Full Code
```python
import asyncio

from openai import OpenAI
from antarys import Client


class SimpleRAG:
    def __init__(self):
        self.openai = OpenAI()
        self.antarys = None
        self.vectors = None

    async def init(self):
        self.antarys = Client(host="http://localhost:8080")
        await self.antarys.create_collection("docs", dimensions=1536)
        self.vectors = self.antarys.vector_operations("docs")

    def embed(self, text):
        return self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        ).data[0].embedding

    async def add(self, doc_id, content):
        await self.vectors.upsert([{
            "id": doc_id,
            "values": self.embed(content),
            "metadata": {"content": content}
        }])

    async def search(self, query, top_k=3):
        results = await self.vectors.query(
            vector=self.embed(query),
            top_k=top_k,
            include_metadata=True
        )
        return results["matches"]

    def generate(self, query, docs):
        context = "\n".join([doc["metadata"]["content"] for doc in docs])
        return self.openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"}]
        ).choices[0].message.content

    async def query(self, question, verbose=False):
        docs = await self.search(question)
        answer = self.generate(question, docs)

        if verbose:
            print(f"Q: {question}")
            print(f"A: {answer}")
            for doc in docs:
                print(f"Source: {doc['id']} ({doc['score']:.3f})")

        return answer, docs


async def main():
    rag = SimpleRAG()
    await rag.init()

    await rag.add("AHNSW",
                  "Unlike traditional sequential HNSW, we are using a different asynchronous approach to HNSW and eliminating thread locks with the help of architectural fine tuning. We will soon release more technical details on the Async HNSW algorithmic approach.")

    await rag.add("Antarys",
                  "Antarys is a multi-modal vector database and it uses the AHNSW algorithm to enhance its performance to perform semantic searching based on similarity")

    await rag.query("what is Antarys?", verbose=True)


asyncio.run(main())
```