Cookbook 3: Anomaly Detection in Depth
Surface unusual sensor events, operational records, and outlier folds with Local Outlier Factor over vector embeddings — without pre-defining what normal looks like.
Answer
How do I detect anomalies in EigenLake?
Filter a snapshot by operational mode, run Local Outlier Factor over the embeddings, and receive ranked anomalies with nearest-neighbor evidence for root-cause analysis.
Inputs
- A time-series of sensor readings from industrial equipment
- Embedding vectors that encode rolling-window statistics
- An EigenLake index with operational-mode metadata for baseline separation
Outputs
- Ranked anomalies with LOF scores and percentiles
- Nearest-neighbor evidence for each outlier
- Per-mode baseline detection (different normal for different operating conditions)
- A repeatable anomaly-detection workflow over one index
Anomaly detection is the workload that finds the records that do not belong. This recipe uses synthetic turbine sensor data modeled on the NASA CMAPSS dataset because the failure modes are well understood — bearing wear, temperature spikes, pressure drops — but the same primitives apply to financial transactions, network traffic, or protein folds that deviate from known topology templates.
Problem
You have 9,000 turbine sensor readings across six operational modes. "Normal" in cruise mode is different from "normal" in climb mode. You need to find the readings that are locally isolated — semantically unusual relative to their nearest neighbors within the same mode — and you need evidence (the three closest normal readings) to explain why each outlier is strange.
Prerequisites
- Python 3.10 or newer.
- pip install eigenlake numpy pandas.
- An API key from https://api.eigenlake.dev.
The recipe
Step 1 — Generate synthetic sensor data
The NASA CMAPSS dataset is the canonical benchmark for turbofan degradation prediction. Because it requires Kaggle authentication, this recipe generates synthetic data that matches the CMAPSS schema and failure physics.
import numpy as np
import pandas as pd
import eigenlake
from eigenlake import schema as s

np.random.seed(42)
N_UNITS = 50
N_CYCLES = 180
SENSOR_COUNT = 10
# The embedding below is the normalized sensor vector, so the index
# dimension must equal the number of sensors.
DIM = SENSOR_COUNT

records = []
for unit_id in range(1, N_UNITS + 1):
    # Each unit has a random degradation trajectory
    eol = np.random.randint(140, 200)  # end-of-life cycle
    operational_mode = np.random.choice(["1", "2", "3", "4", "5", "6"])
    for cycle in range(1, N_CYCLES + 1):
        # Degradation factor increases as cycle approaches EOL
        degradation = max(0, (cycle - eol * 0.7) / (eol * 0.3))
        health = 1.0 - degradation + np.random.normal(0, 0.02)
        # Sensor readings: baseline + degradation trend + noise
        sensors = []
        for s_i in range(SENSOR_COUNT):
            baseline = 500.0 + s_i * 20.0
            trend = degradation * (10.0 + s_i * 5.0)
            noise = np.random.normal(0, 3.0)
            sensors.append(baseline + trend + noise)
        # Occasional anomaly: sudden spike or drop
        is_anomaly = False
        if np.random.random() < 0.005:
            sensors[np.random.randint(0, SENSOR_COUNT)] += np.random.choice([-80, 80])
            is_anomaly = True
        # Embedding = normalized sensor vector
        vec = np.array(sensors, dtype="float32")
        vec /= np.linalg.norm(vec) + 1e-12
        records.append({
            "unit_id": f"ENG-{unit_id:03d}",
            "cycle": cycle,
            "operational_mode": operational_mode,
            "sensors": sensors,
            "health_score": float(health),
            "is_anomaly": is_anomaly,
            "vector": vec.tolist(),
        })

df = pd.DataFrame(records)
print(f"generated {len(df)} sensor readings from {N_UNITS} units")
Step 2 — Define the schema
schema, index_options = (
    s.SchemaBuilder(additional_properties=False)
    .add("reading_id", s.string(required=True, filterable=True))
    .add("unit_id", s.string(filterable=True))
    .add("cycle", s.integer(filterable=True))
    .add("operational_mode", s.string(filterable=True))
    .add("health_score", s.number(filterable=True))
    .add("sensor_summary", s.string(filterable=False))
    .build()
)
sensor_summary is a human-readable string of the top sensors for evidence display. operational_mode is the critical filter field: different modes have different baselines, and you detect anomalies within each baseline separately.
Step 3 — Ingest records
with eigenlake.connect(
    url="https://api.eigenlake.dev",
    api_key="<sk_sbx_your_api_key_here>",
) as client:
    idx = client.indexes.create_or_get(
        namespace="cookbook-03",
        index="sensor-anomalies",
        dimensions=DIM,
        schema=schema,
        index_options=index_options,
    )
    payload = [
        {
            "id": f"{r['unit_id']}-{r['cycle']}",
            "properties": {
                "reading_id": f"{r['unit_id']}-{r['cycle']}",
                "unit_id": r["unit_id"],
                "cycle": r["cycle"],
                "operational_mode": r["operational_mode"],
                "health_score": round(r["health_score"], 4),
                "sensor_summary": f"S1={r['sensors'][0]:.1f} S2={r['sensors'][1]:.1f} ...",
            },
            "vector": r["vector"],
        }
        for _, r in df.iterrows()
    ]
    result = idx.records.add_many(payload, on_error="continue")
    print(f"inserted {len(result)} of {len(payload)} readings")
Step 4 — Define normal per operational mode
Anomaly detection only works if "normal" is well-defined. In heterogeneous systems, normal varies by context. First, cluster by operational mode to establish local baselines:
modes = ["1", "2", "3", "4", "5", "6"]
mode_baselines = {}
for mode in modes:
    clusters = idx.search.cluster(
        filter={"operational_mode": {"$eq": mode}},
        limit=2_000,
        algorithm="dbscan",
        dbscan_min_samples=10,
        distance_metric="cosine",
    )
    mode_baselines[mode] = clusters["records_clustered"]
    print(f"mode {mode}: {clusters['records_clustered']} baseline readings")
Step 5 — Detect anomalies per mode
for mode in modes:
    anomalies = idx.search.anomalies(
        filter={"operational_mode": {"$eq": mode}},
        limit=2_000,
        n_neighbors=15,
        top_n=10,
        text_fields=["sensor_summary"],
        timeout=130.0,
    )
    print(f"\nmode {mode}: {len(anomalies['anomalies'])} anomalies")
    for a in anomalies["anomalies"][:3]:
        print(f"  rank={a['rank']} score={a['score']:.4f} percentile={a['percentile']:.2f}")
        for n in a["nearest_neighbors"]:
            print(f"    evidence: {n['uuid']} distance={n['distance']:.4f}")
n_neighbors=15 is a tight local neighborhood. Smaller values (5–10) make the score sensitive to fine-grained density variations — useful for early failure detection. Larger values (30–50) smooth over noise — useful for stable alerting.
text_fields provides human-readable evidence. The anomaly score is computed from the vector, but the sensor_summary text is included in the response so an engineer can read the evidence without fetching records separately.
percentile is relative to the analyzed snapshot. A 99th-percentile reading in mode 1 might be unremarkable in mode 3. Always interpret percentiles within their filtered context.
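To see why a percentile only means something inside its own snapshot, the sketch below ranks the same raw score against two made-up score lists. The helper name percentile_rank, the "share of scores at or below the value" definition, and the numbers are assumptions for illustration; EigenLake may compute its percentile field differently.

```python
import numpy as np

def percentile_rank(scores, value):
    """Percent of scores in the snapshot that are <= value (assumed definition)."""
    scores = np.asarray(scores, dtype=float)
    return 100.0 * float(np.mean(scores <= value))

# Hypothetical LOF scores from two separately filtered snapshots.
mode_1_scores = [0.95, 1.00, 1.02, 1.05, 1.10, 1.12, 1.15, 1.20, 1.30, 1.60]
mode_3_scores = [1.10, 1.40, 1.55, 1.60, 1.80, 1.90, 2.10, 2.40, 2.80, 3.50]

# The same raw score sits at the top of mode 1 but below the median of mode 3.
p1 = percentile_rank(mode_1_scores, 1.60)
p3 = percentile_rank(mode_3_scores, 1.60)
print(f"mode 1: {p1:.0f}th percentile, mode 3: {p3:.0f}th percentile")
```

A score of 1.60 is the most extreme reading in the first list and an ordinary one in the second, which is why percentiles from different filters should not be compared directly.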
Agent path
Agent mode currently routes anomaly-language queries to a filter action, not directly to the anomaly workload. The pattern is:
agent_result = idx.agent.query(
    "Find unusual sensor readings from engine units in operational mode 1"
)
print(agent_result["action"])  # "filter"
print(agent_result["filter"])  # {"operational_mode": {"$eq": "1"}}

# Apply the inferred filter, then run anomaly detection directly
anomalies = idx.search.anomalies(
    filter=agent_result["filter"],
    limit=2_000,
    n_neighbors=15,
    top_n=10,
    text_fields=["sensor_summary"],
)
This two-step pattern — agent infers the context, direct API runs the workload — is the honest state of the SDK today. Future versions may route directly from natural language to search.anomalies().
What is happening, line by line
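Local Outlier Factor (LOF). The algorithm compares the local density of each record to the local density of its k nearest neighbors. A record with substantially lower density than its neighbors gets a high LOF score, while a score near 1 means the record is about as dense as its neighborhood. LOF suits embedding spaces because it judges each record against its own neighborhood, so regions of different density do not have to share a single global distance threshold.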
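The density comparison can be made concrete with a small NumPy sketch of textbook LOF on a toy grid. This is not EigenLake's implementation: the function name lof_scores, the Euclidean metric, and the toy data are assumptions chosen only to show the mechanics.

```python
import numpy as np

def lof_scores(X, k):
    """Textbook Local Outlier Factor with Euclidean distance (illustrative sketch)."""
    X = np.asarray(X, dtype=float)
    # Pairwise distances; a point is never its own neighbor.
    dist = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=-1)
    np.fill_diagonal(dist, np.inf)
    nn_idx = np.argsort(dist, axis=1)[:, :k]             # k nearest neighbors
    nn_dist = np.take_along_axis(dist, nn_idx, axis=1)   # distances to them
    k_dist = nn_dist[:, -1]                              # distance to the k-th neighbor
    # Reachability distance: max(k-distance of the neighbor, actual distance).
    reach = np.maximum(nn_dist, k_dist[nn_idx])
    # Local reachability density: inverse of the mean reachability distance.
    lrd = 1.0 / (reach.mean(axis=1) + 1e-12)
    # LOF: mean density of the neighbors divided by the point's own density.
    return lrd[nn_idx].mean(axis=1) / lrd

# A regular 5x5 grid of "normal" points plus one isolated point.
grid = np.array([[x, y] for x in range(5) for y in range(5)], dtype=float)
points = np.vstack([grid, [[10.0, 10.0]]])
scores = lof_scores(points, k=3)

print("highest-scoring index:", int(np.argmax(scores)))
print("grid score range:", scores[:-1].min().round(2), "to", scores[:-1].max().round(2))
```

Grid points score close to 1 because their neighbors are about as dense as they are. The isolated point scores several times higher because its neighbors sit in a much denser region than it does.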
Per-mode baselines. Running one global anomaly detection across all six operational modes would miss mode-specific degradation signatures and flag mode transitions as false positives. By filtering to one mode at a time, you establish a local baseline that reflects actual operating conditions.
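The toy example below illustrates one way a global pass can miss a mode-specific anomaly: a reading tagged as mode A that looks exactly like mode B. To stay short it swaps LOF for a simpler stand-in score, the mean distance to the k nearest neighbors. The function name, the two synthetic modes, and the suspect reading are all assumptions.

```python
import numpy as np

def knn_score(X, k=5):
    """Mean distance to the k nearest neighbors (a simple stand-in for LOF)."""
    d = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=-1)
    np.fill_diagonal(d, np.inf)
    return np.sort(d, axis=1)[:, :k].mean(axis=1)

rng = np.random.default_rng(0)
mode_a = rng.normal(0.0, 0.1, size=(100, 2))   # baseline for mode A
mode_b = rng.normal(5.0, 0.1, size=(100, 2))   # baseline for mode B
suspect = np.array([[5.0, 5.0]])               # tagged mode A, reads like mode B

# Global pass: every mode in one snapshot. Per-mode pass: mode A only.
global_scores = knn_score(np.vstack([mode_a, mode_b, suspect]))
mode_scores = knn_score(np.vstack([mode_a, suspect]))

# Rank 1 = most anomalous; the suspect is the last row in both snapshots.
global_rank = int((global_scores > global_scores[-1]).sum()) + 1
mode_rank = int((mode_scores > mode_scores[-1]).sum()) + 1
print(f"global rank: {global_rank}, mode-A rank: {mode_rank}")
```

In the global snapshot the suspect hides inside mode B's dense region and ranks near the bottom. Filtered to mode A, it becomes the top anomaly.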
Nearest-neighbor evidence. Every anomaly result includes the three closest normal records. An engineer reads the evidence and sees, for example, that the outlier has a temperature 40 degrees above its neighbors while pressure is normal. That localizes the fault to a cooling subsystem.
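One way to turn neighbor evidence into a fault hint is to compare the outlier's raw sensor values with the average of its neighbors and sort sensors by the size of the gap. The sketch assumes you have already looked up the raw readings for the outlier and its three neighbors; the helper name explain_outlier, the sensor names, and the values are hypothetical.

```python
import numpy as np

def explain_outlier(outlier, neighbors, sensor_names):
    """Rank sensors by how far the outlier sits from its neighbors' mean."""
    baseline = np.asarray(neighbors, dtype=float).mean(axis=0)
    delta = np.asarray(outlier, dtype=float) - baseline
    order = np.argsort(-np.abs(delta))  # largest absolute gap first
    return [(sensor_names[i], float(delta[i])) for i in order]

sensor_names = ["temperature", "pressure", "fan_speed"]
outlier = [540.0, 30.1, 1200.0]
neighbors = [
    [499.0, 30.0, 1198.0],
    [500.0, 30.2, 1201.0],
    [501.0, 30.1, 1201.0],
]

evidence = explain_outlier(outlier, neighbors, sensor_names)
for name, gap in evidence:
    print(f"{name}: {gap:+.1f} vs. neighbor mean")
```

Raw gaps are only comparable when sensors share a scale. With mixed units, dividing each gap by that sensor's standard deviation within the mode would be a reasonable refinement.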
Variations
Tighten top_n for alert pipelines. In production you might only want the top 5 anomalies per shift, not the top 10. Set top_n=5 and wire the output to a PagerDuty webhook.
Raise n_neighbors for noisy sensors. If your embedding includes 50 sensors and many are noisy, increase n_neighbors to 30 or 50 to smooth over local variance.
Combine with temporal shift. After detecting anomalies today, compare today's anomaly distribution to last week's with search.temporal_shift. If the same sensor family is drifting upward week-over-week, you have a predictive signal, not just a snapshot outlier.
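As a rough local illustration of what a week-over-week drift signal looks like, the sketch below measures each sensor's change in mean in units of the earlier week's standard deviation. It is a plain NumPy stand-in rather than search.temporal_shift; the function name sensor_drift and the synthetic readings are assumptions.

```python
import numpy as np

def sensor_drift(last_week, this_week):
    """Per-sensor shift in mean, in units of last week's standard deviation."""
    last_week = np.asarray(last_week, dtype=float)
    this_week = np.asarray(this_week, dtype=float)
    mu_old = last_week.mean(axis=0)
    sd_old = last_week.std(axis=0) + 1e-12  # guard against zero variance
    return (this_week.mean(axis=0) - mu_old) / sd_old

rng = np.random.default_rng(7)
last_week = rng.normal(500.0, 3.0, size=(200, 3))
this_week = rng.normal(500.0, 3.0, size=(200, 3))
this_week[:, 2] += 6.0  # the third sensor drifts upward by about two sigma

drift = sensor_drift(last_week, this_week)
print(np.round(drift, 2))
```

A sensor whose drift stays well above zero for several consecutive weeks is a trend worth alerting on, even if no single reading was flagged as an outlier.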
Write anomaly signals back to the index. Create a downstream record_type="anomaly_signal" index and write one record per detected anomaly with unit_id, cycle, score, and recommended_action. Other agents read this structured summary instead of re-running anomaly detection every time.
Use clustering to define "normal" first. For systems with dozens of operational regimes, run DBSCAN first to find dense normal regions, then run anomaly detection only on records that fall outside those dense regions. This two-pass approach reduces false positives in multi-modal systems.
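The two-pass idea can be sketched locally with NumPy. The first pass below uses only DBSCAN's core-point test (at least min_samples records within eps) rather than full cluster expansion, and the second pass scores the remaining candidates by their distance to the nearest dense record instead of LOF. The eps and min_samples values, the function name, and the synthetic regimes are all assumptions.

```python
import numpy as np

def core_point_mask(X, eps, min_samples):
    """True where a record has >= min_samples records within eps (itself included)."""
    d = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=-1)
    return (d <= eps).sum(axis=1) >= min_samples

rng = np.random.default_rng(1)
regime_1 = rng.normal(0.0, 0.2, size=(150, 2))
regime_2 = rng.normal(4.0, 0.2, size=(150, 2))
strays = np.array([[2.0, 2.0], [-3.0, 4.0]])   # rows 300 and 301
X = np.vstack([regime_1, regime_2, strays])

# Pass 1: mark dense "normal" records; everything else is a candidate.
dense = core_point_mask(X, eps=0.3, min_samples=10)
candidates = np.flatnonzero(~dense)

# Pass 2: score candidates only, here by distance to the nearest dense record.
gap = np.linalg.norm(
    X[candidates][:, None, :] - X[dense][None, :, :], axis=-1
).min(axis=1)
ranked = candidates[np.argsort(-gap)]

print(f"{len(candidates)} candidates out of {len(X)} records")
print("top two candidates:", ranked[:2].tolist())
```

Most records are absorbed into the dense regions in the first pass, so the second pass only has to rank a small fringe, with the two stray records at the top.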
What to read next
- Cookbook 4: Topic Modeling in Depth — discover themes in the same sensor logs or in multilingual customer feedback.
- Cookbook 5: Temporal Shift in Depth — compare anomaly distributions across weeks to detect emerging degradation trends.
- Cookbook 6: From Search to Insight — see how anomaly detection chains with clustering and temporal shift in a production pipeline.