
Cookbook 6: From Search to Insight

Chain search, clustering, anomaly detection, topic modeling, and temporal shift into one unified analytical pipeline — proving that vector data is a compute substrate, not just a retrieval index.

Tags: cookbook, python, compound-workflow, pipeline, compliance, agent-query

Answer

How do I run a compound workflow in EigenLake?

Start with a filtered search, cluster the results into natural families, detect anomalies within each family, model topics for human-readable labels, compare against a baseline window with temporal shift, and summarize the findings with a natural-language agent query — all over the same index, without copying data between systems.

Workload: Vector Workloads
Difficulty: advanced
Estimated time: 15 minutes

Inputs

  • A time-stamped corpus of consumer complaints with product, issue, and company metadata
  • Embedding vectors that match the index dimension
  • A business question that requires multiple analytical lenses

Outputs

  • A complete triage report: clusters, anomalies, topics, temporal shifts, and recommended actions
  • Structured evidence at every step: representative records, nearest neighbors, topic terms, and shift scores
  • A natural-language summary artifact with actionable recommendations

This is the recipe that proves the thesis. Vector databases retrieve neighbors. EigenLake runs the full query path. You will start with a business question — "What changed in consumer complaints this week?" — and end with a structured report that a compliance officer can act on. Not six separate tools. One SDK. One index. One pipeline.

Problem

You are a fintech compliance analyst. It is Monday morning. You need to know:

  1. What complaints arrived in the last 14 days?
  2. What natural families do they fall into?
  3. Which complaints do not fit any family — potential new attack vectors?
  4. What human-readable names describe each family?
  5. How did this week compare to last week — what is emerging, what is shrinking?
  6. What should you tell the fraud team, the product team, and the legal team?

You have 10,000 complaints in EigenLake. You have 15 minutes before the stand-up. You do not have time to export to a notebook, stitch together Spark and pandas, or wait for a BI dashboard to refresh.

Prerequisites

  • Python 3.10 or newer.
  • pip install eigenlake numpy datasets.
  • An API key from https://api.eigenlake.dev.
  • Familiarity with the individual workloads. This recipe assumes you have read Cookbooks 1–5 or are comfortable with search.nearest, search.cluster, search.anomalies, search.topics, and search.temporal_shift.

The recipe

Step 1 — Load and ingest the dataset

This recipe reuses the CFPB consumer-complaints dataset from Cookbook 1 for continuity, but it is self-contained.

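import hashlib

from datasets import load_dataset
import numpy as np
import eigenlake
from eigenlake import schema as s

DIM = 384

# CFPB Consumer Complaints: https://huggingface.co/datasets/cfpb/consumer-complaints
ds = load_dataset("cfpb/consumer-complaints", split="train")
ds = ds.filter(lambda r: r["consumer_complaint_narrative"] is not None)
ds = ds.shuffle(seed=42).select(range(10_000))

def fake_embed(text: str) -> list[float]:
    # Placeholder embedding: a deterministic random unit vector per text.
    # Seed from a stable digest, because the built-in hash() is salted per
    # process and would give the same text a different vector on every run.
    seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:4], "big")
    rng = np.random.default_rng(seed)
    v = rng.standard_normal(DIM).astype("float32")
    v /= np.linalg.norm(v) + 1e-12  # normalize to unit length
    return v.tolist()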

embed = fake_embed  # swap to sentence-transformers in production

schema, index_options = (
    s.SchemaBuilder(additional_properties=False)
    .add("complaint_id", s.string(required=True, filterable=True))
    .add("product", s.string(filterable=True))
    .add("sub_product", s.string(filterable=True))
    .add("issue", s.string(filterable=True))
    .add("company", s.string(filterable=True))
    .add("state", s.string(filterable=True))
    .add("consumer_complaint_narrative", s.string(filterable=False))
    .add("date_received", s.datetime(filterable=True))
    .build()
)

with eigenlake.connect(
    url="https://api.eigenlake.dev",
    api_key="<sk_sbx_your_api_key_here>",
) as client:
    idx = client.indexes.create_or_get(
        namespace="cookbook-06",
        index="compliance-pipeline",
        dimensions=DIM,
        schema=schema,
        index_options=index_options,
    )

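    # Synthetic dates: spread 10,000 records across 8 weeks
    # (2025-05-03 through 2025-06-27) using real calendar arithmetic,
    # so no day is skipped or double-counted at the month boundary.
    import random
    from datetime import datetime, timedelta, timezone
    random.seed(42)
    start_date = datetime(2025, 5, 3, tzinfo=timezone.utc)

    payload = []
    for i, r in enumerate(ds):
        days_offset = random.randint(0, 55)  # 56 distinct days = 8 weeks
        date_str = (start_date + timedelta(days=days_offset)).strftime("%Y-%m-%dT00:00:00Z")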

        payload.append({
            "properties": {
                "complaint_id": str(r["complaint_id"]),
                "product": r["product"] or "Unknown",
                "sub_product": r["sub_product"] or "Unknown",
                "issue": r["issue"] or "Unknown",
                "company": r["company"] or "Unknown",
                "state": r["state"] or "Unknown",
                "consumer_complaint_narrative": r["consumer_complaint_narrative"],
                "date_received": date_str,
            },
            "vector": embed(r["consumer_complaint_narrative"]),
        })

    result = idx.records.add_many(payload, on_error="continue")
    print(f"inserted {len(result)} of {len(payload)} complaints")

Step 2 — SEARCH: Retrieve complaints from the last 14 days

    query = "fraudulent unauthorized charge on my credit card"
    query_vector = embed(query)

    recent_complaints = idx.search.nearest(
        vector=query_vector,
        limit=1_000,
        filter={
            "$and": [
                {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
                {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
            ]
        },
    )
    print(f"search returned {len(recent_complaints['vectors'])} complaints")

The search narrows the corpus to the relevant subset: recent complaints in the three products the compliance team monitors. The query vector orients the ranking toward fraud-language, but the filter guarantees that only the right products and dates are considered.
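To make that division of labor concrete, here is a minimal local sketch of filtered nearest-neighbor ranking in plain Python. It is an illustration of the idea, not EigenLake's implementation: `filtered_nearest` and the toy records are hypothetical, and the only assumption carried over from the recipe is that dates are ISO-8601 strings in a single format, which compare correctly as text.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def filtered_nearest(records, query_vector, since, products, limit):
    """Hypothetical helper: apply the metadata filter first, then rank by similarity."""
    eligible = [
        r for r in records
        if r["date_received"] >= since and r["product"] in products
    ]
    eligible.sort(key=lambda r: cosine_similarity(r["vector"], query_vector), reverse=True)
    return eligible[:limit]

records = [
    {"id": "a", "product": "Credit card", "date_received": "2025-06-20T00:00:00Z", "vector": [1.0, 0.0]},
    {"id": "b", "product": "Student loan", "date_received": "2025-06-20T00:00:00Z", "vector": [1.0, 0.0]},
    {"id": "c", "product": "Mortgage", "date_received": "2025-06-01T00:00:00Z", "vector": [1.0, 0.0]},
    {"id": "d", "product": "Mortgage", "date_received": "2025-06-18T00:00:00Z", "vector": [0.6, 0.8]},
]

hits = filtered_nearest(
    records,
    query_vector=[1.0, 0.0],
    since="2025-06-15T00:00:00Z",
    products={"Credit card", "Bank account", "Mortgage"},
    limit=10,
)
print([r["id"] for r in hits])
```

Records `b` and `c` match the query vector perfectly, yet neither is returned: one fails the product condition and the other the date condition. Similarity only orders the records that survive the filter.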

Step 3 — CLUSTER: Discover natural complaint families

    clusters = idx.search.cluster(
        filter={
            "$and": [
                {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
                {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
            ]
        },
        limit=10_000,
        algorithm="dbscan",
        dbscan_min_samples=10,
        distance_metric="cosine",
        representatives_per_cluster=3,
    )

    print(f"found {len(clusters['clusters'])} clusters")
    for c in clusters["clusters"]:
        print(f"  cluster {c['cluster_id']}: {c['count']} complaints")
        for rep in c["representatives"][:2]:
            print(f"    - {rep['metadata']['issue']}")

DBSCAN discovers the natural shape of the data. In this corpus you might see five clusters: billing errors, fraud reports, login issues, fee disputes, and data-privacy concerns. The representatives give you enough evidence to label each cluster without reading thousands of records.
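For readers who want to see what density-based clustering does under the hood, the following is a small textbook DBSCAN over cosine distance in plain Python. It is a sketch for intuition only and makes no claim about how EigenLake implements `algorithm="dbscan"`. The `eps` radius is the standard DBSCAN parameter; the recipe's call does not show how the service chooses it, so the value here is an assumption picked for the toy data.

```python
import math
from collections import deque

def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm

def dbscan(points, eps, min_samples):
    """Textbook DBSCAN. Returns one label per point; -1 marks noise."""
    n = len(points)

    def region(i):
        # All points within eps of point i, including i itself.
        return [j for j in range(n) if cosine_distance(points[i], points[j]) <= eps]

    labels = [None] * n  # None = not yet visited
    cluster_id = -1
    for i in range(n):
        if labels[i] is not None:
            continue
        neighbors = region(i)
        if len(neighbors) < min_samples:
            labels[i] = -1  # provisional noise; may become a border point later
            continue
        cluster_id += 1
        labels[i] = cluster_id
        queue = deque(neighbors)
        while queue:
            j = queue.popleft()
            if labels[j] == -1:
                labels[j] = cluster_id  # noise reachable from a core point is a border point
                continue
            if labels[j] is not None:
                continue
            labels[j] = cluster_id
            j_neighbors = region(j)
            if len(j_neighbors) >= min_samples:
                queue.extend(j_neighbors)  # j is a core point, so keep expanding
    return labels

points = [
    (1.0, 0.0), (0.99, 0.1), (0.98, 0.15),   # one dense direction
    (0.0, 1.0), (0.1, 0.99), (0.15, 0.98),   # a second dense direction
    (-1.0, -1.0),                            # isolated point
]
labels = dbscan(points, eps=0.05, min_samples=3)
print(labels)
```

The number of clusters is an output, not an input: two families emerge from the density of the data, and the isolated point is labeled noise (`-1`) rather than being forced into a family.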

Step 4 — ANOMALY DETECTION: Find the outliers

    anomalies = idx.search.anomalies(
        filter={
            "$and": [
                {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
                {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
            ]
        },
        limit=10_000,
        n_neighbors=20,
        top_n=15,
        text_fields=["consumer_complaint_narrative"],
        timeout=130.0,
    )

    print(f"found {len(anomalies['anomalies'])} anomalies")
    for a in anomalies["anomalies"][:3]:
        print(f"  rank={a['rank']} score={a['score']:.4f} percentile={a['percentile']:.2f}")

Anomalies are the complaints that do not fit any cluster. They are locally isolated in the vector space — semantically distant from their nearest neighbors. A 99th-percentile anomaly might be a synthetic-identity fraud pattern that has not appeared before, or a new regulatory issue that existing complaint language does not cover.
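The idea of being "locally isolated" can be sketched with a deliberately simplified score: the mean cosine distance from each vector to its nearest neighbors, plus a percentile rank over those scores. This is not Local Outlier Factor and not EigenLake's scoring; `knn_outlier_scores` and `percentile_ranks` are hypothetical helpers written only to illustrate how `n_neighbors`, a score, and a percentile relate.

```python
import math

def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm

def knn_outlier_scores(vectors, n_neighbors):
    """Mean distance to the n_neighbors closest other vectors (higher = more isolated)."""
    scores = []
    for i, v in enumerate(vectors):
        distances = sorted(
            cosine_distance(v, w) for j, w in enumerate(vectors) if j != i
        )
        nearest = distances[:n_neighbors]
        scores.append(sum(nearest) / len(nearest))
    return scores

def percentile_ranks(scores):
    """Percentage of scores that are less than or equal to each score."""
    n = len(scores)
    return [100.0 * sum(1 for other in scores if other <= s) / n for s in scores]

vectors = [[1.0, 0.0], [0.99, 0.1], [0.98, 0.15], [0.97, 0.2], [0.0, 1.0]]
scores = knn_outlier_scores(vectors, n_neighbors=2)
ranks = percentile_ranks(scores)
most_anomalous = max(range(len(vectors)), key=lambda i: scores[i])
print(most_anomalous, ranks[most_anomalous])
```

The last vector points in a direction none of the others share, so it receives the highest score and the top percentile. A production scorer such as LOF additionally compares each point's density with that of its neighbors, which this sketch omits.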

Step 5 — TOPIC MODELING: Name the families

    topics = idx.search.topics(
        filter={
            "$and": [
                {"date_received": {"$gte": "2025-06-15T00:00:00Z"}},
                {"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
            ]
        },
        limit=10_000,
        text_fields=["consumer_complaint_narrative"],
        metadata_fields=["product", "company"],
        min_topics=5,
        max_topics=15,
        label_mode="llm",
        top_terms=8,
    )

    for t in topics["topics"]:
        print(f"\n{t['label']:<45} count={t['count']}")
        print(f"  terms: {', '.join(term['term'] for term in t['terms'][:5])}")

Topic modeling gives human-readable names to the clusters. Where DBSCAN tells you "Cluster 3 has 1,247 members," topic modeling tells you "Cluster 3 is 'Unauthorized account access attempts via spoofed customer service numbers.'" The LLM label is a presentation layer; the underlying assignment is deterministic and reproducible.
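The "terms" behind a topic label can be approximated with a simple TF-IDF-style weighting computed per group of documents: a term scores highly when it is frequent inside one group and rare across the others. The sketch below is an assumption-laden illustration of that idea, not the method EigenLake uses; `top_terms` and the toy narratives are hypothetical.

```python
import math
import re
from collections import Counter

def top_terms(groups: dict[str, list[str]], k: int) -> dict[str, list[str]]:
    """Return the k most distinctive terms for each named group of documents."""
    term_counts = {
        name: Counter(t for doc in docs for t in re.findall(r"[a-z]+", doc.lower()))
        for name, docs in groups.items()
    }
    # Number of groups in which each term appears at least once.
    group_frequency = Counter(t for counts in term_counts.values() for t in counts)
    n_groups = len(groups)

    result = {}
    for name, counts in term_counts.items():
        total = sum(counts.values())
        scores = {
            # Term frequency within the group, damped by how many groups share the term.
            t: (c / total) * math.log((1 + n_groups) / (1 + group_frequency[t]))
            for t, c in counts.items()
        }
        # Highest score first; ties broken alphabetically for reproducibility.
        ranked = sorted(scores, key=lambda t: (-scores[t], t))
        result[name] = ranked[:k]
    return result

groups = {
    "fraud": ["unauthorized charge on card", "unauthorized charge fraud"],
    "fees": ["late fee on card", "overdraft fee charged"],
}
terms = top_terms(groups, k=2)
print(terms)
```

Words shared by every group ("on", "card") score zero and drop out, which is why the surviving terms read as a description of what makes each family distinct.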

Step 6 — TEMPORAL SHIFT: Compare this week to last week

    shifts = idx.search.temporal_shift(
        baseline={
            "start": "2025-06-08T00:00:00Z",
            "end": "2025-06-14T23:59:59Z",
        },
        current={
            "start": "2025-06-15T00:00:00Z",
            "end": "2025-06-21T23:59:59Z",
        },
        timestamp_field="date_received",
        filter={"product": {"$in": ["Credit card", "Bank account", "Mortgage"]}},
        group_by=["product"],
        limit_per_window=5_000,
        min_relative_shift=0.20,
        min_count_shift=5,
        text_fields=["consumer_complaint_narrative"],
        summary_mode="llm",
    )

    for shift in shifts["shifts"][:8]:
        print(f"\n{shift['kind']:<12} {shift['direction']:<12} score={shift['score']:.4f}")
        print(f"  label: {shift['label']}")
        print(f"  explanation: {shift['explanation']}")

Temporal shift answers "what changed?" You might see that fraud reports grew 340% in credit cards, fee disputes shrank 45% in bank accounts, and a new theme — "synthetic identity verification bypass" — emerged in mortgages. The group_by=["product"] keeps each product's shifts independent, so a surge in credit-card fraud does not mask a subtle but important shift in mortgages.
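The arithmetic behind figures such as "grew 340%" can be sketched with per-group counts. The helper below assumes that relative shift means (current - baseline) / baseline and that both thresholds must be met for a shift to be reported; the source does not define how EigenLake computes its `score`, so treat `count_shifts` and its direction names as hypothetical.

```python
def count_shifts(baseline, current, min_relative_shift, min_count_shift):
    """Compare per-group counts between two windows and keep the notable changes."""
    shifts = []
    for group in sorted(set(baseline) | set(current)):
        before = baseline.get(group, 0)
        after = current.get(group, 0)
        delta = after - before
        if abs(delta) < min_count_shift:
            continue  # too few records changed to be meaningful
        if before == 0:
            # No baseline to divide by: report the group as newly emerging.
            shifts.append({"group": group, "direction": "emerging", "relative": None})
            continue
        relative = delta / before
        if abs(relative) < min_relative_shift:
            continue  # change is small relative to the baseline volume
        direction = "growing" if relative > 0 else "shrinking"
        shifts.append({"group": group, "direction": direction, "relative": relative})
    return shifts

baseline = {"Credit card": 50, "Bank account": 200, "Mortgage": 100}
current = {"Credit card": 220, "Bank account": 110, "Mortgage": 103}

shifts_local = count_shifts(baseline, current, min_relative_shift=0.20, min_count_shift=5)
for s_ in shifts_local:
    print(f"{s_['group']:<14} {s_['direction']:<10} {s_['relative']:+.0%}")
```

Credit cards go from 50 to 220 complaints, a relative shift of +340%; bank accounts drop from 200 to 110, or -45%. Mortgages change by only three records, which falls below `min_count_shift`, so the change is filtered out as noise. Computing the comparison per group is what prevents the large credit-card surge from hiding smaller movements elsewhere.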

Step 7 — AGENT QUERY: Summarize and recommend

    # Build a context string from the pipeline results
    context = (
        f"This week's complaint analysis: "
        f"{len(clusters['clusters'])} natural complaint families identified. "
        f"{len(anomalies['anomalies'])} anomalous complaints suggest new attack vectors. "
        f"Key temporal shifts: "
        + "; ".join(
            f"{s['label']} ({s['kind']}, {s['direction']})"
            for s in shifts["shifts"][:5]
        )
    )

    report = idx.agent.query(
        f"Summarize this week's complaint trends and recommend which teams to alert. {context}",
        mode="auto",
    )
    print(report["action"])
    print(report.get("summary", report.get("filter", "See direct API results above")))

The agent query takes the structured evidence from the previous six steps and can return a natural-language summary when that agent surface is available. In the current documented SDK, agent mode is guaranteed to return a filter or cluster action for simple queries, so the example defensively falls back to the returned action/filter if no summary field is present. For complex compound contexts like this one, the pipeline itself is the compute; the agent query serves as a natural-language interface to the final report.
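The defensive handling described above can be factored into two small helpers. This sketch assumes only the response keys already used in the recipe (`action`, `summary`, `filter`) and the shift fields `label`, `kind`, and `direction`; the helper names `build_context` and `summarize_response` are hypothetical.

```python
def build_context(n_clusters: int, n_anomalies: int, shifts: list[dict], max_shifts: int = 5) -> str:
    """Condense pipeline results into a short prompt context."""
    shift_text = "; ".join(
        f"{shift['label']} ({shift['kind']}, {shift['direction']})"
        for shift in shifts[:max_shifts]
    ) or "none above threshold"  # avoid a dangling "Key temporal shifts:" when empty
    return (
        f"This week's complaint analysis: "
        f"{n_clusters} natural complaint families identified. "
        f"{n_anomalies} anomalous complaints suggest new attack vectors. "
        f"Key temporal shifts: {shift_text}"
    )

def summarize_response(report: dict) -> str:
    """Prefer a natural-language summary, then the returned filter, then a fixed fallback."""
    for key in ("summary", "filter"):
        if report.get(key):
            return str(report[key])
    return "See direct API results above"

print(build_context(5, 15, []))
print(summarize_response({"action": "filter", "filter": {"product": "Mortgage"}}))
print(summarize_response({"action": "cluster"}))
```

Keeping the fallback order in one function means the calling code does not need to change if a later SDK version starts returning a `summary` field.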

What you have built: A compliance analyst can now ask "What changed this week?" and receive, in one pipeline:

  • The relevant complaints (search)
  • Their natural families (clustering)
  • The outliers worth investigating (anomaly detection)
  • Human-readable names for each family (topic modeling)
  • Week-over-week trend signals (temporal shift)
  • A summary with team recommendations (agent query)

What is happening, step by step

One index, six workloads. Every step reads from the same EigenLake index. There is no ETL pipeline, no Spark job, no CSV export, no IAM reconciliation between a vector database and an analytics warehouse. The data and the compute live in the same place.

Filter reuse. The date range and product filter are identical across steps 2–5, and step 6 reuses the product condition while supplying its own baseline and current date windows. In production, define the filter once as a dict and pass it to each workload. This guarantees that every analytical lens operates on the same population.
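One way to define the filter once is a small builder function. The filter syntax (`$and`, `$gte`, `$in`) is taken from the steps above; the helper name `build_filter` and the constants are hypothetical.

```python
def build_filter(since: str, products: list[str]) -> dict:
    """Return a fresh filter dict so callers cannot mutate a shared instance."""
    return {
        "$and": [
            {"date_received": {"$gte": since}},
            {"product": {"$in": list(products)}},
        ]
    }

PRODUCTS = ["Credit card", "Bank account", "Mortgage"]
RECENT = build_filter("2025-06-15T00:00:00Z", PRODUCTS)

# The same object would then be passed to each workload, for example:
#   idx.search.cluster(filter=RECENT, ...)
#   idx.search.anomalies(filter=RECENT, ...)
#   idx.search.topics(filter=RECENT, ...)
# Step 6 supplies its own date windows, so it needs only the product part:
PRODUCT_ONLY = {"product": {"$in": list(PRODUCTS)}}

print(RECENT)
```

Changing the monitored products or the start date then becomes a one-line edit instead of five.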

Evidence chaining. In the code above, steps 3–6 each run independently over the same filtered population, and only step 7 consumes earlier outputs directly. The chaining between the earlier steps is interpretive: records outside the dense clusters from step 3 are prime candidates for the anomalies in step 4, the topic labels from step 5 make the shift report from step 6 readable, and the shift signals feed the agent query in step 7, so the agent summarizes trends rather than raw vectors.

Structured artifacts. Every workload returns structured data (cluster summaries, anomaly rankings, topic terms, shift scores) that the next workload or a downstream agent can consume. This is the "compute not retrieve" principle from the EigenRun blog post in action.

Variations

Biology pipeline. Replace complaints with protein structures: search for neighbors of a new receptor target, cluster by structural family, detect anomalous folds that do not match CATH templates, model topics to name the families, compare against last month's depositions to see which families are growing, and recommend cryo-EM validation priorities.

Industrial pipeline. Replace complaints with sensor traces: search for readings from units in degradation mode, cluster by failure signature, detect anomalous sensor spikes within each signature, model topics to name the failure modes, compare against last week's baseline to see which modes are emerging, and recommend maintenance scheduling.

Add custom Python between steps. Between clustering and anomaly detection, you might run a domain-specific classifier:

# After clustering, classify each cluster by severity.
# classify_complaint_severity is your own domain-specific function; you must define it.
for c in clusters["clusters"]:
    severity = classify_complaint_severity(c["representatives"])
    if severity == "critical":
        # Run tighter anomaly detection on this cluster
        pass
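As a placeholder for that classifier, a keyword rule over the representatives' `issue` field is enough to exercise the control flow. This is a hypothetical stand-in, not a recommended severity model: the keyword list is invented for illustration, and the only assumption taken from the recipe is that each representative carries `metadata["issue"]`, as printed in step 3.

```python
CRITICAL_KEYWORDS = ("fraud", "unauthorized", "identity theft")  # illustrative only

def classify_complaint_severity(representatives: list[dict]) -> str:
    """Return "critical" if any representative's issue mentions a critical keyword."""
    issues = [
        str(rep.get("metadata", {}).get("issue", "")).lower()
        for rep in representatives
    ]
    if any(keyword in issue for issue in issues for keyword in CRITICAL_KEYWORDS):
        return "critical"
    return "routine"

# Toy cluster summaries shaped like the step 3 output.
sample_clusters = [
    {"cluster_id": 0, "representatives": [{"metadata": {"issue": "Unauthorized transactions"}}]},
    {"cluster_id": 1, "representatives": [{"metadata": {"issue": "Late fee"}}]},
]
for c in sample_clusters:
    print(c["cluster_id"], classify_complaint_severity(c["representatives"]))
```

In practice you would replace the keyword rule with whatever your compliance team already trusts, such as a rules engine or a trained classifier, while keeping the same function signature.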

Schedule as a cron job. Wrap the entire pipeline in a function and schedule it to run every Monday at 06:00. Write the final report to a Slack channel or a compliance dashboard.
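A minimal shape for such a script is sketched below. `run_pipeline` is a hypothetical placeholder whose body you would replace with steps 2 to 7, and the file path in the crontab comment is invented; the cron expression `0 6 * * 1` itself is standard syntax for 06:00 every Monday.

```python
# Example crontab entry (paths are hypothetical):
#   0 6 * * 1 /usr/bin/python3 /opt/compliance/weekly_report.py
from datetime import datetime, timezone

def run_pipeline() -> dict:
    """Placeholder for steps 2-7; replace the body with the calls from the recipe."""
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "sections": [],  # clusters, anomalies, topics, shifts, summary
    }

def main() -> dict:
    # Let exceptions propagate: an uncaught error makes the process exit
    # non-zero and writes a traceback to stderr, which cron can mail or log.
    report = run_pipeline()
    print(f"report generated at {report['generated_at']}")
    return report

if __name__ == "__main__":
    main()
```

Delivery to Slack or a dashboard would go at the end of `main`, using whichever client your organization already has in place.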

Scale beyond 10k. Each individual workload respects the 10,000-record synchronous limit. For larger corpora, partition by product or state, run the pipeline per partition, and merge the reports. The pattern is the same; only the orchestration changes.
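The orchestration for a partitioned run might look like the sketch below. `run_partitioned` is a hypothetical helper, and `run_fn` stands in for a function that executes the pipeline for one partition filter and returns a report dict; the stub used here returns canned data so the example runs on its own.

```python
def run_partitioned(partitions, run_fn):
    """Run `run_fn` once per partition value and merge the per-partition reports."""
    merged = {"partitions": {}, "total_anomalies": 0}
    for value in partitions:
        # Same filter syntax as the recipe, narrowed to a single product.
        report = run_fn({"product": {"$in": [value]}})
        merged["partitions"][value] = report
        merged["total_anomalies"] += len(report.get("anomalies", []))
    return merged

def fake_run(partition_filter):
    # Stub standing in for the real pipeline; returns canned anomalies.
    product = partition_filter["product"]["$in"][0]
    return {"anomalies": ["a1", "a2"] if product == "Mortgage" else ["a3"]}

merged = run_partitioned(["Credit card", "Mortgage"], fake_run)
print(merged["total_anomalies"], sorted(merged["partitions"]))
```

One caveat of this approach: clusters and topics are discovered independently inside each partition, so a family that spans two products will appear twice and needs to be reconciled when the reports are merged.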

What to read next

  • Cookbook 1: Semantic Search at Scale (8 minutes, intermediate). Use filtered nearest-neighbour search, cursor pagination, and search-unit economics to retrieve relevant records from a corpus of tens of thousands of consumer complaints.
  • Cookbook 2: Clustering in Depth (10 minutes, intermediate). Discover natural groupings in large vector datasets with DBSCAN and k-means, from protein sequences to support tickets, without exporting data to a separate analytics stack.
  • Cookbook 3: Anomaly Detection in Depth (10 minutes, intermediate). Surface unusual sensor events, operational records, and outlier folds with Local Outlier Factor over vector embeddings, without pre-defining what normal looks like.