Examples

Simple demo: define a table with a vector column, insert one embedded document, and query it by vector similarity

from typing import Optional

from vechord.embedding import GeminiDenseEmbedding
from vechord.registry import VechordRegistry
from vechord.spec import PrimaryKeyAutoIncrease, Table, Vector

DenseVector = Vector[768]


class Document(Table, kw_only=True):
    """A stored note together with its dense embedding."""
    uid: Optional[PrimaryKeyAutoIncrease] = None  # assigned by the database on insert
    title: str = ""
    text: str
    vec: DenseVector  # 768-dim embedding of `text`


if __name__ == "__main__":
    # connect to Postgres and register the Document table with the registry
    vr = VechordRegistry("simple", "postgresql://postgres:postgres@172.17.0.1:5432/")
    vr.register([Document])
    emb = GeminiDenseEmbedding()

    # add a document (embed the text and store it as one row)
    text = "my personal long note"
    doc = Document(title="note", text=text, vec=DenseVector(emb.vectorize_chunk(text)))
    vr.insert(doc)

    # load: select any row back (partial_init() with no args matches all rows)
    docs = vr.select_by(Document.partial_init(), limit=1)
    print(docs)

    # query: nearest neighbour by the embedded query text
    res = vr.search_by_vector(Document, emb.vectorize_query("note"), topk=1)
    print(res)

BEIR evaluation: download a BEIR dataset, embed its corpus and test queries, and report NDCG/recall for vector search

import csv
import zipfile
from pathlib import Path
from typing import Iterator

import httpx
import msgspec
import rich.progress

from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import BaseEvaluator
from vechord.registry import VechordRegistry
from vechord.spec import Table, Vector

# BEIR dataset mirror; {} is replaced with the dataset name
BASE_URL = "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{}.zip"
DEFAULT_DATASET = "scifact"
TOP_K = 10  # retrieval depth scored per query

emb = GeminiDenseEmbedding()
DenseVector = Vector[768]  # must match the embedding model's output dimension


def download_dataset(dataset: str, output: Path) -> Path:
    """Download and extract a BEIR dataset archive into *output*.

    Skips the download when the zip already exists and the extraction when
    the dataset directory already exists. Returns the extracted directory.
    """
    output.mkdir(parents=True, exist_ok=True)
    zip_path = output / f"{dataset}.zip"  # renamed: `zip` shadowed the builtin
    if not zip_path.is_file():
        with (
            zip_path.open("wb") as f,
            httpx.stream("GET", BASE_URL.format(dataset)) as stream,
        ):
            # fail fast on HTTP errors; otherwise the error body would be
            # written into the zip and the is_file() check above would then
            # skip re-downloading the corrupt archive forever
            stream.raise_for_status()
            total = int(stream.headers["Content-Length"])
            with rich.progress.Progress(
                "[progress.percentage]{task.percentage:>3.0f}%",
                rich.progress.BarColumn(bar_width=None),
                rich.progress.DownloadColumn(),
                rich.progress.TransferSpeedColumn(),
            ) as progress:
                download_task = progress.add_task("Download", total=total)
                for chunk in stream.iter_bytes():
                    f.write(chunk)
                    progress.update(
                        download_task, completed=stream.num_bytes_downloaded
                    )
    unzip_dir = output / dataset
    if not unzip_dir.is_dir():
        with zipfile.ZipFile(zip_path, "r") as f:
            f.extractall(output)
    return unzip_dir


class Corpus(Table):
    """One BEIR corpus entry with its dense embedding."""
    uid: str  # BEIR document `_id`
    text: str
    title: str
    vector: DenseVector  # embedding of "title\ntext"


class Query(Table):
    """One BEIR test query paired with its relevant corpus id."""
    uid: str  # BEIR query `_id`
    cid: str  # relevant Corpus.uid taken from the qrels file
    text: str
    vector: DenseVector


class Evaluation(msgspec.Struct):
    """Retrieval metrics for a single query."""
    map: float
    ndcg: float
    recall: float  # recall at TOP_K


# one registry (and backing storage) per dataset
vr = VechordRegistry(DEFAULT_DATASET, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Corpus, Query])


@vr.inject(output=Corpus)
def load_corpus(dataset: str, output: Path) -> Iterator[Corpus]:
    """Stream the corpus jsonl file, embedding each entry into a Corpus row."""
    corpus_file = output / dataset / "corpus.jsonl"
    json_decoder = msgspec.json.Decoder()
    with corpus_file.open("r") as fp:
        for raw_line in fp:
            record = json_decoder.decode(raw_line)
            title = record.get("title", "")
            text = record.get("text", "")
            try:
                embedding = emb.vectorize_chunk(f"{title}\n{text}")
            except Exception as e:
                # best-effort ingestion: report and skip entries the embedder rejects
                print(f"failed to vectorize {title}: {e}")
                continue
            yield Corpus(
                uid=record["_id"],
                title=title,
                text=text,
                vector=DenseVector(embedding),
            )


@vr.inject(output=Query)
def load_query(dataset: str, output: Path) -> Iterator[Query]:
    """Yield embedded test queries that have an entry in the qrels file."""
    query_file = output / dataset / "queries.jsonl"
    qrels_file = output / dataset / "qrels" / "test.tsv"

    # map query id -> relevant corpus id
    # NOTE(review): if a query has several qrels rows, the last one wins
    relevant = {}
    with open(qrels_file, "r") as fp:
        rows = csv.reader(fp, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
        next(rows)  # drop the header row
        for record in rows:
            relevant[record[0]] = record[1]

    json_decoder = msgspec.json.Decoder()
    with query_file.open("r") as fp:
        for raw_line in fp:
            item = json_decoder.decode(raw_line)
            query_id = item["_id"]
            if query_id not in relevant:
                continue
            query_text = item.get("text", "")
            yield Query(
                uid=query_id,
                cid=relevant[query_id],
                text=query_text,
                vector=DenseVector(emb.vectorize_query(query_text)),
            )


@vr.inject(input=Query)
def evaluate(cid: str, vector: DenseVector) -> Evaluation:
    """Score the top-K vector-search results against the known relevant doc."""
    retrieved: list[Corpus] = vr.search_by_vector(Corpus, vector, topk=TOP_K)
    metrics = BaseEvaluator.evaluate_one(cid, [hit.uid for hit in retrieved])
    return Evaluation(
        map=metrics.get("map"),
        ndcg=metrics.get("ndcg"),
        recall=metrics.get(f"recall_{TOP_K}"),
    )


if __name__ == "__main__":
    save_dir = Path("datasets")
    download_dataset(DEFAULT_DATASET, save_dir)

    # ingest: embed and store the corpus, then the test queries
    load_corpus(DEFAULT_DATASET, save_dir)
    load_query(DEFAULT_DATASET, save_dir)

    # evaluate() runs once per stored Query and returns one Evaluation each
    res: list[Evaluation] = evaluate()
    print("ndcg", sum(r.ndcg for r in res) / len(res))
    print("recall@10", sum(r.recall for r in res) / len(res))

HTTP web service: expose the load/chunk ingestion pipeline through a WSGI web app

from datetime import datetime
from typing import Annotated

import httpx
import msgspec

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.service import create_web_app
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

URL = "https://paulgraham.com/{}.html"  # {} is the essay slug
DenseVector = Vector[768]  # must match the embedding model's output dimension
emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)
extractor = SimpleExtractor()


class Document(Table, kw_only=True):
    """A fetched essay stored with its ingestion timestamp."""
    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database on insert
    title: str = ""
    text: str
    updated_at: datetime = msgspec.field(default_factory=datetime.now)


class Chunk(Table, kw_only=True):
    """An embedded chunk of a Document."""
    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database on insert
    doc_id: Annotated[int, ForeignKey[Document.uid]]
    text: str
    vector: DenseVector


# registry named "http"; registering makes both tables available
vr = VechordRegistry("http", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk])


@vr.inject(output=Document)
def load_document(title: str) -> Document:
    """Fetch the essay named *title* and store its extracted plain text."""
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
    if resp.is_error:
        raise RuntimeError(f"Failed to fetch the document `{title}`")
    return Document(title=title, text=extractor.extract_html(resp.text))


@vr.inject(input=Document, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    """Split the document text into chunks and embed each one."""
    return [
        Chunk(
            doc_id=uid,
            text=piece,
            vector=DenseVector(emb.vectorize_chunk(piece)),
        )
        for piece in chunker.segment(text)
    ]


if __name__ == "__main__":
    # this pipeline will be used in the web app, or you can run it with `vr.run()`
    vr.set_pipeline([load_document, chunk_document])
    app = create_web_app(vr)

    # serve with the stdlib reference WSGI server (fine for a demo)
    from wsgiref.simple_server import make_server

    with make_server("", 8000, app) as server:
        server.serve_forever()

Contextual chunk augmentation: re-embed each chunk with LLM-generated document context prepended, then compare retrieval quality

from datetime import datetime
from typing import Annotated, Optional

from vechord import (
    GeminiAugmenter,
    GeminiDenseEmbedding,
    GeminiEvaluator,
    LocalLoader,
    RegexChunker,
    SimpleExtractor,
)
from vechord.registry import VechordRegistry
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

emb = GeminiDenseEmbedding()
DenseVector = Vector[768]  # must match the embedding model's output dimension
extractor = SimpleExtractor()


class Document(Table, kw_only=True):
    """A locally loaded file with its extracted text."""
    uid: Optional[PrimaryKeyAutoIncrease] = None  # assigned by the database on insert
    digest: str  # content digest reported by the loader
    filename: str
    text: str
    updated_at: datetime


class Chunk(Table, kw_only=True):
    """An embedded chunk of a Document, ordered by seq_id."""
    uid: Optional[PrimaryKeyAutoIncrease] = None  # assigned by the database on insert
    doc_uid: Annotated[int, ForeignKey[Document.uid]]
    seq_id: int  # position of the chunk within its document
    text: str
    vector: DenseVector


class ContextChunk(Table, kw_only=True):
    """A chunk re-embedded with generated document context prepended."""
    chunk_uid: Annotated[int, ForeignKey[Chunk.uid]]
    text: str  # context + "\n" + original chunk text
    vector: DenseVector


# registry named "decorator"; registering makes all three tables available
vr = VechordRegistry("decorator", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk, ContextChunk])


@vr.inject(output=Document)
def load_from_dir(dirpath: str) -> list[Document]:
    """Extract every PDF found under *dirpath* into Document rows."""
    pdf_loader = LocalLoader(dirpath, include=[".pdf"])
    return [
        Document(
            digest=source.digest,
            filename=source.path,
            text=extractor.extract(source),
            updated_at=source.updated_at,
        )
        for source in pdf_loader.load()
    ]


@vr.inject(input=Document, output=Chunk)
def split_document(uid: int, text: str) -> list[Chunk]:
    """Chunk the document's text and embed every piece, keeping order."""
    segments = RegexChunker(overlap=0).segment(text)
    return [
        Chunk(
            doc_uid=uid,
            seq_id=index,
            text=segment,
            vector=DenseVector(emb.vectorize_chunk(segment)),
        )
        for index, segment in enumerate(segments)
    ]


@vr.inject(input=Document, output=ContextChunk)
def context_embedding(uid: int, text: str) -> list[ContextChunk]:
    """Prepend generated document context to each stored chunk and re-embed it."""
    chunks: list[Chunk] = vr.select_by(
        Chunk.partial_init(doc_uid=uid), fields=["uid", "text"]
    )
    chunk_texts = [chunk.text for chunk in chunks]
    augmentor = GeminiAugmenter()
    augmentor.reset(text)
    results = []
    # strict=False: the augmenter may return fewer contexts than chunks,
    # in which case the trailing chunks are dropped (same as the zip pair
    # in the original formulation)
    for chunk, context in zip(
        chunks, augmentor.augment_context(chunk_texts), strict=False
    ):
        augmented = f"{context}\n{chunk.text}"
        results.append(
            ContextChunk(
                chunk_uid=chunk.uid,
                text=augmented,
                vector=DenseVector(emb.vectorize_chunk(augmented)),
            )
        )
    return results


def query_chunk(query: str) -> list[Chunk]:
    """Return the 5 plain chunks nearest to the embedded *query*."""
    query_vector = emb.vectorize_query(query)
    return vr.search_by_vector(Chunk, query_vector, topk=5)


def query_context_chunk(query: str) -> list[ContextChunk]:
    """Return the 5 context-augmented chunks nearest to the embedded *query*."""
    query_vector = emb.vectorize_query(query)
    return vr.search_by_vector(ContextChunk, query_vector, topk=5)


@vr.inject(input=Chunk)
def evaluate(uid: int, doc_uid: int, text: str):
    """Generate a query for this chunk and score whether retrieval finds it."""
    judge = GeminiEvaluator()
    parent: Document = vr.select_by(Document.partial_init(uid=doc_uid))[0]
    generated_query = judge.produce_query(parent.text, text)
    hits = query_chunk(generated_query)
    return judge.evaluate_one(str(uid), [str(hit.uid) for hit in hits])


if __name__ == "__main__":
    from rich import print

    # ingest: extract PDFs, chunk them, then build context-augmented chunks
    load_from_dir("./data")
    split_document()
    context_embedding()

    chunks = query_chunk("vector search")
    print(chunks)

    # evaluate() runs once per stored Chunk and yields one score each
    scores = evaluate()
    print(sum(scores) / len(scores))

    chunks = query_context_chunk("vector search")
    print(chunks)

    # drop everything stored by this registry
    vr.clear_storage()

Evaluate with generated queries: let an LLM write a query for each chunk, then check whether vector search retrieves the source chunk

from dataclasses import dataclass
from typing import Annotated

import httpx

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import GeminiEvaluator
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.spec import (
    ForeignKey,
    PrimaryKeyAutoIncrease,
    Table,
    Vector,
)

URL = "https://paulgraham.com/{}.html"  # {} is the essay slug
ARTICLE = "best"
TOP_K = 10  # retrieval depth scored per query

DenseVector = Vector[768]  # must match the embedding model's output dimension
emb = GeminiDenseEmbedding()
evaluator = GeminiEvaluator()
extractor = SimpleExtractor()


class Chunk(Table, kw_only=True):
    """An embedded chunk of the essay."""
    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database on insert
    text: str
    vector: DenseVector


class Query(Table, kw_only=True):
    """A generated query whose answer lives in chunk `cid`."""
    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database on insert
    cid: Annotated[int, ForeignKey[Chunk.uid]]  # the chunk the query was generated from
    text: str
    vector: DenseVector


@dataclass
class Evaluation:
    """Retrieval metrics for one generated query.

    The original plain class declared only annotations and had no
    ``__init__``, so the keyword construction ``Evaluation(map=..., ...)``
    in ``evaluate()`` raised ``TypeError``; ``@dataclass`` generates the
    needed constructor.
    """

    map: float
    ndcg: float
    recall: float  # recall at TOP_K


# one registry per essay; registering makes both tables available
vr = VechordRegistry(ARTICLE, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Chunk, Query])

# fetch the essay once at import time; `doc` is shared by the tasks below
with httpx.Client() as client:
    resp = client.get(URL.format(ARTICLE))
doc = extractor.extract_html(resp.text)


@vr.inject(output=Chunk)
def segment_essay() -> list[Chunk]:
    """Split the downloaded essay into chunks and embed each one."""
    pieces = RegexChunker().segment(doc)
    return [
        Chunk(text=piece, vector=DenseVector(emb.vectorize_chunk(piece)))
        for piece in pieces
    ]


@vr.inject(input=Chunk, output=Query)
def create_query(uid: int, text: str) -> Query:
    """Ask the evaluator to produce a query answerable by this chunk."""
    generated = evaluator.produce_query(doc, text)
    # NOTE(review): embeds with vectorize_chunk, while other examples use
    # vectorize_query for query text — confirm which is intended here
    return Query(
        cid=uid,
        text=generated,
        vector=DenseVector(emb.vectorize_chunk(generated)),
    )


@vr.inject(input=Query)
def evaluate(cid: int, vector: DenseVector) -> Evaluation:
    """Check whether the source chunk is retrieved for its generated query."""
    hits: list[Chunk] = vr.search_by_vector(Chunk, vector, topk=TOP_K)
    metrics = evaluator.evaluate_one(str(cid), [str(hit.uid) for hit in hits])
    return Evaluation(
        map=metrics["map"],
        ndcg=metrics["ndcg"],
        recall=metrics[f"recall_{TOP_K}"],
    )


if __name__ == "__main__":
    segment_essay()
    create_query()

    # evaluate() runs once per stored Query and returns one Evaluation each
    res: list[Evaluation] = evaluate()
    print("ndcg", sum(r.ndcg for r in res) / len(res))
    print(f"recall@{TOP_K}", sum(r.recall for r in res) / len(res))

Hybrid search with rerank: combine keyword and vector retrieval, deduplicate, and rerank the merged candidates

from typing import Annotated

import httpx

from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.rerank import CohereReranker
from vechord.spec import ForeignKey, Keyword, PrimaryKeyAutoIncrease, Table, Vector

URL = "https://paulgraham.com/{}.html"  # {} is the essay slug
DenseVector = Vector[768]  # must match the embedding model's output dimension
emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)
reranker = CohereReranker()
extractor = SimpleExtractor()


class Document(Table, kw_only=True):
    """A fetched essay."""
    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database on insert
    title: str = ""
    text: str


class Chunk(Table, kw_only=True):
    """A chunk indexed both by dense vector and by keyword."""
    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database on insert
    doc_id: Annotated[int, ForeignKey[Document.uid]]
    text: str
    vector: DenseVector
    keyword: Keyword  # enables search_by_keyword over the chunk text


# registry named "hybrid"; registering makes both tables available
vr = VechordRegistry("hybrid", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk])


@vr.inject(output=Document)
def load_document(title: str) -> Document:
    """Download the essay named *title* and store its extracted plain text."""
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
    if resp.is_error:
        raise RuntimeError(f"Failed to fetch the document `{title}`")
    return Document(title=title, text=extractor.extract_html(resp.text))


@vr.inject(input=Document, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    """Split a document into chunks, embedding and keyword-indexing each one.

    Wraps the raw embedding in DenseVector for consistency with the declared
    `Chunk.vector` field type — this was the only call site in the examples
    that assigned the bare embedding list.
    """
    return [
        Chunk(
            doc_id=uid,
            text=piece,
            vector=DenseVector(emb.vectorize_chunk(piece)),
            keyword=Keyword(piece),
        )
        for piece in chunker.segment(text)
    ]


def search_and_rerank(query: str, topk: int) -> list[Chunk]:
    """Union keyword and vector hits, dedupe by uid, rerank, keep the top *topk*."""
    from_keyword = vr.search_by_keyword(Chunk, query, topk=topk)
    from_vector = vr.search_by_vector(Chunk, emb.vectorize_query(query), topk=topk)
    # dict keyed by uid dedupes while preserving first-seen order
    unique = {hit.uid: hit for hit in from_keyword + from_vector}
    candidates = list(unique.values())
    ranking = reranker.rerank(query, [hit.text for hit in candidates])
    return [candidates[i] for i in ranking[:topk]]


if __name__ == "__main__":
    load_document("smart")
    chunk_document()
    # hybrid retrieval: keyword + vector candidates reranked down to 3
    chunks = search_and_rerank("smart", 3)
    print(chunks)