Integrations

Haystack

Use Thunderbit as a custom Haystack component for ingestion pipelines

Haystack 2.x pipelines compose typed components. Wrap the /batch/distill endpoint as a custom component and pipe the resulting Documents into your usual DocumentSplitter → DocumentEmbedder → DocumentWriter chain.

Install

pip install haystack-ai httpx

Custom component

from haystack import component, Document
import httpx, time

# Base URL that the batch/distill request paths are appended to.
API = "https://openapi.thunderbit.com/openapi/v1"
# Headers sent with every request; YOUR_API_KEY is a placeholder --
# substitute a real key before running.
H = {"Authorization": "Bearer YOUR_API_KEY"}

@component
class ThunderbitFetcher:
    """Haystack component that turns URLs into Markdown ``Document`` objects.

    Submits the URLs to the ``/batch/distill`` endpoint, polls the job until
    it reaches a terminal status, and emits one ``Document`` per successfully
    distilled page. Each document's ``meta["source"]`` holds the page URL.

    Args:
        poll_interval: Seconds to sleep between two status polls.
        max_wait: Upper bound, in seconds, on the total time spent waiting
            for the job to finish. Pass ``float("inf")`` to wait forever.
        request_timeout: Timeout, in seconds, applied to each HTTP request.
    """

    # Job states after which polling stops. FAILED and CANCELLED jobs are
    # still scanned for per-URL results so partial output is not thrown away.
    _TERMINAL_STATUSES = ("COMPLETED", "FAILED", "CANCELLED")

    def __init__(
        self,
        poll_interval: float = 2.0,
        max_wait: float = 600.0,
        request_timeout: float = 30.0,
    ):
        self.poll_interval = poll_interval
        self.max_wait = max_wait
        self.request_timeout = request_timeout

    @component.output_types(documents=list[Document])
    def run(self, urls: list[str]) -> dict:
        """Distill ``urls`` and return the resulting documents.

        Args:
            urls: Pages to fetch and convert to Markdown.

        Returns:
            ``{"documents": [...]}`` containing one ``Document`` per result
            whose status is ``"SUCCEEDED"`` and that carries Markdown content.
            The list is empty when ``urls`` is empty.

        Raises:
            httpx.HTTPStatusError: If the API answers with a 4xx/5xx status
                (for example an invalid API key).
            TimeoutError: If the job has not reached a terminal status
                within ``max_wait`` seconds.
        """
        # Nothing to fetch: skip the network round-trips entirely.
        if not urls:
            return {"documents": []}

        deadline = time.monotonic() + self.max_wait

        # One client for the whole job reuses the connection across polls and
        # applies the same headers and timeout to every request.
        with httpx.Client(headers=H, timeout=self.request_timeout) as client:
            submitted = client.post(f"{API}/batch/distill", json={"urls": urls})
            # Surface auth/quota/server errors as HTTP errors instead of a
            # confusing KeyError on the missing "data" field.
            submitted.raise_for_status()
            job_id = submitted.json()["data"]["id"]

            while True:
                polled = client.get(f"{API}/batch/distill/{job_id}")
                polled.raise_for_status()
                data = polled.json()["data"]
                if data["status"] in self._TERMINAL_STATUSES:
                    break
                # Bound the wait so a stuck job cannot hang the pipeline.
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"Thunderbit job {job_id} still {data['status']!r} "
                        f"after {self.max_wait} seconds"
                    )
                time.sleep(self.poll_interval)

        # "results" may be absent or null; entries without Markdown are
        # skipped because a Document with no content is useless downstream.
        docs = [
            Document(content=r["markdown"], meta={"source": r["url"]})
            for r in (data.get("results") or [])
            if r.get("status") == "SUCCEEDED" and r.get("markdown")
        ]
        return {"documents": docs}

Pipeline

from haystack import Pipeline
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Destination for the split documents.
store = InMemoryDocumentStore()

# Stages in execution order: fetch pages, split them into 300-word chunks,
# then write the chunks to the store.
stages = {
    "fetch": ThunderbitFetcher(),
    "split": DocumentSplitter(split_by="word", split_length=300),
    "write": DocumentWriter(store),
}

# (sender output, receiver input) pairs wiring each stage to the next.
links = [
    ("fetch.documents", "split.documents"),
    ("split.documents", "write.documents"),
]

p = Pipeline()
for stage_name, stage in stages.items():
    p.add_component(stage_name, stage)
for sender, receiver in links:
    p.connect(sender, receiver)

# Kick off ingestion by handing the seed URLs to the "fetch" stage.
seed_urls = ["https://docs.example.com"]
p.run({"fetch": {"urls": seed_urls}})