
airflow-dag-analyzer

Updated: March 4, 2026
SKILL.md Frontmatter
name: airflow-dag-analyzer
description: Analyzes, validates, and optimizes Apache Airflow DAGs for reliability, performance, and best practices adherence.
version: 1.0.0
category: Orchestration
skill-id: SK-DEA-002
allowed-tools: Read, Grep, Glob, Bash, WebFetch

Airflow DAG Analyzer

Analyzes, validates, and optimizes Apache Airflow DAGs for reliability and performance.

Overview

This skill examines Apache Airflow DAG definitions to identify performance bottlenecks, reliability issues, and best practice violations. It provides recommendations for task dependency optimization, parallelism configuration, error handling, and resource management.

Capabilities

  • DAG structure analysis and validation - Parse and validate DAG structure
  • Task dependency optimization - Identify bottlenecks and suggest parallel execution
  • Parallelism and concurrency recommendations - Optimize pool and slot allocation
  • SLA and timeout configuration - Recommend appropriate timeouts and SLAs
  • Retry and failure handling patterns - Validate retry logic and alerting
  • Sensor optimization - Smart sensors, deferrable operators, reschedule mode
  • Resource pool allocation - Optimize pool usage and worker distribution
  • DAG scheduling optimization - Catchup, backfill, and schedule interval tuning
  • Cross-DAG dependency detection - Identify external dependencies and triggers
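As a sketch of how the structure analysis can work without importing Airflow at all: `>>` chains compile to nested `ast.BinOp` nodes with `RShift` operators, so walking the parsed DAG source recovers the edge list. The function below is illustrative only, not part of this skill's interface:

```python
import ast

def extract_dependencies(dag_source: str):
    """Return (upstream, downstream) edges from `>>` chains like t1 >> [t2, t3] >> t4."""
    edges = []

    def operand_names(node):
        # Task references are bare names; parallel groups are lists of names.
        if isinstance(node, ast.Name):
            return [node.id]
        if isinstance(node, ast.List):
            return [e.id for e in node.elts if isinstance(e, ast.Name)]
        return []

    def visit(node):
        # `a >> b >> c` parses as `(a >> b) >> c`, so recurse into the left side.
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.RShift):
            if isinstance(node.left, ast.BinOp):
                left_tail = visit(node.left)
            else:
                left_tail = operand_names(node.left)
            right = operand_names(node.right)
            edges.extend((u, d) for u in left_tail for d in right)
            return right  # the tail of this chain feeds the next `>>`
        return operand_names(node)

    for stmt in ast.walk(ast.parse(dag_source)):
        if isinstance(stmt, ast.Expr):
            visit(stmt.value)
    return edges
```

For example, `extract_dependencies("t1 >> [t2, t3] >> t4")` yields the four edges of the fan-out/fan-in pattern shown later under Optimization Patterns.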

Input Schema

{
  "dagCode": {
    "type": "string",
    "description": "The Python DAG definition code",
    "required": true
  },
  "dagId": {
    "type": "string",
    "description": "The DAG identifier"
  },
  "executionHistory": {
    "type": "object",
    "description": "Historical execution metrics",
    "properties": {
      "runs": {
        "type": "array",
        "items": {
          "dagRunId": "string",
          "executionDate": "string",
          "duration": "number",
          "state": "string",
          "taskDurations": "object"
        }
      }
    }
  },
  "clusterConfig": {
    "type": "object",
    "properties": {
      "workerCount": "number",
      "executorType": "string",
      "poolConfigs": "object",
      "airflowVersion": "string"
    }
  },
  "analysisScope": {
    "type": "array",
    "items": {
      "type": "string",
      "enum": ["structure", "performance", "reliability", "resources", "security"]
    },
    "default": ["structure", "performance", "reliability"]
  }
}
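Only dagCode is required; analysisScope defaults to structure, performance, and reliability. A caller-side validation helper might look like this (the name prepare_request is hypothetical, not part of the skill):

```python
VALID_SCOPES = {"structure", "performance", "reliability", "resources", "security"}

def prepare_request(payload: dict) -> dict:
    """Validate a request against the input schema and apply the analysisScope default."""
    if not isinstance(payload.get("dagCode"), str) or not payload["dagCode"]:
        raise ValueError("dagCode is required and must be a non-empty string")
    scope = payload.get("analysisScope", ["structure", "performance", "reliability"])
    unknown = set(scope) - VALID_SCOPES
    if unknown:
        raise ValueError(f"unknown analysisScope values: {sorted(unknown)}")
    return {**payload, "analysisScope": scope}
```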

Output Schema

{
  "validationResults": {
    "errors": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "error"
      }
    },
    "warnings": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "warning"
      }
    }
  },
  "optimizations": {
    "type": "array",
    "items": {
      "category": "string",
      "current": "string",
      "recommended": "string",
      "impact": "high|medium|low",
      "effort": "string",
      "codeChange": "string"
    }
  },
  "recommendedConfig": {
    "type": "object",
    "properties": {
      "poolSize": "number",
      "maxActiveRuns": "number",
      "concurrency": "number",
      "defaultRetries": "number",
      "executionTimeout": "string"
    }
  },
  "dependencyGraph": {
    "type": "object",
    "properties": {
      "nodes": "array",
      "edges": "array",
      "criticalPath": "array",
      "parallelGroups": "array"
    }
  },
  "metrics": {
    "taskCount": "number",
    "maxDepth": "number",
    "parallelizationRatio": "number",
    "estimatedDuration": "string"
  },
  "securityFindings": {
    "type": "array",
    "items": {
      "severity": "high|medium|low",
      "finding": "string",
      "recommendation": "string"
    }
  }
}
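The metrics block lends itself to a concrete reading: if parallelizationRatio is taken as taskCount divided by the longest dependency chain (maxDepth), both can be computed directly from the edge list. A sketch under that assumed definition:

```python
def graph_metrics(edges):
    """Compute taskCount, maxDepth, and parallelizationRatio from (up, down) edges.

    parallelizationRatio is assumed here to be taskCount / maxDepth:
    1.0 means fully sequential; higher means more tasks can run side by side.
    """
    nodes = {n for e in edges for n in e}
    children = {}
    for up, down in edges:
        children.setdefault(up, []).append(down)

    def depth(node, seen=()):
        if node in seen:  # guard against accidental cycles
            raise ValueError(f"cycle through {node}")
        return 1 + max((depth(c, seen + (node,)) for c in children.get(node, [])), default=0)

    roots = nodes - {down for _, down in edges}
    max_depth = max((depth(r) for r in roots), default=0)
    return {
        "taskCount": len(nodes),
        "maxDepth": max_depth,
        "parallelizationRatio": round(len(nodes) / max_depth, 2) if max_depth else 0.0,
    }
```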

Usage Examples

Basic DAG Analysis

{
  "dagCode": "from airflow import DAG\nfrom airflow.operators.python import PythonOperator\n...",
  "dagId": "daily_etl_pipeline"
}

With Execution History

{
  "dagCode": "...",
  "dagId": "daily_etl_pipeline",
  "executionHistory": {
    "runs": [
      {
        "dagRunId": "manual__2024-01-15",
        "duration": 3600,
        "state": "success",
        "taskDurations": {
          "extract": 600,
          "transform": 1800,
          "load": 1200
        }
      }
    ]
  }
}

Full Analysis with Cluster Config

{
  "dagCode": "...",
  "dagId": "complex_ml_pipeline",
  "clusterConfig": {
    "workerCount": 8,
    "executorType": "KubernetesExecutor",
    "poolConfigs": {
      "default_pool": {"slots": 128},
      "ml_pool": {"slots": 32}
    },
    "airflowVersion": "2.8.0"
  },
  "analysisScope": ["structure", "performance", "reliability", "resources", "security"]
}

Validation Rules

DAG Definition Rules

Rule     Severity  Description
DAG-001  Error     Missing DAG default_args
DAG-002  Error     Invalid schedule_interval
DAG-003  Warning   Catchup enabled for long-running DAG
DAG-004  Warning   No email on failure configured
DAG-005  Info      Consider using @dag decorator
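A rule like DAG-001 needs little more than a pass over the parsed source looking for DAG(...) calls without a default_args keyword. A minimal illustrative checker (not the skill's actual implementation), emitting findings in the output schema's error shape:

```python
import ast

def check_dag_rules(dag_source: str) -> list:
    """Flag DAG() constructor calls missing default_args (rule DAG-001)."""
    findings = []
    for node in ast.walk(ast.parse(dag_source)):
        is_dag_call = (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "DAG"
        )
        if is_dag_call and not any(kw.arg == "default_args" for kw in node.keywords):
            findings.append({
                "code": "DAG-001",
                "message": "Missing DAG default_args",
                "line": node.lineno,
                "severity": "error",
            })
    return findings
```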

Task Definition Rules

Rule     Severity  Description
TSK-001  Error     Task has no upstream or downstream
TSK-002  Warning   Task missing retries configuration
TSK-003  Warning   Execution timeout not set
TSK-004  Warning   PythonOperator with no pool
TSK-005  Info      Consider TaskGroup for related tasks

Sensor Rules

Rule     Severity  Description
SEN-001  Warning   Sensor in poke mode (use reschedule)
SEN-002  Warning   Sensor missing timeout
SEN-003  Info      Consider deferrable operator
SEN-004  Warning   External sensor without soft_fail

Security Rules

Rule     Severity  Description
SEC-001  Error     Hardcoded credentials
SEC-002  Warning   Using Variable.get without default
SEC-003  Warning   Connection ID not parameterized
SEC-004  Info      Consider Secrets Backend

Optimization Patterns

Parallelization

# Before: Sequential execution
task1 >> task2 >> task3 >> task4

# After: Parallel execution where possible
task1 >> [task2, task3] >> task4

Sensor Optimization

# Before: Poke mode (blocks worker)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='poke'  # Bad
)

# After: Reschedule mode (releases worker)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='reschedule',  # Good
    poke_interval=300
)

# Best: Deferrable mode (the deferrable framework requires Airflow 2.2+;
# FileSensor's deferrable parameter arrived in later 2.x releases)
from airflow.sensors.filesystem import FileSensor
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    deferrable=True
)

TaskGroups

# Before: Flat task structure
extract_orders >> transform_orders >> load_orders
extract_products >> transform_products >> load_products

# After: TaskGroups for organization
from airflow.utils.task_group import TaskGroup

# Task IDs are prefixed with the group name (e.g. orders.extract),
# so each group can reuse the same local task names.
with TaskGroup('orders') as orders_group:
    extract >> transform >> load

with TaskGroup('products') as products_group:
    extract >> transform >> load

Dynamic Task Mapping (Airflow 2.3+)

# Before: Static task generation
for i in range(10):
    PythonOperator(task_id=f'process_{i}', ...)

# After: Dynamic task mapping
from airflow.decorators import task

@task
def process_item(item):
    return item * 2

process_item.expand(item=[1, 2, 3, 4, 5])

Configuration Recommendations

Default Args Template

from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=1),
}

Pool Configuration

Workload Type   Recommended Pool Size
Heavy compute   2-4 per worker
I/O bound       8-16 per worker
API calls       Rate limit based
Sensors         Separate pool, high slots
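Taking the midpoints of the per-worker ranges above, the sizing heuristic can be expressed as a small helper (illustrative only; API-call pools should instead be sized to the provider's rate limit, and sensor pools sized generously in reschedule mode):

```python
def recommend_pool_slots(workload: str, worker_count: int) -> int:
    """Translate the per-worker sizing table into total slot counts.

    Uses the midpoint of each range: 3 for heavy compute (2-4),
    12 for I/O bound (8-16).
    """
    per_worker = {"heavy_compute": 3, "io_bound": 12}
    if workload not in per_worker:
        raise ValueError(f"no per-worker heuristic for {workload!r}")
    return per_worker[workload] * worker_count
```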

Integration Points

MCP Server Integration

  • yangkyeongmo/mcp-server-apache-airflow - Airflow REST API integration
  • Dagster MCP - Alternative orchestration patterns
  • Prefect MCP - Modern orchestration comparison

Related Skills

  • dbt Project Analyzer (SK-DEA-003) - dbt operator optimization
  • Data Lineage Mapper (SK-DEA-010) - Task lineage extraction

Applicable Processes

  • ETL/ELT Pipeline (etl-elt-pipeline.js)
  • A/B Testing Pipeline (ab-testing-pipeline.js)
  • Pipeline Migration (pipeline-migration.js)
  • Data Quality Framework (data-quality-framework.js)

Version History

  • 1.0.0 - Initial release with Airflow 2.x support