Examples
Simple demo
from typing import Optional
from vechord.embedding import GeminiDenseEmbedding
from vechord.registry import VechordRegistry
from vechord.spec import PrimaryKeyAutoIncrease, Table, Vector
# 768-dimensional dense vector type (matches the Gemini embedding size used here)
DenseVector = Vector[768]
class Document(Table, kw_only=True):
    """A stored document row: title and text plus its dense embedding."""

    uid: Optional[PrimaryKeyAutoIncrease] = None  # assigned by the database
    title: str = ""
    text: str
    vec: DenseVector
if __name__ == "__main__":
    # Connect to Postgres and create the table for Document.
    vr = VechordRegistry("simple", "postgresql://postgres:postgres@172.17.0.1:5432/")
    vr.register([Document])
    emb = GeminiDenseEmbedding()

    # Insert one document together with its embedding.
    text = "my personal long note"
    doc = Document(title="note", text=text, vec=DenseVector(emb.vectorize_chunk(text)))
    vr.insert(doc)

    # Read it back.
    print(vr.select_by(Document.partial_init(), limit=1))

    # Vector similarity search.
    print(vr.search_by_vector(Document, emb.vectorize_query("note"), topk=1))
BEIR evaluation
import csv
import zipfile
from pathlib import Path
from typing import Iterator
import httpx
import msgspec
import rich.progress
from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import BaseEvaluator
from vechord.registry import VechordRegistry
from vechord.spec import Table, Vector
# BEIR dataset download location and evaluation settings
BASE_URL = "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{}.zip"
DEFAULT_DATASET = "scifact"
TOP_K = 10  # number of retrieved documents scored per query
emb = GeminiDenseEmbedding()
DenseVector = Vector[768]  # Gemini dense embedding dimension
def download_dataset(dataset: str, output: Path) -> Path:
    """Download and extract a BEIR dataset zip into *output*.

    Skips the download when the zip already exists and the extraction when
    the dataset directory already exists. Returns the extracted directory.

    Fixes over the original:
    - no longer shadows the builtin ``zip``
    - ``raise_for_status`` so an HTTP error page is not saved as the archive
    - the request is opened before the output file, so a failed request
      does not leave an empty zip behind that would be skipped on retry
    """
    output.mkdir(parents=True, exist_ok=True)
    archive = output / f"{dataset}.zip"
    if not archive.is_file():
        with httpx.stream("GET", BASE_URL.format(dataset)) as stream:
            stream.raise_for_status()  # fail early on a non-2xx response
            total = int(stream.headers["Content-Length"])
            with (
                archive.open("wb") as f,
                rich.progress.Progress(
                    "[progress.percentage]{task.percentage:>3.0f}%",
                    rich.progress.BarColumn(bar_width=None),
                    rich.progress.DownloadColumn(),
                    rich.progress.TransferSpeedColumn(),
                ) as progress,
            ):
                download_task = progress.add_task("Download", total=total)
                for chunk in stream.iter_bytes():
                    f.write(chunk)
                    progress.update(
                        download_task, completed=stream.num_bytes_downloaded
                    )
    unzip_dir = output / dataset
    if not unzip_dir.is_dir():
        with zipfile.ZipFile(archive, "r") as f:
            f.extractall(output)
    return unzip_dir
class Corpus(Table):
    """One BEIR corpus document with its embedding."""

    uid: str  # BEIR `_id`
    text: str
    title: str
    vector: DenseVector
class Query(Table):
    """One BEIR test query with its relevant corpus id and embedding."""

    uid: str  # query `_id`
    cid: str  # relevant corpus uid taken from qrels
    text: str
    vector: DenseVector
class Evaluation(msgspec.Struct):
    """Retrieval metrics for a single query."""

    map: float
    ndcg: float
    recall: float
# registry namespaced by dataset name; creates tables for Corpus and Query
vr = VechordRegistry(DEFAULT_DATASET, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Corpus, Query])
@vr.inject(output=Corpus)
def load_corpus(dataset: str, output: Path) -> Iterator[Corpus]:
    """Yield embedded Corpus rows parsed from the BEIR corpus.jsonl file."""
    decoder = msgspec.json.Decoder()
    corpus_file = output / dataset / "corpus.jsonl"
    with corpus_file.open("r") as f:
        for line in f:
            record = decoder.decode(line)
            title = record.get("title", "")
            text = record.get("text", "")
            try:
                vec = emb.vectorize_chunk(f"{title}\n{text}")
            except Exception as e:
                # best-effort: skip documents the embedding service rejects
                print(f"failed to vectorize {title}: {e}")
                continue
            yield Corpus(
                uid=record["_id"],
                text=text,
                title=title,
                vector=DenseVector(vec),
            )
@vr.inject(output=Query)
def load_query(dataset: str, output: Path) -> Iterator[Query]:
    """Yield embedded test queries that have a judgement in qrels/test.tsv."""
    qrels = {}
    with (output / dataset / "qrels" / "test.tsv").open("r") as f:
        reader = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
        next(reader)  # skip header
        for row in reader:
            # NOTE(review): keeps only the last judged corpus id per query —
            # assumes a single relevant doc per query; verify for other datasets
            qrels[row[0]] = row[1]

    decoder = msgspec.json.Decoder()
    with (output / dataset / "queries.jsonl").open("r") as f:
        for line in f:
            record = decoder.decode(line)
            qid = record["_id"]
            if qid not in qrels:
                continue
            query_text = record.get("text", "")
            yield Query(
                uid=qid,
                cid=qrels[qid],
                text=query_text,
                vector=DenseVector(emb.vectorize_query(query_text)),
            )
@vr.inject(input=Query)
def evaluate(cid: str, vector: DenseVector) -> Evaluation:
    """Search the corpus with one query vector and score the ranking."""
    retrieved: list[Corpus] = vr.search_by_vector(Corpus, vector, topk=TOP_K)
    metrics = BaseEvaluator.evaluate_one(cid, [doc.uid for doc in retrieved])
    return Evaluation(
        map=metrics.get("map"),
        ndcg=metrics.get("ndcg"),
        recall=metrics.get(f"recall_{TOP_K}"),
    )
if __name__ == "__main__":
    save_dir = Path("datasets")
    # download -> ingest corpus -> ingest queries -> score every query
    download_dataset(DEFAULT_DATASET, save_dir)
    load_corpus(DEFAULT_DATASET, save_dir)
    load_query(DEFAULT_DATASET, save_dir)
    results: list[Evaluation] = evaluate()
    count = len(results)
    print("ndcg", sum(r.ndcg for r in results) / count)
    print("recall@10", sum(r.recall for r in results) / count)
HTTP web service
from datetime import datetime
from typing import Annotated
import httpx
import msgspec
from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.service import create_web_app
from vechord.spec import (
ForeignKey,
PrimaryKeyAutoIncrease,
Table,
Vector,
)
URL = "https://paulgraham.com/{}.html"  # essay URL template
DenseVector = Vector[768]  # Gemini dense embedding dimension
emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)  # non-overlapping 1024-char chunks
extractor = SimpleExtractor()
class Document(Table, kw_only=True):
    """A fetched essay with its extracted text."""

    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database
    title: str = ""
    text: str
    updated_at: datetime = msgspec.field(default_factory=datetime.now)
class Chunk(Table, kw_only=True):
    """An embedded chunk of a document."""

    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database
    doc_id: Annotated[int, ForeignKey[Document.uid]]  # owning document
    text: str
    vector: DenseVector
# namespace "http"; creates tables for Document and Chunk
vr = VechordRegistry("http", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk])
@vr.inject(output=Document)
def load_document(title: str) -> Document:
    """Fetch the essay `title` from paulgraham.com and extract its text."""
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
    if resp.is_error:
        raise RuntimeError(f"Failed to fetch the document `{title}`")
    return Document(title=title, text=extractor.extract_html(resp.text))
@vr.inject(input=Document, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    """Split a document into chunks and embed each one."""
    result = []
    for piece in chunker.segment(text):
        vec = DenseVector(emb.vectorize_chunk(piece))
        result.append(Chunk(doc_id=uid, text=piece, vector=vec))
    return result
if __name__ == "__main__":
    from wsgiref.simple_server import make_server

    # the pipeline runs inside the web app, or directly via `vr.run()`
    vr.set_pipeline([load_document, chunk_document])
    app = create_web_app(vr)
    with make_server("", 8000, app) as httpd:
        httpd.serve_forever()
Contextual chunk augmentation
from datetime import datetime
from typing import Annotated, Optional
from vechord import (
GeminiAugmenter,
GeminiDenseEmbedding,
GeminiEvaluator,
LocalLoader,
RegexChunker,
SimpleExtractor,
)
from vechord.registry import VechordRegistry
from vechord.spec import (
ForeignKey,
PrimaryKeyAutoIncrease,
Table,
Vector,
)
emb = GeminiDenseEmbedding()
DenseVector = Vector[768]  # Gemini dense embedding dimension
extractor = SimpleExtractor()
class Document(Table, kw_only=True):
    """A loaded local file (PDF) with its extracted text."""

    uid: Optional[PrimaryKeyAutoIncrease] = None  # assigned by the database
    digest: str  # presumably a content hash from LocalLoader — verify there
    filename: str
    text: str
    updated_at: datetime
class Chunk(Table, kw_only=True):
    """An embedded chunk of a document, ordered by seq_id."""

    uid: Optional[PrimaryKeyAutoIncrease] = None  # assigned by the database
    doc_uid: Annotated[int, ForeignKey[Document.uid]]  # owning document
    seq_id: int  # position of the chunk within its document
    text: str
    vector: DenseVector
class ContextChunk(Table, kw_only=True):
    """A chunk augmented with LLM-generated context, embedded separately."""

    # NOTE(review): no auto-increment primary key here, unlike Chunk — confirm intended
    chunk_uid: Annotated[int, ForeignKey[Chunk.uid]]  # source chunk
    text: str  # "{generated context}\n{original chunk text}"
    vector: DenseVector
# namespace "decorator"; creates tables for all three models
vr = VechordRegistry("decorator", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk, ContextChunk])
@vr.inject(output=Document)
def load_from_dir(dirpath: str) -> list[Document]:
    """Load every PDF under `dirpath` and extract its text."""
    docs = []
    for item in LocalLoader(dirpath, include=[".pdf"]).load():
        docs.append(
            Document(
                digest=item.digest,
                filename=item.path,
                text=extractor.extract(item),
                updated_at=item.updated_at,
            )
        )
    return docs
@vr.inject(input=Document, output=Chunk)
def split_document(uid: int, text: str) -> list[Chunk]:
    """Chunk one document and embed each piece, keeping its sequence order."""
    pieces = RegexChunker(overlap=0).segment(text)
    return [
        Chunk(
            doc_uid=uid,
            seq_id=seq,
            text=piece,
            vector=DenseVector(emb.vectorize_chunk(piece)),
        )
        for seq, piece in enumerate(pieces)
    ]
@vr.inject(input=Document, output=ContextChunk)
def context_embedding(uid: int, text: str) -> list[ContextChunk]:
    """Augment each stored chunk of document `uid` with generated context.

    Asks the augmenter for a short context per chunk, prepends it to the
    original chunk text, and embeds the combined text.

    Fix: the original built `[c.text for c in chunks]` twice and ran two
    separate index-aligned zips; here the texts are computed once and the
    rows are paired directly with their augmented text.
    """
    chunks: list[Chunk] = vr.select_by(
        Chunk.partial_init(doc_uid=uid), fields=["uid", "text"]
    )
    texts = [c.text for c in chunks]
    augmentor = GeminiAugmenter()
    augmentor.reset(text)
    contexts = augmentor.augment_context(texts)
    result = []
    # truncating zip (strict=False, as in the original): the augmenter may
    # return fewer contexts than there are chunks
    for chunk, context, origin in zip(chunks, contexts, texts, strict=False):
        augmented = f"{context}\n{origin}"
        result.append(
            ContextChunk(
                chunk_uid=chunk.uid,
                text=augmented,
                vector=DenseVector(emb.vectorize_chunk(augmented)),
            )
        )
    return result
def query_chunk(query: str) -> list[Chunk]:
    """Return the top-5 plain chunks by vector similarity to `query`."""
    query_vec = emb.vectorize_query(query)
    return vr.search_by_vector(Chunk, query_vec, topk=5)
def query_context_chunk(query: str) -> list[ContextChunk]:
    """Return the top-5 context-augmented chunks by vector similarity."""
    query_vec = emb.vectorize_query(query)
    return vr.search_by_vector(ContextChunk, query_vec, topk=5)
@vr.inject(input=Chunk)
def evaluate(uid: int, doc_uid: int, text: str):
    """Generate a query for this chunk and score whether retrieval finds it."""
    evaluator = GeminiEvaluator()
    doc: Document = vr.select_by(Document.partial_init(uid=doc_uid))[0]
    generated = evaluator.produce_query(doc.text, text)
    hits = query_chunk(generated)
    return evaluator.evaluate_one(str(uid), [str(h.uid) for h in hits])
if __name__ == "__main__":
    from rich import print

    # ingest pipeline
    load_from_dir("./data")
    split_document()
    context_embedding()

    # plain chunk retrieval
    print(query_chunk("vector search"))

    # per-chunk retrieval score
    scores = evaluate()
    print(sum(scores) / len(scores))

    # context-augmented retrieval
    print(query_context_chunk("vector search"))
    vr.clear_storage()
Evaluate with generated queries
from typing import Annotated
import httpx
from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import GeminiEvaluator
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.spec import (
ForeignKey,
PrimaryKeyAutoIncrease,
Table,
Vector,
)
URL = "https://paulgraham.com/{}.html"  # essay URL template
ARTICLE = "best"  # which essay to evaluate on
TOP_K = 10  # retrieved chunks scored per generated query
DenseVector = Vector[768]  # Gemini dense embedding dimension
emb = GeminiDenseEmbedding()
evaluator = GeminiEvaluator()
extractor = SimpleExtractor()
class Chunk(Table, kw_only=True):
    """A chunk of the essay with its embedding."""

    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database
    text: str
    vector: DenseVector
class Query(Table, kw_only=True):
    """A generated query whose ground-truth answer is chunk `cid`."""

    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database
    cid: Annotated[int, ForeignKey[Chunk.uid]]  # chunk the query was generated from
    text: str
    vector: DenseVector
from dataclasses import dataclass


# Fix: the original was a plain class with bare annotations, so the
# `Evaluation(map=..., ndcg=..., recall=...)` call in `evaluate` raised
# TypeError. A dataclass gives it the keyword constructor (matching the
# msgspec.Struct used by the BEIR example) without new dependencies.
@dataclass
class Evaluation:
    """Retrieval metrics for a single generated query."""

    map: float
    ndcg: float
    recall: float
# registry namespaced per article; creates tables for Chunk and Query
vr = VechordRegistry(ARTICLE, "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Chunk, Query])

# NOTE: fetches the essay at import time (network access on module load)
with httpx.Client() as client:
    resp = client.get(URL.format(ARTICLE))
    doc = extractor.extract_html(resp.text)
@vr.inject(output=Chunk)
def segment_essay() -> list[Chunk]:
    """Split the fetched essay into chunks and embed each one."""
    pieces = RegexChunker().segment(doc)
    return [
        Chunk(text=piece, vector=DenseVector(emb.vectorize_chunk(piece)))
        for piece in pieces
    ]
@vr.inject(input=Chunk, output=Query)
def create_query(uid: int, text: str) -> Query:
    """Generate an evaluation query for a chunk and embed it.

    Fix: embed with `vectorize_query` rather than `vectorize_chunk`, so the
    query-side embedding task type matches every other query path in these
    examples (simple demo, BEIR, hybrid search).
    """
    query = evaluator.produce_query(doc, text)
    return Query(cid=uid, text=query, vector=DenseVector(emb.vectorize_query(query)))
@vr.inject(input=Query)
def evaluate(cid: int, vector: DenseVector) -> Evaluation:
    """Retrieve top-K chunks for the query vector and score against `cid`."""
    hits: list[Chunk] = vr.search_by_vector(Chunk, vector, topk=TOP_K)
    metrics = evaluator.evaluate_one(str(cid), [str(hit.uid) for hit in hits])
    return Evaluation(
        map=metrics["map"], ndcg=metrics["ndcg"], recall=metrics[f"recall_{TOP_K}"]
    )
if __name__ == "__main__":
    # chunk the essay, generate one query per chunk, then score retrieval
    segment_essay()
    create_query()
    results: list[Evaluation] = evaluate()
    total = len(results)
    print("ndcg", sum(r.ndcg for r in results) / total)
    print(f"recall@{TOP_K}", sum(r.recall for r in results) / total)
Hybrid search with rerank
from typing import Annotated
import httpx
from vechord.chunk import RegexChunker
from vechord.embedding import GeminiDenseEmbedding
from vechord.extract import SimpleExtractor
from vechord.registry import VechordRegistry
from vechord.rerank import CohereReranker
from vechord.spec import ForeignKey, Keyword, PrimaryKeyAutoIncrease, Table, Vector
URL = "https://paulgraham.com/{}.html"  # essay URL template
DenseVector = Vector[768]  # Gemini dense embedding dimension
emb = GeminiDenseEmbedding()
chunker = RegexChunker(size=1024, overlap=0)  # non-overlapping 1024-char chunks
reranker = CohereReranker()
extractor = SimpleExtractor()
class Document(Table, kw_only=True):
    """A fetched essay with its extracted text."""

    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database
    title: str = ""
    text: str
class Chunk(Table, kw_only=True):
    """A chunk indexed both by dense vector and by keyword for hybrid search."""

    uid: PrimaryKeyAutoIncrease | None = None  # assigned by the database
    doc_id: Annotated[int, ForeignKey[Document.uid]]  # owning document
    text: str
    vector: DenseVector
    keyword: Keyword  # keyword/full-text index over the chunk text
# namespace "hybrid"; creates tables for Document and Chunk
vr = VechordRegistry("hybrid", "postgresql://postgres:postgres@172.17.0.1:5432/")
vr.register([Document, Chunk])
@vr.inject(output=Document)
def load_document(title: str) -> Document:
    """Fetch the essay `title` from paulgraham.com and extract its text."""
    with httpx.Client() as client:
        resp = client.get(URL.format(title))
    if resp.is_error:
        raise RuntimeError(f"Failed to fetch the document `{title}`")
    return Document(title=title, text=extractor.extract_html(resp.text))
@vr.inject(input=Document, output=Chunk)
def chunk_document(uid: int, text: str) -> list[Chunk]:
    """Split a document into chunks with both a dense vector and keywords.

    Fix: wrap the embedding in `DenseVector(...)`, as every other example
    does, instead of storing the raw `vectorize_chunk` result.
    """
    return [
        Chunk(
            doc_id=uid,
            text=chunk,
            vector=DenseVector(emb.vectorize_chunk(chunk)),
            keyword=Keyword(chunk),
        )
        for chunk in chunker.segment(text)
    ]
def search_and_rerank(query: str, topk: int) -> list[Chunk]:
    """Hybrid retrieval: merge keyword and vector hits, then rerank."""
    keyword_hits = vr.search_by_keyword(Chunk, query, topk=topk)
    vector_hits = vr.search_by_vector(Chunk, emb.vectorize_query(query), topk=topk)
    # dedup by uid, keyword hits first (dict preserves insertion order)
    unique = {chunk.uid: chunk for chunk in keyword_hits + vector_hits}
    candidates = list(unique.values())
    order = reranker.rerank(query, [c.text for c in candidates])
    return [candidates[i] for i in order[:topk]]
if __name__ == "__main__":
    # ingest one essay, then run a hybrid query over its chunks
    load_document("smart")
    chunk_document()
    print(search_and_rerank("smart", 3))