통합
Haystack
Thunderbit를 수집 파이프라인용 Haystack 커스텀 컴포넌트로 사용
Haystack 2.x 파이프라인은 타입 있는 컴포넌트를 조합합니다. /batch/distill 을 커스텀 컴포넌트로 감싸면 평소의 DocumentSplitter → DocumentEmbedder → DocumentWriter 체인에 Document 를 그대로 흘려보낼 수 있습니다.
설치
pip install haystack-ai httpx커스텀 컴포넌트
from haystack import component, Document
import httpx, time
API = "https://openapi.thunderbit.com/openapi/v1"
H = {"Authorization": "Bearer YOUR_API_KEY"}
@component
class ThunderbitFetcher:
@component.output_types(documents=list[Document])
def run(self, urls: list[str]) -> dict:
job = httpx.post(f"{API}/batch/distill",
headers=H,
json={"urls": urls,
"include": ["metadata"]}).json()
job_id = job["data"]["jobId"]
while True:
status = httpx.get(f"{API}/batch/{job_id}", headers=H).json()
if status["data"]["status"] in ("COMPLETED", "FAILED"):
break
time.sleep(2)
docs = [
Document(content=r["markdown"],
meta={"source": r["url"], **r.get("metadata", {})})
for r in status["data"]["results"] if r["status"] == "SUCCEEDED"
]
return {"documents": docs}파이프라인
from haystack import Pipeline
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
store = InMemoryDocumentStore()
p = Pipeline()
p.add_component("fetch", ThunderbitFetcher())
p.add_component("split", DocumentSplitter(split_by="word", split_length=300))
p.add_component("write", DocumentWriter(store))
p.connect("fetch.documents", "split.documents")
p.connect("split.documents", "write.documents")
p.run({"fetch": {"urls": ["https://docs.example.com"]}})