Examples

Simple demo
from typing import Optional

from vechord.embedding import GeminiDenseEmbedding
from vechord.registry import VechordRegistry
from vechord.spec import PrimaryKeyAutoIncrease, Table, Vector

DenseVector = Vector[768]


class Document(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    title: str = ""
    text: str
    vec: DenseVector


if __name__ == "__main__":
    vr = VechordRegistry("simple", "postgresql://postgres:postgres@172.17.0.1:5432/")
    vr.register([Document])
    emb = GeminiDenseEmbedding()

    # add a document
    text = "my personal long note"
    doc = Document(title="note", text=text, vec=DenseVector(emb.vectorize_chunk(text)))
    vr.insert(doc)

    # load
    docs = vr.select_by(Document.partial_init(), limit=1)
    print(docs)

    # query
    res = vr.search_by_vector(Document, emb.vectorize_query("note"), topk=1)
    print(res)
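The later examples also rely on bulk loading (`vr.copy_bulk`) and filtered selects (`partial_init` with field values). A minimal sketch that continues the `__main__` block above with made-up note texts; it reuses the `vr`, `emb`, `Document`, and `DenseVector` objects already defined:

    # bulk-insert several documents in one round trip
    notes = ["grocery list", "meeting minutes", "travel plan"]
    vr.copy_bulk(
        [
            Document(title=f"note-{i}", text=t, vec=DenseVector(emb.vectorize_chunk(t)))
            for i, t in enumerate(notes)
        ]
    )

    # select only the rows whose fields match the given values
    matched = vr.select_by(Document.partial_init(title="note-1"))
    print(matched)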
BEIR evaluation
import csv
import zipfile
from pathlib import Path
from typing import Iterator

import httpx
import msgspec
import rich.progress

from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import BaseEvaluator
from vechord.registry import VechordRegistry
from vechord.spec import Table, Vector

BASE_URL = "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{}.zip"
DEFAULT_DATASET = "scifact"
TOP_K = 10

emb = GeminiDenseEmbedding()
DenseVector = Vector[768]


def download_dataset(dataset: str, output: Path):
    output.mkdir(parents=True, exist_ok=True)
    zip = output / f"{dataset}.zip"
    if not zip.is_file():
        with (
            zip.open("wb") as f,
            httpx.stream("GET", BASE_URL.format(dataset)) as stream,
        ):
            total = int(stream.headers["Content-Length"])
            with rich.progress.Progress(
                "[progress.percentage]{task.percentage:>3.0f}%",
                rich.progress.BarColumn(bar_width=None),
                rich.progress.DownloadColumn(),
                rich.progress.TransferSpeedColumn(),
            ) as progress:
                download_task = progress.add_task("Download", total=total)
                for chunk in stream.iter_bytes():
                    f.write(chunk)
                    progress.update(
                        download_task, completed=stream.num_bytes_downloaded
                    )
    unzip_dir = output / dataset
    if not unzip_dir.is_dir():
        with zipfile.ZipFile(zip, "r") as f:
            f.extractall(output)
    return unzip_dir


class Corpus(Table):
    uid: str
    text: str
    title: str
    vector: DenseVector


class Query(Table):
    uid: str
    cid: str
    text: str
    vector: DenseVector


class Evaluation(msgspec.Struct):
    map: float
    ndcg: float
    recall: float


vr = VechordRegistry(DEFAULT_DATASET, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Corpus, Query])


@vr.inject(output=Corpus)
def load_corpus(dataset: str, output: Path) -> Iterator[Corpus]:
    file = output / dataset / "corpus.jsonl"
    decoder = msgspec.json.Decoder()
    with file.open("r") as f:
        for line in f:
            item = decoder.decode(line)
            title = item.get("title", "")
            text = item.get("text", "")
            try:
                vector = emb.vectorize_chunk(f"{title}\n{text}")
            except Exception as e:
                print(f"failed to vectorize {title}: {e}")
                continue
            yield Corpus(
                uid=item["_id"],
                text=text,
                title=title,
                vector=DenseVector(vector),
            )


@vr.inject(output=Query)
def load_query(dataset: str, output: Path) -> Iterator[Query]:
    file = output / dataset / "queries.jsonl"
    truth = output / dataset / "qrels" / "test.tsv"
    table = {}
    with open(truth, "r") as f:
        reader = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
        next(reader)  # skip header
        for row in reader:
            table[row[0]] = row[1]
    decoder = msgspec.json.Decoder()
    with file.open("r") as f:
        for line in f:
            item = decoder.decode(line)
            uid = item["_id"]
            if uid not in table:
                continue
            text = item.get("text", "")
            yield Query(
                uid=uid,
                cid=table[uid],
                text=text,
                vector=DenseVector(emb.vectorize_query(text)),
            )


@vr.inject(input=Query)
def evaluate(cid: str, vector: DenseVector) -> Evaluation:
    docs: list[Corpus] = vr.search_by_vector(Corpus, vector, topk=TOP_K)
    score = BaseEvaluator.evaluate_one(cid, [doc.uid for doc in docs])
    return Evaluation(
        map=score.get("map"),
        ndcg=score.get("ndcg"),
        recall=score.get(f"recall_{TOP_K}"),
    )


if __name__ == "__main__":
    save_dir = Path("datasets")
    download_dataset(DEFAULT_DATASET, save_dir)
    load_corpus(DEFAULT_DATASET, save_dir)
    load_query(DEFAULT_DATASET, save_dir)

    res: list[Evaluation] = evaluate()
    print("ndcg", sum(r.ndcg for r in res) / len(res))
    print("recall@10", sum(r.recall for r in res) / len(res))
HTTP web service
from datetime import datetime, timezone
from functools import partial
from typing import Annotated

import httpx
import msgspec

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.service import create_web_app
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

URL = "https://paulgraham.com/{}.html"
DenseVector = Vector[768]

emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)
extractor = SimpleExtractor()


class Document(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    title: str = ""
    text: str
    updated_at: datetime = msgspec.field(
        default_factory=partial(datetime.now, timezone.utc)
    )


class Chunk(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    doc_id: Annotated[int, ForeignKey[Document.uid]]
    text: str
    vector: DenseVector


vr = VechordRegistry("http", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk])


@vr.inject(output=Document)
def load_document(title: str) -> Document:
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
        if resp.is_error:
            raise RuntimeError(f"Failed to fetch the document `{title}`")
        return Document(title=title, text=extractor.extract_html(resp.text))


@vr.inject(input=Document, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    chunks = chunker.segment(text)
    return [
        Chunk(doc_id=uid, text=chunk, vector=DenseVector(emb.vectorize_chunk(chunk)))
        for chunk in chunks
    ]


if __name__ == "__main__":
    # this pipeline will be used in the web app, or you can run it with `vr.run()`
    pipeline = vr.create_pipeline([load_document, chunk_document])
    app = create_web_app(vr, pipeline)

    from wsgiref.simple_server import make_server

    with make_server("", 8000, app) as server:
        server.serve_forever()
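To run the ingestion once without serving it over HTTP, the injected functions can also be called directly, as the other examples in this section do. A minimal alternative `__main__` block; the essay slug `greatwork` and the query text are only illustrations:

if __name__ == "__main__":
    load_document("greatwork")  # fetch and store one essay
    chunk_document()  # chunk and embed every stored document
    res = vr.search_by_vector(Chunk, emb.vectorize_query("doing great work"), topk=3)
    print(res)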
Contextual chunk augmentation
from datetime import datetime
from typing import Annotated, Optional

from vechord import (
    GeminiAugmenter,
    GeminiDenseEmbedding,
    GeminiEvaluator,
    LocalLoader,
    RegexChunker,
    SimpleExtractor,
)
from vechord.registry import VechordRegistry
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

emb = GeminiDenseEmbedding()
DenseVector = Vector[768]
extractor = SimpleExtractor()


class Document(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    digest: str
    filename: str
    text: str
    updated_at: datetime


class Chunk(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    doc_uid: Annotated[int, ForeignKey[Document.uid]]
    seq_id: int
    text: str
    vector: DenseVector


class ContextChunk(Table, kw_only=True):
    chunk_uid: Annotated[int, ForeignKey[Chunk.uid]]
    text: str
    vector: DenseVector


vr = VechordRegistry("decorator", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk, ContextChunk])


@vr.inject(output=Document)
def load_from_dir(dirpath: str) -> list[Document]:
    loader = LocalLoader(dirpath, include=[".pdf"])
    return [
        Document(
            digest=doc.digest,
            filename=doc.path,
            text=extractor.extract(doc),
            updated_at=doc.updated_at,
        )
        for doc in loader.load()
    ]


@vr.inject(input=Document, output=Chunk)
def split_document(uid: int, text: str) -> list[Chunk]:
    chunker = RegexChunker(overlap=0)
    chunks = chunker.segment(text)
    return [
        Chunk(
            doc_uid=uid,
            seq_id=i,
            text=chunk,
            vector=DenseVector(emb.vectorize_chunk(chunk)),
        )
        for i, chunk in enumerate(chunks)
    ]


@vr.inject(input=Document, output=ContextChunk)
def context_embedding(uid: int, text: str) -> list[ContextChunk]:
    chunks: list[Chunk] = vr.select_by(
        Chunk.partial_init(doc_uid=uid), fields=["uid", "text"]
    )
    augmentor = GeminiAugmenter()
    augmentor.reset(text)
    context_chunks = [
        f"{context}\n{origin}"
        for (context, origin) in zip(
            augmentor.augment_context([c.text for c in chunks]),
            [c.text for c in chunks],
            strict=False,
        )
    ]
    return [
        ContextChunk(
            chunk_uid=chunk_uid,
            text=augmented,
            vector=DenseVector(emb.vectorize_chunk(augmented)),
        )
        for (chunk_uid, augmented) in zip(
            [c.uid for c in chunks], context_chunks, strict=False
        )
    ]


def query_chunk(query: str) -> list[Chunk]:
    vector = emb.vectorize_query(query)
    res: list[Chunk] = vr.search_by_vector(
        Chunk,
        vector,
        topk=5,
    )
    return res


def query_context_chunk(query: str) -> list[ContextChunk]:
    vector = emb.vectorize_query(query)
    res: list[ContextChunk] = vr.search_by_vector(
        ContextChunk,
        vector,
        topk=5,
    )
    return res


@vr.inject(input=Chunk)
def evaluate(uid: int, doc_uid: int, text: str):
    evaluator = GeminiEvaluator()
    doc: Document = vr.select_by(Document.partial_init(uid=doc_uid))[0]
    query = evaluator.produce_query(doc.text, text)
    retrieved = query_chunk(query)
    score = evaluator.evaluate_one(str(uid), [str(r.uid) for r in retrieved])
    return score


if __name__ == "__main__":
    from rich import print

    load_from_dir("./data")
    split_document()
    context_embedding()

    chunks = query_chunk("vector search")
    print(chunks)
    scores = evaluate()
    # `evaluate_one` returns a dict of metrics (see the other examples), so average one of them
    print("ndcg", sum(score["ndcg"] for score in scores) / len(scores))
    context_chunks = query_context_chunk("vector search")
    print(context_chunks)

    vr.clear_storage()
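To measure whether the augmented chunks actually retrieve better than the plain ones, the same evaluation protocol can be pointed at the `ContextChunk` table. A sketch mirroring `evaluate` above; the only new idea is mapping each retrieved `ContextChunk` back to its original chunk uid before scoring, and the function name `evaluate_context` is just for illustration:

@vr.inject(input=Chunk)
def evaluate_context(uid: int, doc_uid: int, text: str):
    evaluator = GeminiEvaluator()
    doc: Document = vr.select_by(Document.partial_init(uid=doc_uid))[0]
    query = evaluator.produce_query(doc.text, text)
    # retrieve from the augmented table, then score against the original chunk uid
    retrieved = query_context_chunk(query)
    return evaluator.evaluate_one(str(uid), [str(r.chunk_uid) for r in retrieved])

Averaging a metric from the returned scores (for example `ndcg`, as in the `__main__` block) then gives a direct comparison against the plain-chunk numbers.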
Contextual retrieval with the Anthropic Cookbook example
"""Anthropic Cookbook Contextual Embedding Example.
Data can be found from "https://github.com/anthropics/anthropic-cookbook".
"""
import json
from pathlib import Path
from time import perf_counter
from typing import Annotated, Optional

import httpx

from vechord.augment import GeminiAugmenter
from vechord.embedding import GeminiDenseEmbedding
from vechord.registry import VechordRegistry
from vechord.rerank import CohereReranker, ReciprocalRankFusion
from vechord.spec import (
    ForeignKey,
    Keyword,
    PrimaryKeyAutoIncrease,
    Table,
    UniqueIndex,
    Vector,
)

DenseVector = Vector[768]
emb = GeminiDenseEmbedding()
vr = VechordRegistry("anthropic", "postgresql://postgres:postgres@172.17.0.1:5432/")


class Document(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    uuid: Annotated[str, UniqueIndex()]
    content: str


class Chunk(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    doc_uuid: Annotated[str, ForeignKey[Document.uuid]]
    index: int
    content: str
    vector: DenseVector
    keyword: Keyword


class ContextualChunk(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    doc_uuid: Annotated[str, ForeignKey[Document.uuid]]
    index: int
    content: str
    context: str
    vector: DenseVector
    keyword: Keyword


class Query(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    content: str
    answer: str
    doc_uuids: list[str]
    chunk_index: list[int]
    vector: DenseVector


vr.register([Document, Chunk, ContextualChunk, Query])


def download_data(url: str, save_path: str):
    if Path(save_path).is_file():
        print(f"{save_path} already exists, skip download.")
        return
    with httpx.stream("GET", url) as response, open(save_path, "wb") as f:
        for chunk in response.iter_bytes():
            f.write(chunk)


def load_data(filepath: str):
    with open(filepath, "r", encoding="utf-8") as f:
        docs = json.load(f)
    for doc in docs:
        vr.insert(
            Document(
                uuid=doc["original_uuid"],
                content=doc["content"],
            )
        )
        for chunk in doc["chunks"]:
            vr.insert(
                Chunk(
                    doc_uuid=doc["original_uuid"],
                    index=chunk["original_index"],
                    content=chunk["content"],
                    vector=emb.vectorize_chunk(chunk["content"]),
                    keyword=Keyword(chunk["content"]),
                )
            )


def load_contextual_chunks(filepath: str):
    augmenter = GeminiAugmenter()
    with open(filepath, "r", encoding="utf-8") as f:
        docs = json.load(f)
    for doc in docs:
        augmenter.reset(doc["content"])
        chunks = doc["chunks"]
        augments = augmenter.augment_context([chunk["content"] for chunk in chunks])
        if len(augments) != len(chunks):
            print(
                f"number of augmented contexts does not match the number of chunks "
                f"for uuid {doc['original_uuid']}: {len(augments)} != {len(chunks)}"
            )
        for chunk, context in zip(chunks, augments, strict=False):
            contextual_content = f"{chunk['content']}\n\n{context}"
            vr.insert(
                ContextualChunk(
                    doc_uuid=doc["original_uuid"],
                    index=chunk["original_index"],
                    content=chunk["content"],
                    context=context,
                    vector=emb.vectorize_chunk(contextual_content),
                    keyword=Keyword(contextual_content),
                )
            )


def load_query(filepath: str):
    queries = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            query = json.loads(line)
            queries.append(
                Query(
                    content=query["query"],
                    answer=query["answer"],
                    doc_uuids=[x[0] for x in query["golden_chunk_uuids"]],
                    chunk_index=[x[1] for x in query["golden_chunk_uuids"]],
                    vector=emb.vectorize_query(query["query"]),
                )
            )
    vr.copy_bulk(queries)


def vector_search(query: Query, topk: int) -> list[Chunk]:
    return vr.search_by_vector(Chunk, query.vector, topk=topk)


def vector_contextual_search(query: Query, topk: int) -> list[ContextualChunk]:
    return vr.search_by_vector(ContextualChunk, query.vector, topk=topk)


def keyword_search(query: Query, topk: int) -> list[Chunk]:
    return vr.search_by_keyword(Chunk, query.content, topk=topk)


def keyword_contextual_search(query: Query, topk: int) -> list[ContextualChunk]:
    return vr.search_by_keyword(ContextualChunk, query.content, topk=topk)


def hybrid_search_fuse(query: Query, topk: int) -> list[Chunk]:
    rrf = ReciprocalRankFusion()
    return rrf.fuse([vector_search(query, topk), keyword_search(query, topk)])[:topk]


def hybrid_contextual_search_fuse(query: Query, topk: int) -> list[ContextualChunk]:
    rrf = ReciprocalRankFusion()
    return rrf.fuse(
        [vector_contextual_search(query, topk), keyword_contextual_search(query, topk)]
    )[:topk]


def hybrid_search_rerank(query: Query, topk: int, boost=3) -> list[Chunk]:
    ranker = CohereReranker()
    vecs = vector_search(query, topk * boost)
    keys = keyword_search(query, topk * boost)
    chunks = list({chunk.uid: chunk for chunk in vecs + keys}.values())
    indices = ranker.rerank(query.content, [chunk.content for chunk in chunks])
    return [chunks[i] for i in indices[:topk]]


def hybrid_contextual_search_rerank(
    query: Query, topk: int, boost=3
) -> list[ContextualChunk]:
    ranker = CohereReranker()
    vecs = vector_contextual_search(query, topk * boost)
    keys = keyword_contextual_search(query, topk * boost)
    chunks = list({chunk.uid: chunk for chunk in vecs + keys}.values())
    indices = ranker.rerank(
        query.content, [f"{chunk.content}\n{chunk.context}" for chunk in chunks]
    )
    return [chunks[i] for i in indices[:topk]]


def evaluate(topk=5, search_func=vector_search):
    print(f"TopK={topk}, search by: {search_func.__name__}")
    queries: list[Query] = vr.select_by(Query.partial_init())
    total_score = 0
    start = perf_counter()
    for query in queries:
        chunks: list[Chunk] = search_func(query, topk)
        count = 0
        for doc_uuid, chunk_index in zip(
            query.doc_uuids, query.chunk_index, strict=True
        ):
            for chunk in chunks:
                if chunk.doc_uuid == doc_uuid and chunk.index == chunk_index:
                    count += 1
                    break
        score = count / len(query.doc_uuids)
        total_score += score
    print(
        f"Pass@{topk}: {total_score / len(queries):.4f}, total queries: {len(queries)}, QPS: {len(queries) / (perf_counter() - start):.3f}"
    )


if __name__ == "__main__":
    Path("datasets").mkdir(parents=True, exist_ok=True)
    download_data(
        "https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/codebase_chunks.json",
        "datasets/codebase_chunks.json",
    )
    download_data(
        "https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/evaluation_set.jsonl",
        "datasets/evaluation_set.jsonl",
    )
    load_data("datasets/codebase_chunks.json")
    load_query("datasets/evaluation_set.jsonl")
    load_contextual_chunks("datasets/codebase_chunks.json")

    for topk in [5, 10]:
        print("=" * 50)
        evaluate(topk=topk, search_func=vector_search)
        evaluate(topk=topk, search_func=keyword_search)
        evaluate(topk=topk, search_func=hybrid_search_fuse)
        evaluate(topk=topk, search_func=hybrid_search_rerank)
        evaluate(topk=topk, search_func=vector_contextual_search)
        evaluate(topk=topk, search_func=keyword_contextual_search)
        evaluate(topk=topk, search_func=hybrid_contextual_search_fuse)
        evaluate(topk=topk, search_func=hybrid_contextual_search_rerank)
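For reference, the Pass@k number printed by `evaluate` is the per-query fraction of golden chunks that appear in the top-k results, averaged over all queries. A tiny standalone illustration with made-up identifiers:

# one query with two golden chunks, one of which is retrieved
golden = [("doc-a", 0), ("doc-a", 3)]  # (doc_uuid, chunk index) pairs
retrieved = [("doc-a", 0), ("doc-b", 1), ("doc-a", 2)]
hits = sum(1 for pair in golden if pair in retrieved)
print(hits / len(golden))  # 0.5 -> this query contributes 0.5 to the average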
Evaluate with generated queries
from dataclasses import dataclass
from typing import Annotated

import httpx

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import GeminiEvaluator
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

URL = "https://paulgraham.com/{}.html"
ARTICLE = "best"
TOP_K = 10
DenseVector = Vector[768]

emb = GeminiDenseEmbedding()
evaluator = GeminiEvaluator()
extractor = SimpleExtractor()


class Chunk(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    text: str
    vector: DenseVector


class Query(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    cid: Annotated[int, ForeignKey[Chunk.uid]]
    text: str
    vector: DenseVector


@dataclass(frozen=True)
class Evaluation:
    map: float
    ndcg: float
    recall: float


vr = VechordRegistry(ARTICLE, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Chunk, Query])

with httpx.Client() as client:
    resp = client.get(URL.format(ARTICLE))
    doc = extractor.extract_html(resp.text)


@vr.inject(output=Chunk)
def segment_essay() -> list[Chunk]:
    chunker = RegexChunker()
    chunks = chunker.segment(doc)
    return [
        Chunk(text=chunk, vector=DenseVector(emb.vectorize_chunk(chunk)))
        for chunk in chunks
    ]
@vr.inject(input=Chunk, output=Query)
def create_query(uid: int, text: str) -> Query:
    query = evaluator.produce_query(doc, text)
    return Query(cid=uid, text=query, vector=DenseVector(emb.vectorize_query(query)))
@vr.inject(input=Query)
def evaluate(cid: int, vector: DenseVector) -> Evaluation:
    chunks: list[Chunk] = vr.search_by_vector(Chunk, vector, topk=TOP_K)
    score = evaluator.evaluate_one(str(cid), [str(chunk.uid) for chunk in chunks])
    return Evaluation(
        map=score["map"], ndcg=score["ndcg"], recall=score[f"recall_{TOP_K}"]
    )


if __name__ == "__main__":
    segment_essay()
    create_query()

    res: list[Evaluation] = evaluate()
    print("ndcg", sum(r.ndcg for r in res) / len(res))
    print(f"recall@{TOP_K}", sum(r.recall for r in res) / len(res))
Hybrid search with rerank
import httpx

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.rerank import CohereReranker
from vechord.spec import DefaultDocument, Keyword, create_chunk_with_dim

URL = "https://paulgraham.com/{}.html"

Chunk = create_chunk_with_dim(768)
emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)
reranker = CohereReranker()
extractor = SimpleExtractor()

vr = VechordRegistry("hybrid", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([DefaultDocument, Chunk])


@vr.inject(output=DefaultDocument)
def load_document(title: str) -> DefaultDocument:
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
        if resp.is_error:
            raise RuntimeError(f"Failed to fetch the document `{title}`")
        return DefaultDocument(title=title, text=extractor.extract_html(resp.text))


@vr.inject(input=DefaultDocument, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    chunks = chunker.segment(text)
    return [
        Chunk(
            doc_id=uid,
            text=chunk,
            vec=emb.vectorize_chunk(chunk),
            keyword=Keyword(chunk),
        )
        for chunk in chunks
    ]
def search_and_rerank(query: str, topk: int) -> list[Chunk]:
    text_retrieves = vr.search_by_keyword(Chunk, query, topk=topk)
    vec_retrieves = vr.search_by_vector(Chunk, emb.vectorize_query(query), topk=topk)
    chunks = list(
        {chunk.uid: chunk for chunk in text_retrieves + vec_retrieves}.values()
    )
    indices = reranker.rerank(query, [chunk.text for chunk in chunks])
    return [chunks[i] for i in indices[:topk]]
if __name__ == "__main__":
    load_document("smart")
    chunk_document()

    chunks = search_and_rerank("smart", 3)
    print(chunks)
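Cross-encoder reranking is one way to merge the keyword and vector result lists; the Anthropic example above fuses them with `ReciprocalRankFusion` instead. A minimal sketch of that variant for this script, under the same assumptions; the function name `search_and_fuse` is just for illustration:

from vechord.rerank import ReciprocalRankFusion


def search_and_fuse(query: str, topk: int) -> list[Chunk]:
    # merge the two ranked lists by reciprocal rank instead of reranking them
    rrf = ReciprocalRankFusion()
    return rrf.fuse(
        [
            vr.search_by_keyword(Chunk, query, topk=topk),
            vr.search_by_vector(Chunk, emb.vectorize_query(query), topk=topk),
        ]
    )[:topk]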