
BD157/data-pipeline-agent


Self-Healing Data Pipeline Agent

A minimal, fully local demonstration of an autonomous agent that detects pipeline failures, diagnoses the root cause, patches the code, and re-runs — all without human intervention.


Problem Statement

Data pipelines break in production. Engineers waste hours tracking down KeyError, TypeError, and ZeroDivisionError failures buried in stack traces. This project shows how a lightweight agent loop can close that feedback cycle automatically:

failure → diagnosis → fix → success

No Airflow. No Spark. No paid APIs. Pure Python.


Architecture

┌─────────────────────────────────────────────────────────┐
│                        main.py                          │
│                    (Orchestrator)                       │
└──────────────────────┬──────────────────────────────────┘
                       │
          ┌────────────▼────────────┐
          │      pipeline.py        │  ← Step 1: Load CSV
          │   (buggy transform)     │    Step 2: Transform  ← BUG HERE
          └────────────┬────────────┘    Step 3: Save output
                       │
                  ❌  FAIL
                       │
          ┌────────────▼────────────┐
          │       parser.py         │  Extracts:
          │    (Error Observer)     │  { error_type, message, line }
          └────────────┬────────────┘
                       │
          ┌────────────▼────────────┐
          │        agent.py         │  Ollama (LLM) if available
          │   (Diagnosis Engine)    │  else → rule-based fallback
          └────────────┬────────────┘
                       │ { root_cause, fix, code_patch }
          ┌────────────▼────────────┐
          │        fixer.py         │  Edits pipeline.py in-place
          │    (Code Patcher)       │  (find → replace)
          └────────────┬────────────┘
                       │
          ┌────────────▼────────────┐
          │      pipeline.py        │  Re-run with patched code
          │     (now fixed!)        │
          └────────────┬────────────┘
                       │
                  ✅  SUCCESS
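The loop in the diagram can be sketched as a bounded retry around the pipeline. This is a minimal sketch, assuming main.py runs pipeline.py as a child process and hands the captured stderr to the observer. The names RunResult, run_pipeline, heal, and max_fixes are hypothetical, and the parser, agent, and fixer steps are passed in as plain callables rather than imported from the project.

```python
import subprocess
import sys
from dataclasses import dataclass
from typing import Callable


@dataclass
class RunResult:
    ok: bool      # True if the pipeline exited with status 0
    stderr: str   # captured traceback text on failure


def run_pipeline(script: str = "pipeline.py") -> RunResult:
    """Run the pipeline in a child process so a crash can't kill the agent."""
    proc = subprocess.run([sys.executable, script], capture_output=True, text=True)
    return RunResult(ok=proc.returncode == 0, stderr=proc.stderr)


def heal(
    run: Callable[[], RunResult],
    observe: Callable[[str], dict],    # parser.py role: stderr -> structured error
    diagnose: Callable[[dict], dict],  # agent.py role: error -> diagnosis
    patch: Callable[[dict], bool],     # fixer.py role: apply diagnosis, report success
    max_fixes: int = 3,
) -> bool:
    """failure -> diagnosis -> fix -> re-run, applying at most max_fixes patches."""
    for attempt in range(max_fixes + 1):
        result = run()
        if result.ok:
            return True
        if attempt == max_fixes:
            break  # patch budget exhausted
        diagnosis = diagnose(observe(result.stderr))
        if not patch(diagnosis):
            break  # no applicable fix: stop instead of re-running unchanged code
    return False
```

Capping the number of patches keeps a wrong diagnosis from looping forever, and running the pipeline in a subprocess means a crash inside pipeline.py cannot take down the agent itself.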

How to Run

1 — Install dependencies

pip install pandas

2 — Run the agent

cd project/
python main.py

To restore the intentionally buggy pipeline and run the full demo again:

python main.py --reset
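The README does not say how --reset restores the bug. One plausible approach, sketched here under that assumption, keeps a pristine copy of the buggy pipeline and copies it back over pipeline.py. The file name pipeline.orig.py and the functions reset_pipeline and parse_args are hypothetical.

```python
import argparse
import shutil
from pathlib import Path

# Hypothetical paths: the README does not document how --reset works.
PIPELINE = Path("pipeline.py")
PRISTINE = Path("pipeline.orig.py")


def reset_pipeline(pipeline: Path = PIPELINE, pristine: Path = PRISTINE) -> None:
    """Overwrite the (possibly patched) pipeline with the original buggy copy."""
    if not pristine.exists():
        raise FileNotFoundError(f"no pristine copy at {pristine}")
    shutil.copyfile(pristine, pipeline)


def parse_args(argv=None) -> argparse.Namespace:
    """Parse main.py's command line; --reset is a simple boolean flag."""
    parser = argparse.ArgumentParser(description="Self-healing pipeline demo")
    parser.add_argument(
        "--reset",
        action="store_true",
        help="restore the buggy pipeline before running the demo",
    )
    return parser.parse_args(argv)
```

In main.py this would run before the first pipeline execution: if parse_args().reset is true, call reset_pipeline().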

Optional: Enable LLM Diagnosis via Ollama

If you have Ollama installed:

ollama pull mistral
ollama serve          # runs on localhost:11434

Then re-run python main.py. The agent automatically detects Ollama and queries the LLM instead of using the rule-based fallback.
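A minimal sketch of the detection and query step, using only the standard library. It assumes the agent probes Ollama's /api/tags endpoint to decide whether the server is up and sends a non-streaming request to /api/generate; both are real Ollama REST endpoints, but how agent.py actually calls them is not shown in this README, and the names ollama_available, build_prompt, and ask_ollama are hypothetical.

```python
import json
import urllib.request

# Ollama's default local address; the project's OLLAMA_URL value may differ.
OLLAMA_URL = "http://localhost:11434"


def ollama_available(base_url: str = OLLAMA_URL, timeout: float = 1.0) -> bool:
    """Return True if an Ollama server answers GET /api/tags."""
    try:
        with urllib.request.urlopen(f"{base_url}/api/tags", timeout=timeout) as resp:
            return resp.status == 200
    except OSError:  # URLError, connection refused, and timeouts all subclass OSError
        return False


def build_prompt(error: dict) -> str:
    """Turn the parser's {error_type, message, line} record into a prompt."""
    return (
        "A Python data pipeline (pipeline.py) failed.\n"
        f"Error type: {error.get('error_type')}\n"
        f"Message: {error.get('message')}\n"
        f"Line: {error.get('line')}\n"
        "Reply only with JSON containing the keys root_cause, fix, and code_patch."
    )


def ask_ollama(prompt: str, model: str = "mistral", base_url: str = OLLAMA_URL) -> dict:
    """POST a non-streaming request to /api/generate and parse the model's JSON reply."""
    body = json.dumps(
        {"model": model, "prompt": prompt, "stream": False, "format": "json"}
    ).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}/api/generate",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        reply = json.loads(resp.read())
    # With stream=False, Ollama returns the whole completion in "response".
    return json.loads(reply["response"])  # may raise if the model ignores the format
```

If ollama_available() returns False, the agent would fall back to the rule-based diagnosis. Requesting JSON makes the reply easier to map onto the { root_cause, fix, code_patch } structure the fixer expects, but a model can still return malformed output, so the parsed result should be validated before use.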


Demo Walkthrough

Stage      What happens
RUN 1      pipeline.py tries to access a price column that doesn't exist → KeyError
Observer   Captures the full traceback and extracts error_type and message
Agent      Maps KeyError → "missing column" and generates a code patch
Fixer      Rewrites the offending line in pipeline.py to use the value column with null safety
RUN 2      Patched pipeline runs successfully → output.csv written
Evaluator  Prints ✅ FIX SUCCESSFUL
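The Observer step can be sketched as a small traceback parser that produces the { error_type, message, line } record from the architecture diagram. This assumes the traceback arrives as stderr text; parse_traceback is a hypothetical name, and reporting the last frame inside pipeline.py (rather than the innermost pandas frame) is a design choice of this sketch.

```python
import re
from pathlib import Path
from typing import Optional

# Matches frame headers such as:  File "/app/pipeline.py", line 14, in transform
_FRAME = re.compile(r'File "(?P<file>[^"]+)", line (?P<line>\d+)')


def parse_traceback(stderr: str, target: str = "pipeline.py") -> dict:
    """Extract {error_type, message, line} from Python traceback text.

    line is the last frame located in `target`, so frames inside
    libraries such as pandas are skipped.
    """
    lines = [ln for ln in stderr.splitlines() if ln.strip()]
    last = lines[-1].strip() if lines else ""
    # The final line reads "ExcType: message"; split on the first colon only.
    error_type, _, message = last.partition(":")
    line: Optional[int] = None
    for match in _FRAME.finditer(stderr):
        if Path(match.group("file")).name == target:
            line = int(match.group("line"))
    return {"error_type": error_type.strip(), "message": message.strip(), "line": line}
```

Even when pandas chains a KeyError through several tracebacks, the last line of stderr still reads KeyError: 'price', so splitting that line on its first colon recovers both the type and the message.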

Failure Cases Covered

Error              Root cause            Agent fix
KeyError: 'price'  Column doesn't exist  Replace with the value column, add fillna(0)
ZeroDivisionError  Zero in denominator   Guard the division with a lambda that checks for zero
TypeError          Null in arithmetic    Add fillna(0) before the operation
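The rule-based fallback amounts to a lookup from error_type to a canned diagnosis. The sketch below mirrors the table above and assumes, for the KeyError case only, that pipeline.py indexes columns as df["..."] and that value is the replacement column from the demo walkthrough. RULES, FALLBACK_COLUMN, and diagnose are hypothetical names, and the find/replace pair is illustrative rather than the project's actual patch.

```python
from typing import Optional

# Hypothetical rule table mirroring "Failure Cases Covered":
# error_type -> (root_cause, fix)
RULES = {
    "KeyError": ("Column doesn't exist", "Replace with value, add fillna(0)"),
    "ZeroDivisionError": ("Zero in denominator", "Guard the division with a zero check"),
    "TypeError": ("Null in arithmetic", "Add fillna(0) before the operation"),
}

# Assumed replacement column, taken from the demo walkthrough.
FALLBACK_COLUMN = "value"


def diagnose(error: dict) -> Optional[dict]:
    """Map a parsed error to {root_cause, fix, code_patch}; None if no rule applies."""
    rule = RULES.get(error.get("error_type", ""))
    if rule is None:
        return None  # unknown error: defer to the LLM or a human
    root_cause, fix = rule
    # For ZeroDivisionError and TypeError the right edit depends on the
    # surrounding code, so this sketch leaves code_patch empty for them.
    code_patch = None
    if error["error_type"] == "KeyError":
        # A KeyError message is the repr of the missing key, e.g. "'price'".
        missing = error.get("message", "").strip("'\"")
        # Assumes pipeline.py writes column access as df["name"].
        code_patch = {
            "find": f'df["{missing}"]',
            "replace": f'df["{FALLBACK_COLUMN}"].fillna(0)',
        }
    return {"root_cause": root_cause, "fix": fix, "code_patch": code_patch}
```

Returning None for unknown errors, rather than guessing, is what lets new rules (FileNotFoundError, schema drift) be added one entry at a time.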

File Structure

project/
├── main.py        # Orchestrator — runs the full agent loop
├── pipeline.py    # Data pipeline (intentionally buggy)
├── parser.py      # Error observer — extracts structured error info
├── agent.py       # Diagnosis engine — LLM or rule-based
├── fixer.py       # Code patcher — edits pipeline.py automatically
├── data/
│   ├── sample.csv   # Input data
│   └── output.csv   # Written on success
└── README.md
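fixer.py is described as a find → replace editor. A minimal sketch of that step, assuming the diagnosis supplies an exact find string and its replacement: patch_source refuses to edit when the find string is missing or appears more than once, and checks that the result still compiles before apply_patch writes it back. Both function names and the .bak backup convention are hypothetical.

```python
import shutil
from pathlib import Path
from typing import Optional


def patch_source(source: str, find: str, replace: str) -> Optional[str]:
    """Return source with exactly one occurrence of find replaced, or None."""
    if source.count(find) != 1:
        return None  # absent or ambiguous: don't guess which line to edit
    patched = source.replace(find, replace, 1)
    try:
        compile(patched, "<patched pipeline>", "exec")  # syntax check only; nothing runs
    except SyntaxError:
        return None
    return patched


def apply_patch(path: Path, find: str, replace: str) -> bool:
    """Patch a file in place, keeping a .bak copy of the original."""
    patched = patch_source(path.read_text(), find, replace)
    if patched is None:
        return False
    shutil.copyfile(path, path.with_name(path.name + ".bak"))
    path.write_text(patched)
    return True
```

Keeping the pure string transform separate from the file I/O makes the patch logic easy to test without touching pipeline.py.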

Extending This

  • Add Airflow to schedule and monitor real pipelines
  • Add more rules in agent.py for FileNotFoundError, schema drift, etc.
  • Swap Ollama for any OpenAI-compatible endpoint by changing OLLAMA_URL
  • Store fix history in SQLite to build a knowledge base of past repairs
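The last idea, a fix-history store, could start as a single table using Python's built-in sqlite3 module. This is a sketch under assumptions: the table layout, the fix_history.db file name, and the functions open_history, record_fix, and known_fix are all hypothetical, with error and diagnosis dicts shaped like the parser and agent outputs in the architecture diagram.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS fix_history (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT    NOT NULL,
    error_type TEXT    NOT NULL,
    message    TEXT,
    root_cause TEXT,
    fix        TEXT,
    succeeded  INTEGER NOT NULL
)
"""


def open_history(path: str = "fix_history.db") -> sqlite3.Connection:
    """Open (or create) the history database and ensure the table exists."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def record_fix(conn: sqlite3.Connection, error: dict, diagnosis: dict, succeeded: bool) -> None:
    """Log one repair attempt; `with conn` commits the insert."""
    with conn:
        conn.execute(
            "INSERT INTO fix_history "
            "(created_at, error_type, message, root_cause, fix, succeeded) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (
                datetime.now(timezone.utc).isoformat(),
                error.get("error_type", ""),
                error.get("message"),
                diagnosis.get("root_cause"),
                diagnosis.get("fix"),
                int(succeeded),
            ),
        )


def known_fix(conn: sqlite3.Connection, error_type: str) -> Optional[dict]:
    """Return the most recent diagnosis that fixed this error_type, if any."""
    row = conn.execute(
        "SELECT root_cause, fix FROM fix_history "
        "WHERE error_type = ? AND succeeded = 1 ORDER BY id DESC LIMIT 1",
        (error_type,),
    ).fetchone()
    return None if row is None else {"root_cause": row[0], "fix": row[1]}
```

Before querying the LLM or the rule table, the agent could call known_fix to reuse a diagnosis that already worked for the same error_type.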

About

A self-healing data pipeline agent that detects, diagnoses, and automatically patches runtime errors using rule-based logic or a local LLM (Ollama/Mistral).
