Cookbook 6: From Search to Insight
Chain search, clustering, anomaly detection, topic modeling, and temporal shift into one unified analytical pipeline — proving that vector data is a compute substrate, not just a retrieval index.
Answer
How do I run a compound workflow in EigenLake?
Start with a filtered search, cluster the results into natural families, detect anomalies within each family, model topics for human-readable labels, compare against a baseline window with temporal shift, and summarize the findings with a natural-language agent query — all over the same index, without copying data between systems.
Inputs
- A time-stamped corpus of consumer complaints with product, issue, and company metadata
- Embedding vectors that match the index dimension
- A business question that requires multiple analytical lenses
Outputs
- A complete triage report: clusters, anomalies, topics, temporal shifts, and recommended actions
- Structured evidence at every step: representative records, nearest neighbors, topic terms, and shift scores
- A natural-language summary artifact with actionable recommendations
This is the recipe that proves the thesis. Vector databases retrieve neighbors. EigenLake runs the full query path. You will start with a business question — "What changed in consumer complaints this week?" — and end with a structured report that a compliance officer can act on. Not six separate tools. One SDK. One index. One pipeline.
Problem
You are a fintech compliance analyst. It is Monday morning. You need to know:
- What complaints arrived in the last 14 days?
- What natural families do they fall into?
- Which complaints do not fit any family — potential new attack vectors?
- What human-readable names describe each family?
- How did this week compare to last week — what is emerging, what is shrinking?
- What should you tell the fraud team, the product team, and the legal team?
You have 10,000 complaints in EigenLake. You have 15 minutes before the stand-up. You do not have time to export to a notebook, stitch together Spark and pandas, or wait for a BI dashboard to refresh.
Prerequisites
- Python 3.10 or newer.
- Install the dependencies: pip install eigenlake numpy datasets
- An API key from https://api.eigenlake.dev.
- Familiarity with the individual workloads. This recipe assumes you have read Cookbooks 1–5 or are comfortable with search.nearest, search.cluster, search.anomalies, search.topics, and search.temporal_shift.
The recipe
Step 1 — Load and ingest the dataset
This recipe reuses the CFPB consumer-complaints dataset from Cookbook 1 for continuity, but it is self-contained.
import hashlib

from datasets import load_dataset
import numpy as np
import eigenlake
from eigenlake import schema as s

DIM = 384

# CFPB Consumer Complaints: https://huggingface.co/datasets/cfpb/consumer-complaints
ds = load_dataset("cfpb/consumer-complaints", split="train")
ds = ds.filter(lambda r: r["consumer_complaint_narrative"] is not None)
ds = ds.shuffle(seed=42).select(range(10_000))

def fake_embed(text: str) -> list[float]:
    # Placeholder embedding: a unit-length random vector seeded from the text.
    # The seed comes from hashlib rather than hash(), because hash() on str is
    # salted per process and would give different vectors on every run.
    seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:4], "big")
    rng = np.random.default_rng(seed)
    v = rng.standard_normal(DIM).astype("float32")
    v /= np.linalg.norm(v) + 1e-12
    return v.tolist()

embed = fake_embed  # swap to sentence-transformers in production
schema, index_options = (
    s.SchemaBuilder(additional_properties=False)
    .add("complaint_id", s.string(required=True, filterable=True))
    .add("product", s.string(filterable=True))
    .add("sub_product", s.string(filterable=True))
    .add("issue", s.string(filterable=True))
    .add("company", s.string(filterable=True))
    .add("state", s.string(filterable=True))
    .add("consumer_complaint_narrative", s.string(filterable=False))
    .add("date_received", s.datetime(filterable=True))
    .build()
)
with eigenlake.connect(
    url="https://api.eigenlake.dev",
    api_key="<sk_sbx_your_api_key_here>",
) as client:
    idx = client.indexes.create_or_get(
        namespace="cookbook-06",
        index="compliance-pipeline",
        dimensions=DIM,
        schema=schema,
        index_options=index_options,
    )

    # Synthetic dates: spread 10,000 records across 8 weeks
    import random
    random.seed(42)

    payload = []
    for r in ds:
        days_offset = random.randint(0, 56)
        # Offsets 0-29 land on May 1-30, offsets 30-56 on June 1-27
        month = "05" if days_offset < 30 else "06"
        date_str = f"2025-{month}-{1 + (days_offset % 30):02d}T00:00:00Z"
        payload.append({
            "properties": {
                "complaint_id": str(r["complaint_id"]),
                "product": r["product"] or "Unknown",
                "sub_product": r["sub_product"] or "Unknown",
                "issue": r["issue"] or "Unknown",
                "company": r["company"] or "Unknown",
                "state": r["state"] or "Unknown",
                "consumer_complaint_narrative": r["consumer_complaint_narrative"],
                "date_received": date_str,
            },
            "vector": embed(r["consumer_complaint_narrative"]),
        })

    result = idx.records.add_many(payload, on_error="continue")
    print(f"inserted {len(result)} of {len(payload)} complaints")
Step 2 — SEARCH: Retrieve complaints from the last 14 days
query = "fraudulent unauthorized charge on my credit card"
query_vector = embed(query)

recent_complaints = idx.search.nearest(
    vector=query_vector,
    limit=1_000,
    filter={
        "$and": [
            {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
            {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
        ]
    },
)
print(f"search returned {len(recent_complaints['vectors'])} complaints")
The search narrows the corpus to the relevant subset: recent complaints in the three products the compliance team monitors. The query vector orients the ranking toward fraud language, while the filter guarantees that only the right products and dates are considered.
Step 3 — CLUSTER: Discover natural complaint families
clusters = idx.search.cluster(
    filter={
        "$and": [
            {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
            {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
        ]
    },
    limit=10_000,
    algorithm="dbscan",
    dbscan_min_samples=10,
    distance_metric="cosine",
    representatives_per_cluster=3,
)
print(f"found {len(clusters['clusters'])} clusters")
for c in clusters["clusters"]:
    print(f"  cluster {c['cluster_id']}: {c['count']} complaints")
    for rep in c["representatives"][:2]:
        print(f"    - {rep['metadata']['issue']}")
DBSCAN discovers the natural shape of the data. With real embeddings in place of the random fake_embed placeholder, which carries no semantic structure, you might see five clusters in this corpus: billing errors, fraud reports, login issues, fee disputes, and data-privacy concerns. The representatives give you enough evidence to label each cluster without reading thousands of records.
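To make the labeling step concrete, the sketch below tallies the issue field across each cluster's representatives. It assumes only the response shape printed above (clusters, cluster_id, count, representatives, metadata.issue). The helper name summarize_clusters and the sample data are hypothetical and not part of the EigenLake SDK.

```python
from collections import Counter


def summarize_clusters(cluster_response: dict, top_k: int = 2) -> list[dict]:
    """Return one summary row per cluster, largest cluster first."""
    rows = []
    for c in cluster_response["clusters"]:
        # Count how often each issue appears among the representatives.
        issues = Counter(
            rep["metadata"].get("issue", "Unknown") for rep in c["representatives"]
        )
        rows.append({
            "cluster_id": c["cluster_id"],
            "count": c["count"],
            "top_issues": [issue for issue, _ in issues.most_common(top_k)],
        })
    return sorted(rows, key=lambda row: row["count"], reverse=True)


# Hypothetical response, shaped like the output printed in Step 3.
sample = {
    "clusters": [
        {"cluster_id": 0, "count": 120, "representatives": [
            {"metadata": {"issue": "Billing dispute"}},
            {"metadata": {"issue": "Billing dispute"}},
            {"metadata": {"issue": "Late fee"}},
        ]},
        {"cluster_id": 1, "count": 340, "representatives": [
            {"metadata": {"issue": "Fraud or scam"}},
            {"metadata": {"issue": "Fraud or scam"}},
            {"metadata": {"issue": "Fraud or scam"}},
        ]},
    ]
}

summary = summarize_clusters(sample)
for row in summary:
    print(row["cluster_id"], row["count"], row["top_issues"])
```

With only three representatives per cluster the tally is a hint rather than a verdict, so treat the result as a provisional label until the topic step names the family.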
Step 4 — ANOMALY DETECTION: Find the outliers
anomalies = idx.search.anomalies(
    filter={
        "$and": [
            {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
            {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
        ]
    },
    limit=10_000,
    n_neighbors=20,
    top_n=15,
    text_fields=["consumer_complaint_narrative"],
    timeout=130.0,
)
print(f"found {len(anomalies['anomalies'])} anomalies")
for a in anomalies["anomalies"][:3]:
    print(f"  rank={a['rank']} score={a['score']:.4f} percentile={a['percentile']:.2f}")
Anomalies are the complaints that do not fit any cluster. They are locally isolated in the vector space — semantically distant from their nearest neighbors. A 99th-percentile anomaly might be a synthetic-identity fraud pattern that has not appeared before, or a new regulatory issue that existing complaint language does not cover.
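A short sketch of how the ranked anomalies might be narrowed to a review queue. It relies only on the rank, score, and percentile fields printed above. The helper name select_for_review is hypothetical, and the 0–100 percentile scale is an assumption: check the actual response before choosing a threshold.

```python
def select_for_review(anomaly_response: dict, min_percentile: float = 99.0) -> list[dict]:
    """Keep anomalies at or above a percentile threshold, best rank first.

    Assumption: 'percentile' is on a 0-100 scale. If the API reports 0-1,
    pass min_percentile=0.99 instead.
    """
    flagged = [
        a for a in anomaly_response["anomalies"] if a["percentile"] >= min_percentile
    ]
    return sorted(flagged, key=lambda a: a["rank"])


# Hypothetical response, shaped like the output printed in Step 4.
sample = {
    "anomalies": [
        {"rank": 3, "score": 0.40, "percentile": 95.0},
        {"rank": 1, "score": 0.91, "percentile": 99.8},
        {"rank": 2, "score": 0.74, "percentile": 99.1},
    ]
}

review_queue = select_for_review(sample)
for a in review_queue:
    print(a["rank"], a["score"], a["percentile"])
```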
Step 5 — TOPIC MODELING: Name the families
topics = idx.search.topics(
    filter={
        "$and": [
            {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
            {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
        ]
    },
    limit=10_000,
    text_fields=["consumer_complaint_narrative"],
    metadata_fields=["product", "company"],
    min_topics=5,
    max_topics=15,
    label_mode="llm",
    top_terms=8,
)
for t in topics["topics"]:
    print(f"\n{t['label']:<45} count={t['count']}")
    print(f"  terms: {', '.join(term['term'] for term in t['terms'][:5])}")
Topic modeling gives human-readable names to the clusters. Where DBSCAN tells you "Cluster 3 has 1,247 members," topic modeling tells you "Cluster 3 is 'Unauthorized account access attempts via spoofed customer service numbers.'" The LLM label is a presentation layer; the underlying assignment is deterministic and reproducible.
Step 6 — TEMPORAL SHIFT: Compare this week to last week
shifts = idx.search.temporal_shift(
    baseline={
        "start": "2025-06-08T00:00:00Z",
        "end": "2025-06-14T23:59:59Z",
    },
    current={
        "start": "2025-06-15T00:00:00Z",
        "end": "2025-06-21T23:59:59Z",
    },
    timestamp_field="date_received",
    filter={"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
    group_by=["product"],
    limit_per_window=5_000,
    min_relative_shift=0.20,
    min_count_shift=5,
    text_fields=["consumer_complaint_narrative"],
    summary_mode="llm",
)
for shift in shifts["shifts"][:8]:
    print(f"\n{shift['kind']:<12} {shift['direction']:<12} score={shift['score']:.4f}")
    print(f"  label: {shift['label']}")
    print(f"  explanation: {shift['explanation']}")
Temporal shift answers "what changed?" You might see that fraud reports grew 340% in credit cards, fee disputes shrank 45% in bank accounts, and a new theme — "synthetic identity verification bypass" — emerged in mortgages. The group_by=["product"] keeps each product's shifts independent, so a surge in credit-card fraud does not mask a subtle but important shift in mortgages.
Step 7 — AGENT QUERY: Summarize and recommend
# Build a context string from the pipeline results
context = (
    "This week's complaint analysis: "
    f"{len(clusters['clusters'])} natural complaint families identified. "
    f"{len(anomalies['anomalies'])} anomalous complaints suggest new attack vectors. "
    "Key temporal shifts: "
    + "; ".join(
        # Loop variable is named shift so it cannot be confused with the
        # schema module imported as s in Step 1.
        f"{shift['label']} ({shift['kind']}, {shift['direction']})"
        for shift in shifts["shifts"][:5]
    )
)
report = idx.agent.query(
    f"Summarize this week's complaint trends and recommend which teams to alert. {context}",
    mode="auto",
)
print(report["action"])
print(report.get("summary", report.get("filter", "See direct API results above")))
The agent query takes the structured evidence from the previous six steps and can return a natural-language summary when that agent surface is available. In the current documented SDK, agent mode is guaranteed to return a filter or cluster action for simple queries, so the example defensively falls back to the returned action/filter if no summary field is present. For complex compound contexts like this one, the pipeline itself is the compute; the agent query serves as a natural-language interface to the final report.
What you have built: A compliance analyst can now ask "What changed this week?" and receive, in one pipeline:
- The relevant complaints (search)
- Their natural families (clustering)
- The outliers worth investigating (anomaly detection)
- Human-readable names for each family (topic modeling)
- Week-over-week trend signals (temporal shift)
- A summary with team recommendations (agent query)
What is happening, step by step
One index, six workloads. Every step reads from the same EigenLake index. There is no ETL pipeline, no Spark job, no CSV export, no IAM reconciliation between a vector database and an analytics warehouse. The data and the compute live in the same place.
Filter reuse. The date range and product filter are identical across steps 2–5, and step 6 reuses the product clause alongside its own baseline and current windows. In production, define the filter once as a dict and pass it to each workload, so that every analytical lens operates on the same population.
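A minimal sketch of the define-once pattern, using the same filter syntax as the steps above. The helper name build_population_filter and the constant MONITORED_PRODUCTS are illustrative names, not SDK features.

```python
def build_population_filter(since: str, products: list[str]) -> dict:
    """Build the date-plus-product filter shared by the pipeline steps."""
    return {
        "$and": [
            {"date_received": {"$gte": since}},
            {"product": {"$in": list(products)}},
        ]
    }


MONITORED_PRODUCTS = ["Credit card", "Bank account", "Mortgage"]
population = build_population_filter("2025-06-15T00:00:00Z", MONITORED_PRODUCTS)

# Each workload would then receive the same object, for example:
#   idx.search.cluster(filter=population, ...)
#   idx.search.anomalies(filter=population, ...)
#   idx.search.topics(filter=population, ...)

# The temporal-shift step sets its own date windows, so it only needs
# the product clause from the shared filter.
product_only = population["$and"][1]
print(population)
print(product_only)
```

Changing the monitored products or the start date then becomes a one-line edit instead of a search-and-replace across every call.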
Evidence chaining. The clusters from step 3 inform the anomaly detection in step 4 (records outside dense clusters are prime anomaly candidates). The topics from step 5 inform the temporal shift in step 6 (topic labels make shift reports readable). The shift signals inform the agent query in step 7 (the agent summarizes trends, not raw vectors).
Structured artifacts. Every workload returns structured data (cluster summaries, anomaly rankings, topic terms, shift scores) that the next workload or a downstream agent can consume. This is the "compute not retrieve" principle from the EigenRun blog post in action.
Variations
Biology pipeline. Replace complaints with protein structures: search for neighbors of a new receptor target, cluster by structural family, detect anomalous folds that do not match CATH templates, model topics to name the families, compare against last month's depositions to see which families are growing, and recommend cryo-EM validation priorities.
Industrial pipeline. Replace complaints with sensor traces: search for readings from units in degradation mode, cluster by failure signature, detect anomalous sensor spikes within each signature, model topics to name the failure modes, compare against last week's baseline to see which modes are emerging, and recommend maintenance scheduling.
Add custom Python between steps. Between clustering and anomaly detection, you might run a domain-specific classifier:
# After clustering, classify each cluster by severity
for c in clusters["clusters"]:
    severity = classify_complaint_severity(c["representatives"])
    if severity == "critical":
        # Run tighter anomaly detection on this cluster
        pass
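The snippet above calls classify_complaint_severity without defining it. One possible stand-in is a keyword rule over the representatives' issue field, sketched below. The function body, the term lists, and the three severity levels are all assumptions for illustration. A production classifier would be domain-specific.

```python
# Hypothetical keyword lists; tune these to your own complaint taxonomy.
CRITICAL_TERMS = ("fraud", "unauthorized", "identity theft")
ELEVATED_TERMS = ("fee", "billing", "dispute")


def classify_complaint_severity(representatives: list[dict]) -> str:
    """Return 'critical', 'elevated', or 'routine' from representative issues."""
    issues = " ".join(
        str(rep.get("metadata", {}).get("issue", "")).lower()
        for rep in representatives
    )
    if any(term in issues for term in CRITICAL_TERMS):
        return "critical"
    if any(term in issues for term in ELEVATED_TERMS):
        return "elevated"
    return "routine"


# Hypothetical representatives, shaped like those returned in Step 3.
fraud_reps = [{"metadata": {"issue": "Unauthorized transactions"}}]
fee_reps = [{"metadata": {"issue": "Late fee"}}]
other_reps = [{"metadata": {"issue": "Closing an account"}}]

print(classify_complaint_severity(fraud_reps))
print(classify_complaint_severity(fee_reps))
print(classify_complaint_severity(other_reps))
```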
Schedule as a cron job. Wrap the entire pipeline in a function and schedule it to run every Monday at 06:00. Write the final report to a Slack channel or a compliance dashboard.
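A scheduled run cannot rely on hard-coded dates, so the windows have to be derived from the run time. The sketch below computes a baseline and a current window covering the 14 full days before the run, in the same timestamp format used in Step 6. The helper name weekly_windows is hypothetical, and UTC day boundaries are an assumption.

```python
from datetime import datetime, timedelta, timezone

ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def weekly_windows(run_at: datetime) -> dict:
    """Baseline and current 7-day windows ending at midnight UTC before run_at."""
    midnight = run_at.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    def window(start: datetime) -> dict:
        # Each window spans seven days and ends one second before the next starts.
        end = start + timedelta(days=7) - timedelta(seconds=1)
        return {"start": start.strftime(ISO_FORMAT), "end": end.strftime(ISO_FORMAT)}

    return {
        "baseline": window(midnight - timedelta(days=14)),
        "current": window(midnight - timedelta(days=7)),
    }


windows = weekly_windows(datetime(2025, 6, 22, 6, 0, tzinfo=timezone.utc))
print(windows["baseline"])
print(windows["current"])
```

A run on 2025-06-22 reproduces the windows hard-coded in Step 6. The two dicts could be passed as baseline=windows["baseline"] and current=windows["current"].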
Scale beyond 10k. Each individual workload respects the 10,000-record synchronous limit. For larger corpora, partition by product or state, run the pipeline per partition, and merge the reports. The pattern is the same; only the orchestration changes.
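The partition-and-merge orchestration can be sketched in plain Python. Here run_pipeline stands for a function that wraps steps 2–7 and accepts a filter. Its signature, the report shape, and the helper name run_partitioned are assumptions for illustration, and stub_pipeline replaces the real calls so the example runs offline.

```python
from typing import Callable


def run_partitioned(
    field: str,
    values: list[str],
    run_pipeline: Callable[[dict], dict],
) -> dict:
    """Run the pipeline once per partition value and merge the reports."""
    merged = {"partitions": {}, "total_anomalies": 0}
    for value in values:
        # Same $in syntax as the filters used throughout this recipe.
        partition_filter = {field: {"$in": [value]}}
        report = run_pipeline(partition_filter)
        merged["partitions"][value] = report
        merged["total_anomalies"] += len(report.get("anomalies", []))
    return merged


def stub_pipeline(partition_filter: dict) -> dict:
    """Stand-in for the real pipeline; returns one fake anomaly per call."""
    return {"filter": partition_filter, "anomalies": [{"rank": 1}]}


merged = run_partitioned("product", ["Credit card", "Mortgage"], stub_pipeline)
print(sorted(merged["partitions"]))
print(merged["total_anomalies"])
```

Keeping the per-partition reports side by side, rather than flattening them, preserves the evidence trail when the merged report is reviewed.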
What to read next
- Cookbook 1: Semantic Search at Scale — the foundational search patterns this pipeline builds on.
- Cookbook 3: Anomaly Detection in Depth — deeper tuning guidance for the anomaly detection step.
- Cookbook 5: Temporal Shift in Depth — deeper guidance on window sizing, group_by strategies, and alert pipelines.
- EigenRun: How EigenLake Lets Agents Compute, Not Just Retrieve — the research-backed thesis that this recipe demonstrates in practice.
- Why We Call It Vector Intelligence, Not Vector Search — the philosophical foundation for running workloads where the data lives.