Examples

Simple demo

from typing import Optional

from vechord.embedding import GeminiDenseEmbedding
from vechord.registry import VechordRegistry
from vechord.spec import PrimaryKeyAutoIncrease, Table, Vector

DenseVector = Vector[768]


class Document(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    title: str = ""
    text: str
    vec: DenseVector


if __name__ == "__main__":
    vr = VechordRegistry("simple", "postgresql://postgres:postgres@172.17.0.1:5432/")
    vr.register([Document])
    emb = GeminiDenseEmbedding()

    # add a document
    text = "my personal long note"
    doc = Document(title="note", text=text, vec=DenseVector(emb.vectorize_chunk(text)))
    vr.insert(doc)

    # load
    docs = vr.select_by(Document.partial_init(), limit=1)
    print(docs)

    # query
    res = vr.search_by_vector(Document, emb.vectorize_query("note"), topk=1)
    print(res)

BEIR evaluation

import csv
import zipfile
from pathlib import Path
from typing import Iterator

import httpx
import msgspec
import rich.progress

from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import BaseEvaluator
from vechord.registry import VechordRegistry
from vechord.spec import Table, Vector

BASE_URL = "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{}.zip"
DEFAULT_DATASET = "scifact"
TOP_K = 10

emb = GeminiDenseEmbedding()
DenseVector = Vector[768]


def download_dataset(dataset: str, output: Path):
    output.mkdir(parents=True, exist_ok=True)
    zip = output / f"{dataset}.zip"
    if not zip.is_file():
        with (
            zip.open("wb") as f,
            httpx.stream("GET", BASE_URL.format(dataset)) as stream,
        ):
            total = int(stream.headers["Content-Length"])
            with rich.progress.Progress(
                "[progress.percentage]{task.percentage:>3.0f}%",
                rich.progress.BarColumn(bar_width=None),
                rich.progress.DownloadColumn(),
                rich.progress.TransferSpeedColumn(),
            ) as progress:
                download_task = progress.add_task("Download", total=total)
                for chunk in stream.iter_bytes():
                    f.write(chunk)
                    progress.update(
                        download_task, completed=stream.num_bytes_downloaded
                    )
    unzip_dir = output / dataset
    if not unzip_dir.is_dir():
        with zipfile.ZipFile(zip, "r") as f:
            f.extractall(output)
    return unzip_dir


class Corpus(Table):
    uid: str
    text: str
    title: str
    vector: DenseVector


class Query(Table):
    uid: str
    cid: str
    text: str
    vector: DenseVector


class Evaluation(msgspec.Struct):
    map: float
    ndcg: float
    recall: float


vr = VechordRegistry(DEFAULT_DATASET, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Corpus, Query])


@vr.inject(output=Corpus)
def load_corpus(dataset: str, output: Path) -> Iterator[Corpus]:
    file = output / dataset / "corpus.jsonl"
    decoder = msgspec.json.Decoder()
    with file.open("r") as f:
        for line in f:
            item = decoder.decode(line)
            title = item.get("title", "")
            text = item.get("text", "")
            try:
                vector = emb.vectorize_chunk(f"{title}\n{text}")
            except Exception as e:
                print(f"failed to vectorize {title}: {e}")
                continue
            yield Corpus(
                uid=item["_id"],
                text=text,
                title=title,
                vector=DenseVector(vector),
            )


@vr.inject(output=Query)
def load_query(dataset: str, output: Path) -> Iterator[Query]:
    file = output / dataset / "queries.jsonl"
    truth = output / dataset / "qrels" / "test.tsv"

    table = {}
    with open(truth, "r") as f:
        reader = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
        next(reader)  # skip header
        for row in reader:
            table[row[0]] = row[1]

    decoder = msgspec.json.Decoder()
    with file.open("r") as f:
        for line in f:
            item = decoder.decode(line)
            uid = item["_id"]
            if uid not in table:
                continue
            text = item.get("text", "")
            yield Query(
                uid=uid,
                cid=table[uid],
                text=text,
                vector=DenseVector(emb.vectorize_query(text)),
            )


@vr.inject(input=Query)
def evaluate(cid: str, vector: DenseVector) -> Evaluation:
    docs: list[Corpus] = vr.search_by_vector(Corpus, vector, topk=TOP_K)
    score = BaseEvaluator.evaluate_one(cid, [doc.uid for doc in docs])
    return Evaluation(
        map=score.get("map"),
        ndcg=score.get("ndcg"),
        recall=score.get(f"recall_{TOP_K}"),
    )


if __name__ == "__main__":
    save_dir = Path("datasets")
    download_dataset(DEFAULT_DATASET, save_dir)

    load_corpus(DEFAULT_DATASET, save_dir)
    load_query(DEFAULT_DATASET, save_dir)

    res: list[Evaluation] = evaluate()
    print("ndcg", sum(r.ndcg for r in res) / len(res))
    print("recall@10", sum(r.recall for r in res) / len(res))

HTTP web service

from datetime import datetime, timezone
from functools import partial
from typing import Annotated

import httpx
import msgspec

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.service import create_web_app
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

URL = "https://paulgraham.com/{}.html"
DenseVector = Vector[768]
emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)
extractor = SimpleExtractor()


class Document(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    title: str = ""
    text: str
    updated_at: datetime = msgspec.field(
        default_factory=partial(datetime.now, timezone.utc)
    )


class Chunk(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    doc_id: Annotated[int, ForeignKey[Document.uid]]
    text: str
    vector: DenseVector


vr = VechordRegistry("http", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk])


@vr.inject(output=Document)
def load_document(title: str) -> Document:
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
        if resp.is_error:
            raise RuntimeError(f"Failed to fetch the document `{title}`")
        return Document(title=title, text=extractor.extract_html(resp.text))


@vr.inject(input=Document, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    chunks = chunker.segment(text)
    return [
        Chunk(doc_id=uid, text=chunk, vector=DenseVector(emb.vectorize_chunk(chunk)))
        for chunk in chunks
    ]


if __name__ == "__main__":
    # this pipeline will be used in the web app, or you can run it with `vr.run()`
    pipeline = vr.create_pipeline([load_document, chunk_document])
    app = create_web_app(vr, pipeline)

    from wsgiref.simple_server import make_server

    with make_server("", 8000, app) as server:
        server.serve_forever()

Contextual chunk augmentation

from datetime import datetime
from typing import Annotated, Optional

from vechord import (
    GeminiAugmenter,
    GeminiDenseEmbedding,
    GeminiEvaluator,
    LocalLoader,
    RegexChunker,
    SimpleExtractor,
)
from vechord.registry import VechordRegistry
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

emb = GeminiDenseEmbedding()
DenseVector = Vector[768]
extractor = SimpleExtractor()


class Document(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    digest: str
    filename: str
    text: str
    updated_at: datetime


class Chunk(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    doc_uid: Annotated[int, ForeignKey[Document.uid]]
    seq_id: int
    text: str
    vector: DenseVector


class ContextChunk(Table, kw_only=True):
    chunk_uid: Annotated[int, ForeignKey[Chunk.uid]]
    text: str
    vector: DenseVector


vr = VechordRegistry("decorator", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk, ContextChunk])


@vr.inject(output=Document)
def load_from_dir(dirpath: str) -> list[Document]:
    loader = LocalLoader(dirpath, include=[".pdf"])
    return [
        Document(
            digest=doc.digest,
            filename=doc.path,
            text=extractor.extract(doc),
            updated_at=doc.updated_at,
        )
        for doc in loader.load()
    ]


@vr.inject(input=Document, output=Chunk)
def split_document(uid: int, text: str) -> list[Chunk]:
    chunker = RegexChunker(overlap=0)
    chunks = chunker.segment(text)
    return [
        Chunk(
            doc_uid=uid,
            seq_id=i,
            text=chunk,
            vector=DenseVector(emb.vectorize_chunk(chunk)),
        )
        for i, chunk in enumerate(chunks)
    ]


@vr.inject(input=Document, output=ContextChunk)
def context_embedding(uid: int, text: str) -> list[ContextChunk]:
    chunks: list[Chunk] = vr.select_by(
        Chunk.partial_init(doc_uid=uid), fields=["uid", "text"]
    )
    augmentor = GeminiAugmenter()
    augmentor.reset(text)
    context_chunks = [
        f"{context}\n{origin}"
        for (context, origin) in zip(
            augmentor.augment_context([c.text for c in chunks]),
            [c.text for c in chunks],
            strict=False,
        )
    ]
    return [
        ContextChunk(
            chunk_uid=chunk_uid,
            text=augmented,
            vector=DenseVector(emb.vectorize_chunk(augmented)),
        )
        for (chunk_uid, augmented) in zip(
            [c.uid for c in chunks], context_chunks, strict=False
        )
    ]


def query_chunk(query: str) -> list[Chunk]:
    vector = emb.vectorize_query(query)
    res: list[Chunk] = vr.search_by_vector(
        Chunk,
        vector,
        topk=5,
    )
    return res


def query_context_chunk(query: str) -> list[ContextChunk]:
    vector = emb.vectorize_query(query)
    res: list[ContextChunk] = vr.search_by_vector(
        ContextChunk,
        vector,
        topk=5,
    )
    return res


@vr.inject(input=Chunk)
def evaluate(uid: int, doc_uid: int, text: str):
    evaluator = GeminiEvaluator()
    doc: Document = vr.select_by(Document.partial_init(uid=doc_uid))[0]
    query = evaluator.produce_query(doc.text, text)
    retrieved = query_chunk(query)
    score = evaluator.evaluate_one(str(uid), [str(r.uid) for r in retrieved])
    return score


if __name__ == "__main__":
    from rich import print

    load_from_dir("./data")
    split_document()
    context_embedding()

    chunks = query_chunk("vector search")
    print(chunks)

    scores = evaluate()
    print(sum(scores) / len(scores))

    context_chunks = query_context_chunk("vector search")
    print(context_chunks)

    vr.clear_storage()

Contextual retrieval with the Anthropic example

"""Anthropic Cookbook Contextual Embedding Example.

Data can be found from "https://github.com/anthropics/anthropic-cookbook".
"""

import json
from pathlib import Path
from time import perf_counter
from typing import Annotated, Optional

import httpx

from vechord.augment import GeminiAugmenter
from vechord.embedding import GeminiDenseEmbedding
from vechord.registry import VechordRegistry
from vechord.rerank import CohereReranker, ReciprocalRankFusion
from vechord.spec import (
    ForeignKey,
    Keyword,
    PrimaryKeyAutoIncrease,
    Table,
    UniqueIndex,
    Vector,
)

DenseVector = Vector[768]
emb = GeminiDenseEmbedding()
vr = VechordRegistry("anthropic", "postgresql://postgres:postgres@172.17.0.1:5432/")


class Document(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    uuid: Annotated[str, UniqueIndex()]
    content: str


class Chunk(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    doc_uuid: Annotated[str, ForeignKey[Document.uuid]]
    index: int
    content: str
    vector: DenseVector
    keyword: Keyword


class ContextualChunk(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    doc_uuid: Annotated[str, ForeignKey[Document.uuid]]
    index: int
    content: str
    context: str
    vector: DenseVector
    keyword: Keyword


class Query(Table, kw_only=True):
    uid: Optional[PrimaryKeyAutoIncrease] = None
    content: str
    answer: str
    doc_uuids: list[str]
    chunk_index: list[int]
    vector: DenseVector


vr.register([Document, Chunk, ContextualChunk, Query])


def download_data(url: str, save_path: str):
    if Path(save_path).is_file():
        print(f"{save_path} already exists, skip download.")
        return
    with httpx.stream("GET", url) as response, open(save_path, "wb") as f:
        for chunk in response.iter_bytes():
            f.write(chunk)


def load_data(filepath: str):
    with open(filepath, "r", encoding="utf-8") as f:
        docs = json.load(f)
        for doc in docs:
            vr.insert(
                Document(
                    uuid=doc["original_uuid"],
                    content=doc["content"],
                )
            )
            for chunk in doc["chunks"]:
                vr.insert(
                    Chunk(
                        doc_uuid=doc["original_uuid"],
                        index=chunk["original_index"],
                        content=chunk["content"],
                        vector=emb.vectorize_chunk(chunk["content"]),
                        keyword=Keyword(chunk["content"]),
                    )
                )


def load_contextual_chunks(filepath: str):
    augmenter = GeminiAugmenter()

    with open(filepath, "r", encoding="utf-8") as f:
        docs = json.load(f)
        for doc in docs:
            augmenter.reset(doc["content"])
            chunks = doc["chunks"]
            augments = augmenter.augment_context([chunk["content"] for chunk in chunks])
            if len(augments) != len(chunks):
                print(
                    f"augments length not match for uuid: {doc['original_uuid']}, {len(augments)} != {len(chunks)}"
                )
            for chunk, context in zip(chunks, augments, strict=False):
                contextual_content = f"{chunk['content']}\n\n{context}"
                vr.insert(
                    ContextualChunk(
                        doc_uuid=doc["original_uuid"],
                        index=chunk["original_index"],
                        content=chunk["content"],
                        context=context,
                        vector=emb.vectorize_chunk(contextual_content),
                        keyword=Keyword(contextual_content),
                    )
                )


def load_query(filepath: str):
    queries = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            query = json.loads(line)
            queries.append(
                Query(
                    content=query["query"],
                    answer=query["answer"],
                    doc_uuids=[x[0] for x in query["golden_chunk_uuids"]],
                    chunk_index=[x[1] for x in query["golden_chunk_uuids"]],
                    vector=emb.vectorize_query(query["query"]),
                )
            )
    vr.copy_bulk(queries)


def vector_search(query: Query, topk: int) -> list[Chunk]:
    return vr.search_by_vector(Chunk, query.vector, topk=topk)


def vector_contextual_search(query: Query, topk: int) -> list[ContextualChunk]:
    return vr.search_by_vector(ContextualChunk, query.vector, topk=topk)


def keyword_search(query: Query, topk: int) -> list[Chunk]:
    return vr.search_by_keyword(Chunk, query.content, topk=topk)


def keyword_contextual_search(query: Query, topk: int) -> list[ContextualChunk]:
    return vr.search_by_keyword(ContextualChunk, query.content, topk=topk)


def hybrid_search_fuse(query: Query, topk: int) -> list[Chunk]:
    rrf = ReciprocalRankFusion()
    return rrf.fuse([vector_search(query, topk), keyword_search(query, topk)])[:topk]


def hybrid_contextual_search_fuse(query: Query, topk: int) -> list[ContextualChunk]:
    rrf = ReciprocalRankFusion()
    return rrf.fuse(
        [vector_contextual_search(query, topk), keyword_contextual_search(query, topk)]
    )[:topk]


def hybrid_search_rerank(query: Query, topk: int, boost=3) -> list[Chunk]:
    ranker = CohereReranker()
    vecs = vector_search(query, topk * boost)
    keys = keyword_search(query, topk * boost)
    chunks = list({chunk.uid: chunk for chunk in vecs + keys}.values())
    indices = ranker.rerank(query.content, [chunk.content for chunk in chunks])
    return [chunks[i] for i in indices[:topk]]


def hybrid_contextual_search_rerank(
    query: Query, topk: int, boost=3
) -> list[ContextualChunk]:
    ranker = CohereReranker()
    vecs = vector_contextual_search(query, topk * boost)
    keys = keyword_contextual_search(query, topk * boost)
    chunks = list({chunk.uid: chunk for chunk in vecs + keys}.values())
    indices = ranker.rerank(
        query.content, [f"{chunk.content}\n{chunk.context}" for chunk in chunks]
    )
    return [chunks[i] for i in indices[:topk]]


def evaluate(topk=5, search_func=vector_search):
    print(f"TopK={topk}, search by: {search_func.__name__}")
    queries: list[Query] = vr.select_by(Query.partial_init())
    total_score = 0
    start = perf_counter()
    for query in queries:
        chunks: list[Chunk] = search_func(query, topk)
        count = 0
        for doc_uuid, chunk_index in zip(
            query.doc_uuids, query.chunk_index, strict=True
        ):
            for chunk in chunks:
                if chunk.doc_uuid == doc_uuid and chunk.index == chunk_index:
                    count += 1
                    break
        score = count / len(query.doc_uuids)
        total_score += score

    print(
        f"Pass@{topk}: {total_score / len(queries):.4f}, total queries: {len(queries)}, QPS: {len(queries) / (perf_counter() - start):.3f}"
    )


if __name__ == "__main__":
    Path("datasets").mkdir(parents=True, exist_ok=True)
    download_data(
        "https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/codebase_chunks.json",
        "datasets/codebase_chunks.json",
    )
    download_data(
        "https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/evaluation_set.jsonl",
        "datasets/evaluation_set.jsonl",
    )
    load_data("datasets/codebase_chunks.json")
    load_query("datasets/evaluation_set.jsonl")
    load_contextual_chunks("datasets/codebase_chunks.json")

    for topk in [5, 10]:
        print("=" * 50)
        evaluate(topk=topk, search_func=vector_search)
        evaluate(topk=topk, search_func=keyword_search)
        evaluate(topk=topk, search_func=hybrid_search_fuse)
        evaluate(topk=topk, search_func=hybrid_search_rerank)
        evaluate(topk=topk, search_func=vector_contextual_search)
        evaluate(topk=topk, search_func=keyword_contextual_search)
        evaluate(topk=topk, search_func=hybrid_contextual_search_fuse)
        evaluate(topk=topk, search_func=hybrid_contextual_search_rerank)

Evaluate with generated queries

from dataclasses import dataclass
from typing import Annotated

import httpx

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import GeminiEvaluator
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

URL = "https://paulgraham.com/{}.html"
ARTICLE = "best"
TOP_K = 10

DenseVector = Vector[768]
emb = GeminiDenseEmbedding()
evaluator = GeminiEvaluator()
extractor = SimpleExtractor()


class Chunk(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    text: str
    vector: DenseVector


class Query(Table, kw_only=True):
    uid: PrimaryKeyAutoIncrease | None = None
    cid: Annotated[int, ForeignKey[Chunk.uid]]
    text: str
    vector: DenseVector


@dataclass(frozen=True)
class Evaluation:
    map: float
    ndcg: float
    recall: float


vr = VechordRegistry(ARTICLE, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Chunk, Query])

with httpx.Client() as client:
    resp = client.get(URL.format(ARTICLE))
doc = extractor.extract_html(resp.text)


@vr.inject(output=Chunk)
def segment_essay() -> list[Chunk]:
    chunker = RegexChunker()
    chunks = chunker.segment(doc)
    return [
        Chunk(text=chunk, vector=DenseVector(emb.vectorize_chunk(chunk)))
        for chunk in chunks
    ]


@vr.inject(input=Chunk, output=Query)
def create_query(uid: int, text: str) -> Query:
    query = evaluator.produce_query(doc, text)
    return Query(cid=uid, text=query, vector=DenseVector(emb.vectorize_chunk(query)))


@vr.inject(input=Query)
def evaluate(cid: int, vector: DenseVector) -> Evaluation:
    chunks: list[Chunk] = vr.search_by_vector(Chunk, vector, topk=TOP_K)
    score = evaluator.evaluate_one(str(cid), [str(chunk.uid) for chunk in chunks])
    return Evaluation(
        map=score["map"], ndcg=score["ndcg"], recall=score[f"recall_{TOP_K}"]
    )


if __name__ == "__main__":
    segment_essay()
    create_query()

    res: list[Evaluation] = evaluate()
    print("ndcg", sum(r.ndcg for r in res) / len(res))
    print(f"recall@{TOP_K}", sum(r.recall for r in res) / len(res))

Hybrid search with rerank

import httpx

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.rerank import CohereReranker
from vechord.spec import DefaultDocument, Keyword, create_chunk_with_dim

URL = "https://paulgraham.com/{}.html"
Chunk = create_chunk_with_dim(768)
emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)
reranker = CohereReranker()
extractor = SimpleExtractor()


vr = VechordRegistry("hybrid", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([DefaultDocument, Chunk])


@vr.inject(output=DefaultDocument)
def load_document(title: str) -> DefaultDocument:
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
        if resp.is_error:
            raise RuntimeError(f"Failed to fetch the document `{title}`")
        return DefaultDocument(title=title, text=extractor.extract_html(resp.text))


@vr.inject(input=DefaultDocument, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    chunks = chunker.segment(text)
    return [
        Chunk(
            doc_id=uid,
            text=chunk,
            vec=emb.vectorize_chunk(chunk),
            keyword=Keyword(chunk),
        )
        for chunk in chunks
    ]


def search_and_rerank(query: str, topk: int) -> list[Chunk]:
    text_retrieves = vr.search_by_keyword(Chunk, query, topk=topk)
    vec_retrievse = vr.search_by_vector(Chunk, emb.vectorize_query(query), topk=topk)
    chunks = list(
        {chunk.uid: chunk for chunk in text_retrieves + vec_retrievse}.values()
    )
    indices = reranker.rerank(query, [chunk.text for chunk in chunks])
    return [chunks[i] for i in indices[:topk]]


if __name__ == "__main__":
    load_document("smart")
    chunk_document()
    chunks = search_and_rerank("smart", 3)
    print(chunks)