Integrations

Haystack

Use Thunderbit as a custom Haystack component for ingestion pipelines

Haystack 2.x pipelines compose typed components. Wrap /batch/distill as a custom component and pipe its Documents into your usual DocumentSplitter → DocumentEmbedder → DocumentWriter chain.

Install

pip install haystack-ai httpx

Custom component

from haystack import component, Document
import httpx, time

# Base URL that the batch endpoints below are appended to.
API = "https://openapi.thunderbit.com/openapi/v1"
# Headers sent with every request. "YOUR_API_KEY" is a placeholder: replace it
# with a real key (ideally loaded from the environment, not hard-coded).
H = {"Authorization": "Bearer YOUR_API_KEY"}

@component
class ThunderbitFetcher:
    """Haystack component that turns URLs into Documents via /batch/distill.

    A batch job is submitted for the given URLs, polled until it reports
    ``COMPLETED`` or ``FAILED``, and every result whose status is
    ``SUCCEEDED`` is converted into a ``Document`` holding the returned
    markdown. Results of a ``FAILED`` job are still harvested on a
    best-effort basis, so partially processed batches are not discarded.

    Args:
        poll_interval: Seconds to sleep between two status checks.
        timeout: Maximum number of seconds to wait for the job to reach a
            terminal status before giving up.
    """

    def __init__(self, poll_interval: float = 2.0, timeout: float = 600.0):
        self.poll_interval = poll_interval
        self.timeout = timeout

    @component.output_types(documents=list[Document])
    def run(self, urls: list[str]) -> dict:
        """Fetch *urls* through the batch API and return them as Documents.

        Args:
            urls: Pages to distill.

        Returns:
            ``{"documents": [...]}`` with one ``Document`` per succeeded
            result; ``meta["source"]`` carries the result's URL alongside
            any metadata returned for it.

        Raises:
            httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
            TimeoutError: If the job is still running after ``timeout``
                seconds.
        """
        # Nothing to fetch: skip the network round trip entirely.
        if not urls:
            return {"documents": []}

        # Monotonic clock so wall-clock adjustments cannot skew the deadline.
        deadline = time.monotonic() + self.timeout

        # One client for the whole job so polling reuses the connection.
        with httpx.Client(headers=H) as client:
            submitted = client.post(
                f"{API}/batch/distill",
                json={"urls": urls, "include": ["metadata"]},
            )
            # Surface auth/quota/server errors as HTTP errors rather than as
            # a KeyError on the error payload.
            submitted.raise_for_status()
            job_id = submitted.json()["data"]["jobId"]

            while True:
                polled = client.get(f"{API}/batch/{job_id}")
                polled.raise_for_status()
                status = polled.json()
                if status["data"]["status"] in ("COMPLETED", "FAILED"):
                    break
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"Thunderbit batch job {job_id} did not finish "
                        f"within {self.timeout} seconds"
                    )
                time.sleep(self.poll_interval)

        # A FAILED job may come back without a results list.
        results = status["data"].get("results") or []
        docs = [
            Document(
                content=r["markdown"],
                # `or {}` also covers an explicit null metadata value.
                meta={"source": r["url"], **(r.get("metadata") or {})},
            )
            for r in results
            if r.get("status") == "SUCCEEDED"
        ]
        return {"documents": docs}

Pipeline

from haystack import Pipeline
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

# In-memory store that receives the split documents.
store = InMemoryDocumentStore()

# Ingestion pipeline: fetch -> split -> write.
p = Pipeline()

# Register the three stages under the names used in the connections below.
for name, instance in (
    ("fetch", ThunderbitFetcher()),
    ("split", DocumentSplitter(split_by="word", split_length=300)),
    ("write", DocumentWriter(store)),
):
    p.add_component(name, instance)

# Wire each stage's `documents` output into the next stage's `documents` input.
for sender, receiver in (
    ("fetch.documents", "split.documents"),
    ("split.documents", "write.documents"),
):
    p.connect(sender, receiver)

# Run the pipeline, passing the URL list to the "fetch" component.
p.run({"fetch": {"urls": ["https://docs.example.com"]}})