Optimisez vos opérations avec l'automatisation Airflow

Ce workflow n8n facilite la gestion et le suivi des exécutions de DAGs dans Apache Airflow, permettant aux équipes de DevOps et d'intégration continue de gagner en efficacité. Grâce à une intégration fluide avec l'API d'Airflow, ce workflow permet de lancer, surveiller et gérer les états des DAGs tout en automatisant le traitement des erreurs. Les utilisateurs peuvent réduire les temps d'attente grâce à un système intelligent de gestion des délais et d'évaluation des statuts, garantissant ainsi une meilleure allocation des ressources et une réduction des coûts opérationnels.

100,132 vues
29,875 copies
Automatisation

Documentation Complète

📋 Optimisez vos opérations avec l'automatisation Airflow

💡 Description

Ce workflow n8n facilite la gestion et le suivi des exécutions de DAGs dans Apache Airflow, permettant aux équipes de DevOps et d'intégration continue de gagner en efficacité. Grâce à une intégration fluide avec l'API d'Airflow, ce workflow permet de lancer, surveiller et gérer les états des DAGs tout en automatisant le traitement des erreurs. Les utilisateurs peuvent réduire les temps d'attente grâce à un système intelligent de gestion des délais et d'évaluation des statuts, garantissant ainsi une meilleure allocation des ressources et une réduction des coûts opérationnels.

📈 Impact & ROI: En utilisant ce workflow, les organisations peuvent diminuer le temps consacré à la surveillance manuelle des tâches Airflow, améliorer la réactivité aux échecs de tâches, et in fine augmenter le rendement global grâce à une gestion plus efficace et automatisée.

🚀 Fonctionnalités Clés

  • ✅ Gestion automatisée des exécutions de DAGs - Améliorez votre efficacité opérationnelle
  • ✅ Surveillance proactive des statuts - Réduisez les interruptions non planifiées
  • ✅ Gestion intelligente du temps d'attente - Optimisez l'utilisation des ressources
  • ✅ Traitement automatique des erreurs - Assurez la continuité des opérations

📊 Architecture Technique

12
Nodes
10
Connexions
2
Services

🔌 Services Intégrés

Apache Airflown8n

🔧 Composition du Workflow

NodeTypeDescription
Airflow: dag_runhttpRequestRequête HTTP vers une API externe
Airflow: dag_run - statehttpRequestRequête HTTP vers une API externe
countcodeTraitement des données
dag run failstopAndErrorTraitement des données
if state == queuedifCondition logique pour router le flux
dag run wait too longstopAndErrorTraitement des données
Airflow: dag_run - get resulthttpRequestRequête HTTP vers une API externe
Switch: stateswitchTraitement des données
in dataexecuteWorkflowTriggerTraitement des données
WaitwaitTraitement des données
If count > wait_timeifCondition logique pour router le flux
airflow-apisetTraitement des données

📖 Guide d'Implémentation

  1. Import du workflow: Téléchargez le fichier JSON et importez-le dans votre instance n8n
  2. Configuration des credentials: Configurez les accès pour chaque service utilisé
  3. Personnalisation: Adaptez les paramètres selon vos besoins spécifiques
  4. Test: Exécutez le workflow en mode test pour vérifier le bon fonctionnement
  5. Activation: Activez le workflow pour une exécution automatique

🏷️ Tags

Airflown8nautomatisation

Structure JSON

Voir le code JSON complet
{
    "id": "Y5URlIlbX4HDzWKA",
    "meta": {
        "instanceId": "6ae0aa8b6c9099f1f8ed1265281802eab47aaf5b2845f317791e4ac7ee5b7279",
        "templateCredsSetupCompleted": true
    },
    "name": "airflow dag_run",
    "tags": [
        {
            "id": "YSelDQ3zfWB0LeVn",
            "name": "airflow",
            "createdAt": "2025-02-25T04:17:21.943Z",
            "updatedAt": "2025-02-25T04:17:21.943Z"
        }
    ],
    "nodes": [
        {
            "id": "0d4457ef-7a88-4755-8bd2-b0e8148f86f4",
            "name": "Airflow: dag_run",
            "type": "n8n-nodes-base.httpRequest",
            "position": [
                380,
                -40
            ],
            "parameters": {
                "url": "={{ $('airflow-api').item.json.prefix }}\/api\/v1\/dags\/{{ $('in data').item.json.dag_id }}\/dagRuns",
                "method": "POST",
                "options": [],
                "jsonBody": "={\n  \"conf\": {{ $('in data').item.json.conf }}\n}",
                "sendBody": true,
                "specifyBody": "json",
                "authentication": "genericCredentialType",
                "genericAuthType": "httpBasicAuth"
            },
            "credentials": {
                "httpBasicAuth": {
                    "id": "vTR4WWA7czRn2ULn",
                    "name": "Airflow"
                }
            },
            "typeVersion": 4.2
        },
        {
            "id": "acf477a0-aad5-402a-a5a0-543f3e277333",
            "name": "Airflow: dag_run - state",
            "type": "n8n-nodes-base.httpRequest",
            "position": [
                840,
                60
            ],
            "parameters": {
                "url": "={{ $('airflow-api').item.json.prefix }}\/api\/v1\/dags\/{{ $('in data').item.json.dag_id }}\/dagRuns\/{{ $('Airflow: dag_run').item.json.dag_run_id }}",
                "options": [],
                "authentication": "genericCredentialType",
                "genericAuthType": "httpBasicAuth"
            },
            "credentials": {
                "httpBasicAuth": {
                    "id": "vTR4WWA7czRn2ULn",
                    "name": "Airflow"
                }
            },
            "typeVersion": 4.2
        },
        {
            "id": "26982a6f-6281-4140-a05c-ea6f3f2c0cb2",
            "name": "count",
            "type": "n8n-nodes-base.code",
            "position": [
                1180,
                40
            ],
            "parameters": {
                "jsCode": "try {\n  $('count').first().json.count += 1\n  return {count:$('count').first().json.count};\n}\ncatch (error) {\n  return {count:1};\n}"
            },
            "typeVersion": 2
        },
        {
            "id": "613718f6-ba7e-415c-8e07-0123224e1ac6",
            "name": "dag run fail",
            "type": "n8n-nodes-base.stopAndError",
            "position": [
                1180,
                400
            ],
            "parameters": {
                "errorMessage": "dag run fail"
            },
            "typeVersion": 1
        },
        {
            "id": "66ba0e85-4608-418b-b27b-8cbc50f78319",
            "name": "if state == queued",
            "type": "n8n-nodes-base.if",
            "position": [
                520,
                60
            ],
            "parameters": {
                "options": [],
                "conditions": {
                    "options": {
                        "version": 2,
                        "leftValue": "",
                        "caseSensitive": true,
                        "typeValidation": "strict"
                    },
                    "combinator": "and",
                    "conditions": [
                        {
                            "id": "0ae67986-09c0-443d-9262-075bfe94ca2e",
                            "operator": {
                                "name": "filter.operator.equals",
                                "type": "string",
                                "operation": "equals"
                            },
                            "leftValue": "={{ $json.state }}",
                            "rightValue": "queued"
                        }
                    ]
                }
            },
            "typeVersion": 2.2
        },
        {
            "id": "92176e9a-0ea7-48b0-9ca0-ea4ea8442cee",
            "name": "dag run wait too long",
            "type": "n8n-nodes-base.stopAndError",
            "position": [
                1500,
                40
            ],
            "parameters": {
                "errorMessage": "dag run wait too long"
            },
            "typeVersion": 1
        },
        {
            "id": "6a05471f-232e-44d6-9dbb-583400537507",
            "name": "Airflow: dag_run - get result",
            "type": "n8n-nodes-base.httpRequest",
            "position": [
                1180,
                -140
            ],
            "parameters": {
                "url": "={{ $('airflow-api').item.json.prefix }}\/api\/v1\/dags\/{{ $('in data').item.json.dag_id }}\/dagRuns\/{{ $('Airflow: dag_run').item.json.dag_run_id }}\/taskInstances\/{{ $('in data').item.json.task_id }}\/xcomEntries\/return_value",
                "options": [],
                "authentication": "genericCredentialType",
                "genericAuthType": "httpBasicAuth"
            },
            "credentials": {
                "httpBasicAuth": {
                    "id": "vTR4WWA7czRn2ULn",
                    "name": "Airflow"
                }
            },
            "typeVersion": 4.2
        },
        {
            "id": "fb2211d5-cef2-4be2-b281-de315aa07093",
            "name": "Switch: state",
            "type": "n8n-nodes-base.switch",
            "position": [
                980,
                -40
            ],
            "parameters": {
                "rules": {
                    "values": [
                        {
                            "outputKey": "=success",
                            "conditions": {
                                "options": {
                                    "version": 2,
                                    "leftValue": "",
                                    "caseSensitive": true,
                                    "typeValidation": "strict"
                                },
                                "combinator": "and",
                                "conditions": [
                                    {
                                        "id": "4d4ecf8a-c02b-4722-9528-1919cdf9b83e",
                                        "operator": {
                                            "name": "filter.operator.equals",
                                            "type": "string",
                                            "operation": "equals"
                                        },
                                        "leftValue": "={{ $json.state }}",
                                        "rightValue": "success"
                                    }
                                ]
                            },
                            "renameOutput": true
                        },
                        {
                            "outputKey": "queued",
                            "conditions": {
                                "options": {
                                    "version": 2,
                                    "leftValue": "",
                                    "caseSensitive": true,
                                    "typeValidation": "strict"
                                },
                                "combinator": "and",
                                "conditions": [
                                    {
                                        "operator": {
                                            "type": "string",
                                            "operation": "equals"
                                        },
                                        "leftValue": "={{ $json.state }}",
                                        "rightValue": "queued"
                                    }
                                ]
                            },
                            "renameOutput": true
                        },
                        {
                            "outputKey": "running",
                            "conditions": {
                                "options": {
                                    "version": 2,
                                    "leftValue": "",
                                    "caseSensitive": true,
                                    "typeValidation": "strict"
                                },
                                "combinator": "and",
                                "conditions": [
                                    {
                                        "id": "fa5d8524-334a-4ab1-b543-bba75cafd0ec",
                                        "operator": {
                                            "name": "filter.operator.equals",
                                            "type": "string",
                                            "operation": "equals"
                                        },
                                        "leftValue": "={{ $json.state }}",
                                        "rightValue": "running"
                                    }
                                ]
                            },
                            "renameOutput": true
                        },
                        {
                            "outputKey": "failed",
                            "conditions": {
                                "options": {
                                    "version": 2,
                                    "leftValue": "",
                                    "caseSensitive": true,
                                    "typeValidation": "strict"
                                },
                                "combinator": "and",
                                "conditions": [
                                    {
                                        "id": "dd853677-c51c-4c06-8680-3c9d1829d6bd",
                                        "operator": {
                                            "name": "filter.operator.equals",
                                            "type": "string",
                                            "operation": "equals"
                                        },
                                        "leftValue": "={{ $json.state }}",
                                        "rightValue": "failed"
                                    }
                                ]
                            },
                            "renameOutput": true
                        }
                    ]
                },
                "options": {
                    "fallbackOutput": 3
                }
            },
            "typeVersion": 3.2
        },
        {
            "id": "5941496a-a64d-432c-9103-e7bcae4b8d67",
            "name": "in data",
            "type": "n8n-nodes-base.executeWorkflowTrigger",
            "position": [
                100,
                -40
            ],
            "parameters": {
                "workflowInputs": {
                    "values": [
                        {
                            "name": "dag_id"
                        },
                        {
                            "name": "task_id"
                        },
                        {
                            "name": "conf"
                        },
                        {
                            "name": "wait",
                            "type": "number"
                        },
                        {
                            "name": "wait_time",
                            "type": "number"
                        }
                    ]
                }
            },
            "typeVersion": 1.1
        },
        {
            "id": "e77fed4a-b61a-4126-8be3-43ef8a832cfe",
            "name": "Wait",
            "type": "n8n-nodes-base.wait",
            "position": [
                700,
                -40
            ],
            "webhookId": "3d164954-2926-4174-afc1-261dfdfa9e46",
            "parameters": {
                "amount": "={{ $('in data').item.json.hasOwnProperty('wait') ? $('in data').item.json.wait : 10 }}"
            },
            "typeVersion": 1.1
        },
        {
            "id": "8ffae842-4400-4667-85bb-50644ef10fc0",
            "name": "If count > wait_time",
            "type": "n8n-nodes-base.if",
            "position": [
                1320,
                140
            ],
            "parameters": {
                "options": [],
                "conditions": {
                    "options": {
                        "version": 2,
                        "leftValue": "",
                        "caseSensitive": true,
                        "typeValidation": "strict"
                    },
                    "combinator": "and",
                    "conditions": [
                        {
                            "id": "1829d538-5633-4f5c-ad1b-285b084b35ee",
                            "operator": {
                                "type": "number",
                                "operation": "gt"
                            },
                            "leftValue": "={{ $json.count }}",
                            "rightValue": "={{ $('in data').item.json.hasOwnProperty('wait_time') ? $('in data').item.json.wait_time : 12 }}"
                        }
                    ]
                }
            },
            "typeVersion": 2.2
        },
        {
            "id": "c49d4a1b-6f25-471b-9c21-d3defb709dda",
            "name": "airflow-api",
            "type": "n8n-nodes-base.set",
            "position": [
                240,
                60
            ],
            "parameters": {
                "options": [],
                "assignments": {
                    "assignments": [
                        {
                            "id": "40a5ab5b-dee1-44c4-910a-d6b85af75aed",
                            "name": "prefix",
                            "type": "string",
                            "value": "https:\/\/airflow.example.com"
                        }
                    ]
                }
            },
            "typeVersion": 3.4
        }
    ],
    "active": false,
    "pinData": {
        "in data": [
            {
                "json": {
                    "conf": "{\n  \"image\": \"nginx\",\n  \"tag\": \"latest\",\n  \"tag_requested\": 1000\n}",
                    "wait": 10,
                    "dag_id": "image_tag_related",
                    "task_id": "image_tag_related",
                    "wait_time": 18
                }
            }
        ]
    },
    "settings": {
        "executionOrder": "v1"
    },
    "versionId": "57fdbcfc-7950-4aff-9ac7-3c2a99a2c8c8",
    "connections": {
        "Wait": {
            "main": [
                [
                    {
                        "node": "Airflow: dag_run - state",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "count": {
            "main": [
                [
                    {
                        "node": "If count > wait_time",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "in data": {
            "main": [
                [
                    {
                        "node": "airflow-api",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "airflow-api": {
            "main": [
                [
                    {
                        "node": "Airflow: dag_run",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "Switch: state": {
            "main": [
                [
                    {
                        "node": "Airflow: dag_run - get result",
                        "type": "main",
                        "index": 0
                    }
                ],
                [
                    {
                        "node": "count",
                        "type": "main",
                        "index": 0
                    }
                ],
                [
                    {
                        "node": "count",
                        "type": "main",
                        "index": 0
                    }
                ],
                [
                    {
                        "node": "dag run fail",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "Airflow: dag_run": {
            "main": [
                [
                    {
                        "node": "if state == queued",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "if state == queued": {
            "main": [
                [
                    {
                        "node": "Wait",
                        "type": "main",
                        "index": 0
                    }
                ],
                [
                    {
                        "node": "dag run fail",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "If count > wait_time": {
            "main": [
                [
                    {
                        "node": "dag run wait too long",
                        "type": "main",
                        "index": 0
                    }
                ],
                [
                    {
                        "node": "Wait",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "Airflow: dag_run - state": {
            "main": [
                [
                    {
                        "node": "Switch: state",
                        "type": "main",
                        "index": 0
                    }
                ]
            ]
        },
        "Airflow: dag_run - get result": {
            "main": [
                []
            ]
        }
    }
}
                                

Workflows Similaires

Automatisez le Résumé de Vos Emails avec A.I. et Messagerie

Ce workflow n8n vous permet d'automatiser la gestion de vos emails en utilisant l'intelligence artificielle pour résume...

Automatisation de gestion des réunions Zoom et communication

Ce workflow est conçu pour automatiser le processus de planification et de gestion des réunions Zoom tout en assurant ...

Automatisez vos Tweets d'images humoristiques à 17h

Ce workflow n8n est conçu pour les professionnels des réseaux sociaux cherchant à automatiser leur contenu humoristiq...