Création d'un tableau de bord de surveillance de l'espace disque

#CCES#FastAPI#Générer du code avec IA#Monitoring#Pytest#Python

par Maxime Decooman

Introduction

Un manque d'espace disque peut faire s'effondrer votre système au pire moment. Un simple tableau de bord de surveillance peut vous sauver de ce chaos.

Dans ce tutoriel, je vais vous guider étape par étape dans la construction d'un tableau de bord de surveillance de l'espace disque en temps réel avec Python, en utilisant FastAPI pour le backend, Chart.js pour les visuels, et SSH pour un accès sécurisé au serveur.

Cette approche fournit une alternative légère aux solutions de surveillance plus lourdes tout en vous donnant un contrôle total sur l'implémentation.

Prérequis

Avant de commencer, voici ce dont vous aurez besoin (ne vous inquiétez pas s'il vous en manque quelques-uns - allez-y quand même !) :

Ce dont vous avez besoin Pourquoi c'est important
Python 3.13+ Met à jour le backend avec les dernières fonctionnalités, bien que les versions antérieures puissent fonctionner.
Serveur Linux avec accès SSH Fournit le système à surveiller et une connexion sécurisée pour la récupération des données.
Authentification par clé privée SSH Garantit un accès sécurisé et automatisé au serveur sans mot de passe.
Commandes de base du système de fichiers Linux (df, du, find, iostat) Permet de comprendre et d'interagir avec les mesures d'utilisation du disque du serveur.
Familiarité avec Python et la programmation asynchrone Permet de suivre efficacement le code asynchrone de FastAPI.
Compréhension des principes fondamentaux de FastAPI Indispensable pour construire et personnaliser l'épine dorsale API du tableau de bord.
Les bases du HTML, CSS, et JavaScript Nécessaire pour créer et styliser l'interface utilisateur.
Un éditeur de code (par exemple, PyCharm, VSCode) Votre outil pour écrire et déboguer le code - n'importe quel éditeur fonctionne.
Accès au terminal Permet d'exécuter des commandes, de configurer l'environnement et de lancer l'application.

À la fin de cet article, vous aurez un tableau de bord fonctionnel qui affichera :

  • Utilisation de l'espace disque
  • Mesures de consommation d'inodes (nœud d'index)
  • Identification des fichiers volumineux
  • Statistiques d'E/S sur le disque

Bar charts showing disk usage in real time

C'est parti!

Aperçu de l'architecture

La solution utilise une architecture client-serveur avec les composants suivants :

  1. Backend (FastAPI) : Traite les demandes d'API, se connecte aux serveurs via SSH, exécute les commandes Linux et analyse les résultats.
  2. Frontend (HTML/JS/Chart.js) : Affiche des visualisations des mesures sous forme de graphiques et de tableaux.
  3. Collecte de données : Utilise la bibliothèque paramiko pour établir des connexions SSH et exécuter des commandes sur des serveurs distants.
  4. Data Flow :
  • Le navigateur fait des requêtes API au backend FastAPI
  • Le backend se connecte au serveur via SSH, exécute les commandes
  • Les résultats sont analysés et renvoyés sous forme de JSON
  • Le frontend affiche les données avec Chart.js

Cette architecture est légère mais suffisante pour démarrer, ne nécessitant pas l'installation d'agents sur les serveurs surveillés. La contrepartie est qu'elle nécessite un accès SSH et crée une nouvelle connexion pour chaque requête, ce qui ne permet pas de surveiller de nombreux serveurs simultanément ou d'ajouter de nouvelles fonctionnalités.

Prenez votre café, c'est l'heure du codage !

Environnement de dev

Démarrer un environnement virtuel, je suppose que vous connaissez les bases mais si vous êtes nouveau dans les environnements virtuels, consultez [la doc officielle de Python] (https://docs.python.org/fr/3.13/library/venv.html) pour un guide d'installation rapide.

mkdir monitoring 
cd monitoring
python -m venv .venv
source .venv/bin/activate

Installer les paquets Python requis. Créez un fichier requirements.txt avec ce qui suit :

# requirements.txt
fastapi[standard]
paramiko
pytest
python-decouple

Puis installez-les.

pip install --upgrade pip
pip install -r requirements.txt

python-decouple est utilisé pour charger les variables .env (j'aime beaucoup ce paquet)

Structure du projet

ops_tools_examples/
├── .venv/  # The virtual environment 
├── dashboard/   # The actual app home folder   ├── static/
│      ├── js/
│         └── dashboard.js
│   ├── templates/
│      └── index.html
│   ├── tests/
│      ├── __init__.py
│      └── test_app.py
│   ├── __init__.py
│   └── main.py
├── .env
├── .gitignore
└── requirements.txt

Maintenant que nous avons configuré l'environnement, plongeons dans la mise en place de la colonne vertébrale de notre système de surveillance !

Création de l'application principale

FastAPI permet de créer une API qui servira les données pour l'affichage:

# dashbord/main.py
from pathlib import Path

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import paramiko
from typing import List

from decouple import AutoConfig
from pydantic.dataclasses import dataclass

BASE_DIR: Path = Path(__file__).resolve().parent
config = AutoConfig(search_path=BASE_DIR)

app = FastAPI(title="Linux Disk Space Monitor")
app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static")

templates = Jinja2Templates(directory="templates")

SERVER_CONFIG = {
    "hostname": config("HOSTNAME"),
    "port": config("PORT", default=22, cast=int),
    "username": config("USERNAME"),
}

Obtenir la clé privée

key_type_mapping = {
    'ed25519': paramiko.Ed25519Key,
    'rsa': paramiko.RSAKey,
    'ecdsa': paramiko.ECDSAKey,
    'dss': paramiko.DSSKey
}

# Charge la clé privée avec les informations de .env
private_key_str = config("PRIVATE_KEY")
passphrase = config("PASSPHRASE", default=None)
key_type = config("PRIVATE_KEY_TYPE", default="ed25519").lower()
key_class = key_type_mapping.get(key_type)

Créer les modèles de données

FastAPI utilise pydantic pour créer les classes de données. Cela va aider à la validation, au support de l'IDE et à la documentation de l'API.

@dataclass
class DiskSpace:
    filesystem: str
    size: str
    used: str
    available: str
    use_percent: str
    mounted_on: str

@dataclass
class InodeUsage:
    filesystem: str
    inodes: str
    iused: str
    ifree: str
    iuse_percent: str
    mounted_on: str

@dataclass
class LargeFile:
    permissions: str
    owner: str
    group: str
    size: str
    date: str
    path: str

@dataclass
class DiskIO:
    device: str
    r_s: str = "0"
    w_s: str = "0"
    rkb_s: str = "0"
    wkb_s: str = "0"

    class Config:
        extra = "allow"  # Allow extra fields for flexibility with different iostat outputs

Se connecter et exécuter les commandes

async def ssh_execute(command: str) -> str:
    """Execute a command on the remote server via SSH and return the output."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        connection_params = {
            'hostname': SERVER_CONFIG['hostname'],
            'port': SERVER_CONFIG['port'],
            'username': SERVER_CONFIG['username'],
        }

        if private_key_str:
            print(f"Loading private key from file: {private_key_str}")
            try:
                if passphrase:
                    private_key = key_class.from_private_key_file(private_key_str, password=passphrase)
                else:
                    private_key = key_class.from_private_key_file(private_key_str)
                connection_params['pkey'] = private_key
            except Exception as key_error:
                    print(f"Failed to load private key from file: {key_error}")
        else:
            raise ValueError("Private key file not found")

        client.connect(**connection_params)
        stdin, stdout, stderr = client.exec_command(command)
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        if error:
            print(f"Error executing {command}: {error}")
        return output
    except Exception as e:
        print(f"Error connecting to server: {e}")
        return f"Error: {e}"
    finally:
        client.close()

Construire l'API

Récupérer les informations sur l'espace disque

@app.get("/api/disk-space", response_model=List[DiskSpace])
async def get_disk_space():
    """Get disk space information using df -h."""
    output = await ssh_execute('df -h')
    lines = output.strip().split('\n')
    data = []
    for line in lines[1:]:
        parts = line.split()
        if len(parts) >= 5:
            disk_space = DiskSpace(
                    filesystem=parts[0],
                    size=parts[1],
                    used=parts[2],
                    available=parts[3],
                    use_percent=parts[4],
                    mounted_on=parts[5] if len(parts) > 5 else ''
            )
            data.append(disk_space)

    return data

Récupérer les informations sur les inodes

@app.get("/api/inodes", response_model=List[InodeUsage])
async def get_inodes_usage():
    """Get inode usage information using df -i."""
    output = await ssh_execute('df -i')
    lines = output.strip().split('\n')
    data = []
    for line in lines[1:]:
        parts = line.split()
        if len(parts) >= 5:
            inode_usage = InodeUsage(
                filesystem=parts[0],
                inodes=parts[1],
                iused=parts[2],
                ifree=parts[3],
                iuse_percent=parts[4],
                mounted_on=parts[5] if len(parts) > 5 else ''
            )
            data.append(inode_usage)

    return data

Récupérer les fichiers volumineux

@app.get("/api/large-files", response_model=List[LargeFile])
async def large_files():
    """Find large files over 50MB."""
    output = await ssh_execute('find / -type f -size +50M -exec ls -lh {} \\+ 2>/dev/null')
    lines = output.strip().split('\n')
    data = []
    for line in lines:
        if line:
            parts = line.split()
            if len(parts) >= 9:
                large_file = LargeFile(
                    permissions=parts[0],
                    owner=parts[2],
                    group=parts[3],
                    size=parts[4],
                    date=f"{parts[5]} {parts[6]} {parts[7]}",
                    path=' '.join(parts[8:])
                )
                data.append(large_file)

    return data

Récupérer les statistiques d'E/S sur disque

@app.get("/api/disk-io", response_model=List[DiskIO])
async def disk_io():
    """Get disk I/O statistics."""
    try:
        output = await ssh_execute('iostat -d')
        if not output or "command not found" in output:
            return []

        lines = output.strip().split('\n')
        headers = None
        data = []
        in_device_section = False

        for line in lines:
            line = line.strip()
            if not line:
                continue

            # Skip the first line with kernel info
            if "Linux" in line:
                continue

            # Skip the CPU stats section
            if "avg-cpu" in line:
                continue

            # Get the headers of interest
            if "Device" in line:
                headers = line.split()
                in_device_section = True
                continue

            # Read the device stats
            if in_device_section:
                values = line.split()
                if len(values) >= 3:  # At minimum, we need device name and some stats

                    # Create a data object with column headers as keys
                    device_data = {"device": values[0]}

                    for i in range(1, min(len(headers), len(values))):
                        header_key = headers[i].lower()
                        device_data[header_key] = values[i]

                    # Ensure we have read and write rates for Chart.js
                    device_data['r_s'] = device_data.get('kb_read/s', "0")
                    device_data['w_s'] = device_data.get('kb_wrtn/s', "0")

                    disk_io_instance = DiskIO(**device_data)
                    data.append(disk_io_instance)

        return data
    except Exception as e:
        print(f"Error in disk_io endpoint: {e}")
        return [DiskIO(device="error", r_s="0", w_s="0")]

Démarrage de l'API

Enfin, ce code vérifie si le script est exécuté directement (et non importé) et, si c'est le cas, lance un serveur web Uvicorn pour héberger l'application FastAPI sur toutes les interfaces réseau au port 8000 avec l'auto-rechargement activé pour le développement.

if __name__ == '__main__':
    import uvicorn
    uvicorn.run("dashboard.main:app", host="0.0.0.0", port=8000, reload=True)

Vous êtes à mi-chemin ! Maintenant, le Frontend !

Créer le Frontend

Maintenant, créons un modèle HTML simple avec Bootstrap et un peu de JavaScript pour récupérer et afficher les données.

C'est là que la magie opère : vos données prennent vie dans un tableau de bord propre et visuel.

Tableau de bord

<!-- templates/index.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Linux Disk Space Dashboard</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-QWTKZyjpPEjISv5WaRU9OFeRpok6YctnYmDr5pNlyT2bRjXh0JMhjY6hW+ALEwIH" crossorigin="anonymous">
    <style>
        .card {
            margin-bottom: 20px;
        }
        .refresh-btn {
            margin-bottom: 20px;
        }
    </style>
</head>
<body>
    <div class="container mt-4">
        <h1 class="mb-4">Linux Disk Space Dashboard</h1>
        <p id="lastUpdated" class="text-muted mb-3">Last updated: -</p>
        <button id="refreshBtn" class="btn btn-primary refresh-btn">Refresh Data</button>

        <div class="row">
            <div class="col-md-4">
                <div class="card">
                    <div class="card-header">
                        <h5>Disk Space Usage</h5>
                    </div>
                    <div class="card-body">
                        <canvas id="diskSpaceChart"></canvas>
                    </div>
                </div>
            </div>

            <div class="col-md-4">
                <div class="card">
                    <div class="card-header">
                        <h5>Inode Usage</h5>
                    </div>
                    <div class="card-body">
                        <canvas id="inodeChart"></canvas>
                    </div>
                </div>
            </div>

            <div class="col-md-4">
                <div class="card">
                    <div class="card-header">
                        <h5>Disk I/O</h5>
                    </div>
                    <div class="card-body">
                        <canvas id="diskIOChart"></canvas>
                    </div>
                </div>
            </div>
        </div>

        <div class="row">
            <div class="card">
                <div class="card-header">
                    <h5>Large Files (>50MB)</h5>
                </div>
                <div class="card-body">
                    <div class="table-responsive">
                        <table class="table table-striped" id="largeFilesTable">
                            <thead>
                                <tr>
                                    <th>Path</th>
                                    <th>Size</th>
                                    <th>Owner</th>
                                    <th>Date</th>
                                </tr>
                            </thead>
                            <tbody>
                                <!-- Will be populated with JavaScript -->
                            </tbody>
                        </table>
                    </div>
                </div>
            </div>
        </div>
    </div>

    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-YvpcrYf0tY3lHB60NNkmXc5s9fDVZLESaAA55NDzOxhy9GkcIdslK1eN7N6jIeHz" crossorigin="anonymous"></script>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <script src="/static/js/dashboard.js"></script>
</body>
</html>

Nous allons utiliser Chart.js pour créer de jolis graphiques. Ce code JS récupérera les données de l'API et hydratera les graphiques dans les tableaux.

dashboard.js

initialiser les graphiques

//static/js/dashboard.js

// Chart objects
let diskSpaceChart, inodeChart, diskIOChart;

function initCharts() {
    // Disk Space Chart
    const diskSpaceCtx = document.getElementById('diskSpaceChart').getContext('2d');
    diskSpaceChart = new Chart(diskSpaceCtx, {
        type: 'bar',
        data: {
            labels: [],
            datasets: [{
                label: 'Used Space (%)',
                data: [],
                backgroundColor: 'rgba(54, 162, 235, 0.6)'
            }]
        },
        options: {
            scales: {
                y: {
                    beginAtZero: true,
                    max: 100
                }
            }
        }
    });

    // Inode Chart
    const inodeCtx = document.getElementById('inodeChart').getContext('2d');
    inodeChart = new Chart(inodeCtx, {
        type: 'bar',
        data: {
            labels: [],
            datasets: [{
                label: 'Used Inodes (%)',
                data: [],
                backgroundColor: 'rgba(255, 99, 132, 0.6)'
            }]
        },
        options: {
            scales: {
                y: {
                    beginAtZero: true,
                    max: 100
                }
            }
        }
    });

    // Disk I/O Chart
    const diskIOCtx = document.getElementById('diskIOChart').getContext('2d');
    diskIOChart = new Chart(diskIOCtx, {
        type: 'bar',
        data: {
            labels: [],
            datasets: [
                {
                    label: 'Read Rate (KB/s)',
                    data: [],
                    backgroundColor: 'rgba(54, 162, 235, 0.6)'
                },
                {
                    label: 'Write Rate (KB/s)',
                    data: [],
                    backgroundColor: 'rgba(255, 99, 132, 0.6)'
                }
            ]
        }
    });
}

Mise à jour de la date d'actualisation

function updateLastUpdatedTime(locale = navigator.language) {
    const now = new Date();
    const options = {
        year: 'numeric',
        month: 'short',
        day: 'numeric',
        hour: '2-digit',
        minute: '2-digit',
        second: '2-digit'
    };

    const formattedTime = now.toLocaleString(locale, options);
    document.getElementById('lastUpdated').textContent = `Last updated: ${formattedTime}`;
}

Appeler l'API et hydrater les tableaux

async function fetchData() {
    try {
        updateLastUpdatedTime();

        // Fetch disk space data
        const diskSpaceResponse = await fetch('/api/disk-space');
        const diskSpaceData = await diskSpaceResponse.json();

        // Update disk space chart
        diskSpaceChart.data.labels = diskSpaceData.map(item => item.mounted_on);
        diskSpaceChart.data.datasets[0].data = diskSpaceData.map(item =>
            parseFloat(item.use_percent.replace('%', '')));
        diskSpaceChart.update();

        // Fetch inode data
        const inodeResponse = await fetch('/api/inodes');
        const inodeData = await inodeResponse.json();

        // Update inode chart
        inodeChart.data.labels = inodeData.map(item => item.mounted_on);
        inodeChart.data.datasets[0].data = inodeData.map(item =>
            parseFloat(item.iuse_percent.replace('%', '')));
        inodeChart.update();

        // Fetch large files data
        const largeFilesResponse = await fetch('/api/large-files');
        const largeFilesData = await largeFilesResponse.json();

        // Update large files table
        const tableBody = document.getElementById('largeFilesTable').getElementsByTagName('tbody')[0];
        tableBody.innerHTML = '';

        largeFilesData.forEach(file => {
            const row = tableBody.insertRow();
            row.insertCell(0).textContent = file.path;
            row.insertCell(1).textContent = file.size;
            row.insertCell(2).textContent = file.owner;
            row.insertCell(3).textContent = file.date;
        });

        // Fetch disk I/O data
        try {
            const diskIOResponse = await fetch('/api/disk-io');

            if (!diskIOResponse.ok) {
                throw new Error(`Server returned ${diskIOResponse.status}: ${diskIOResponse.statusText}`);
            }

            const diskIOData = await diskIOResponse.json();

            if (diskIOData.length > 0) {
                document.getElementById('diskIOChart').style.display = 'block';
                const chartContainer = document.getElementById('diskIOChart').parentNode;
                const errorDiv = chartContainer.querySelector('.alert');
                if (errorDiv) {
                    chartContainer.removeChild(errorDiv);
                }

                const filteredDevices = diskIOData.filter(item => !item.device.startsWith('loop'));
                diskIOChart.data.labels = filteredDevices.map(item => item.device);

                const readRates = filteredDevices.map(item => parseFloat(item.r_s) || 0);
                const writeRates = filteredDevices.map(item => parseFloat(item.w_s) || 0);
                diskIOChart.data.datasets[0].data = readRates;
                diskIOChart.data.datasets[1].data = writeRates;

                diskIOChart.update();
            } else {
                const diskIOCtx = document.getElementById('diskIOChart');
                diskIOCtx.style.display = 'none';

                const chartContainer = diskIOCtx.parentNode;
                let errorDiv = chartContainer.querySelector('.alert');
                if (!errorDiv) {
                    errorDiv = document.createElement('div');
                    errorDiv.className = 'alert alert-warning mt-3';
                    chartContainer.appendChild(errorDiv);
                }
                errorDiv.textContent = 'No I/O statistics available.';
            }
        } catch (ioError) {
            console.error('Error fetching disk I/O data:', ioError);
            const diskIOCtx = document.getElementById('diskIOChart');
            diskIOCtx.style.display = 'none';

            const chartContainer = diskIOCtx.parentNode;
            let errorDiv = chartContainer.querySelector('.alert');
            if (!errorDiv) {
                errorDiv = document.createElement('div');
                errorDiv.className = 'alert alert-danger mt-3';
                chartContainer.appendChild(errorDiv);
            }
            errorDiv.textContent = `Error fetching I/O data: ${ioError.message}`;
        }

    } catch (error) {
        console.error('Error fetching data:', error);
    }
}

Démarrage et mise à jour du tableau de bord

document.addEventListener('DOMContentLoaded', () => {
    initCharts();
    updateLastUpdatedTime();
    fetchData();

    document.getElementById('refreshBtn').addEventListener('click', () => {
        fetchData();
    });

    // Auto-refresh every 300 seconds
    setInterval(fetchData, 60000);
});

Tester avec pytest

Il s'agit d'une suite de tests très basique pour commencer. Elle simule les réponses de l'API. Vous n'avez pas besoin de tester ici pour que cela fonctionne, mais c'est une très bonne pratique pour construire vos tests en cours de route. Le développement piloté par les tests (TDD pour les intimes) n'est pas obligatoire mais vivement encouragé, mais c'est un autre sujet/débat que je ne veux pas entamer ici. Le plus important est d'avoir une suite de tests de régression afin de pouvoir faire des changements sans craindre de casser les choses.

# test_app.py
from fastapi.testclient import TestClient
from unittest.mock import patch
from dashboard.main import app, DiskSpace, InodeUsage, LargeFile, DiskIO

client = TestClient(app)

def mock_ssh_response(command):
    """Returns mock responses for different commands."""
    if 'df -h' in command:
        return """Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1        30G   15G   14G  52% /
/dev/sda2       434G  144G  268G  35% /home
tmpfs           7.8G     0  7.8G   0% /tmp"""
    elif 'df -i' in command:
        return """Filesystem       Inodes  IUsed   IFree IUse% Mounted on
/dev/sda1      3932160 532967 3399193   14% /
/dev/sda2     58007552 874332 57133220    2% /home"""
    elif 'du -h' in command:
        return """12M   /var/backups
24K   /var/tmp
1.2G  /var/log
4.0K  /var/mail
132M  /var/cache
1.3G  /var"""
    elif 'find' in command:
        return """-rw-r--r-- 1 user user 500M Apr 10 12:34 /home/user/database.dump
-rw-r--r-- 1 user user 350M Apr  9 09:12 /home/user/backup.tar.gz"""
    elif 'iostat' in command:
        return """Device            r/s     w/s     rkB/s     wkB/s   rrqm/s   wrqm/s  %rrqm  %wrqm r_await w_await aqu-sz rareq-sz wareq-sz  svctm  %util
sda              0.42    2.50      5.60     42.50     0.00     1.00   0.00  28.57    1.50    2.50   0.01    13.33    17.00   0.50   0.15"""

@patch('dashboard.main.ssh_execute')
def test_disk_space_endpoint(mock_ssh):
    # Set up the mock to return our test data
    mock_ssh.return_value = mock_ssh_response('df -h')

    # Make the request
    response = client.get('/api/disk-space')

    # Check response
    assert response.status_code == 200
    data = response.json()

    disk_spaces = [DiskSpace(**item) for item in data]

    # Verify model fields
    assert disk_spaces[0].filesystem == '/dev/sda1'
    assert disk_spaces[0].size == '30G'
    assert disk_spaces[0].used == '15G'
    assert disk_spaces[0].available == '14G'
    assert disk_spaces[0].use_percent == '52%'
    assert disk_spaces[0].mounted_on == '/'


@patch('dashboard.main.ssh_execute')
def test_inodes_endpoint(mock_ssh):
    mock_ssh.return_value = mock_ssh_response('df -i')
    response = client.get('/api/inodes')
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2
    inode_usages = [InodeUsage(**item) for item in data]
    assert inode_usages[0].filesystem == '/dev/sda1'
    assert inode_usages[0].inodes == '3932160'
    assert inode_usages[0].iused == '532967'
    assert inode_usages[0].ifree == '3399193'
    assert inode_usages[0].iuse_percent == '14%'
    assert inode_usages[0].mounted_on == '/'

@patch('dashboard.main.ssh_execute')
def test_large_files_endpoint(mock_ssh):
    mock_ssh.return_value = mock_ssh_response('find')
    response = client.get('/api/large-files')
    assert response.status_code == 200
    data = response.json()
    large_files = [LargeFile(**item) for item in data]
    assert large_files[0].permissions == '-rw-r--r--'
    assert large_files[0].owner == 'user'
    assert large_files[0].group == 'user'
    assert large_files[0].size == '500M'
    assert large_files[0].date == 'Apr 10 12:34'
    assert '/database.dump' in large_files[0].path

@patch('dashboard.main.ssh_execute')
def test_disk_io_endpoint(mock_ssh):
    mock_ssh.return_value = mock_ssh_response('iostat')
    response = client.get('/api/disk-io')
    assert response.status_code == 200
    data = response.json()
    disk_io_stats = [DiskIO(**item) for item in data]

    assert disk_io_stats[0].device == 'sda'
    assert hasattr(disk_io_stats[0], 'r_s')
    assert hasattr(disk_io_stats[0], 'w_s')

    # The model allows extra fields due to Config(extra="allow")
    # But we can still test that certain fields are present and valid
    assert float(disk_io_stats[0].r_s) >= 0
    assert float(disk_io_stats[0].w_s) >= 0

Exécuter l'application

Différentes façons d'exécuter l'application, soit avec fastapi, soit avec uvicorn, soit avec des commandes python. J'utilise la commande fastapi pour une meilleure apparence:

# Exécuter le serveur web avec la ligne de commande fastapi :
cd dashboard
fastapi dev

Pour exécuter les tests

pytest

Accédez au tableau de bord à http://localhost:8000 dans votre navigateur web.

Vous pouvez également accéder à la documentation API générée automatiquement à l'adresse suivante :

  • http://localhost:8000/docs - Swagger UI documentation
  • http://localhost:8000/redoc - ReDoc documentation

C'est l'un des grands avantages de FastAPI - il génère automatiquement une documentation interactive.

Félicitations ! Vous venez de construire un tableau de bord de surveillance de l'espace disque qui pourrait sauver votre système d'un effondrement inopportun.

Avant de rêver plus grand, parlons de sécurité pour une utilisation en production.

Considérations de sécurité

Ce tableau de bord est conçu pour être simple, ce qui le rend plus adapté à une utilisation locale et idéal pour l'apprentissage, mais son déploiement en production nécessite des améliorations significatives en matière de sécurité.

Pour une configuration de production robuste, associez-le à nginx en tant que proxy inverse, mettez en œuvre l'authentification des utilisateurs et le contrôle d'accès basé sur les rôles en utilisant les dépendances de sécurité de FastAPI. Appliquez l'authentification basée sur des clés SSH avec des comptes de surveillance dédiés et à autorisation limitée ainsi que des politiques de rotation des clés.

Renforcez la sécurité de l'API et du client en ajoutant une limitation du débit, une validation des requêtes pour prévenir les attaques par injection. Utilisez le protocole HTTPS avec des certificats appropriés. Implémentez une gestion des erreurs en incluants tous les cas possibles.

Pour la gestion des informations d'identification, évitez de coder les informations d'identification en dur et optez plutôt pour des variables d'environnement ou un stockage sécurisé comme les solutions de gestion des secrets.

Si vous surveillez plusieurs serveurs, envisagez une architecture de production avec un agent dédié qui envoie les mesures à un serveur central, une file d'attente de messages pour le traitement asynchrone et une redondance pour garantir la fiabilité.

Méthodologie de développement

Ce projet a utilisé une approche pratique pour utiliser l'assistance de l'IA dans le développement tout en maintenant de bonnes pratiques.

Développement assisté par l'IA

IA: Un assistant, pas un Héros

J'ai utilisé Claude.ai pour générer l'échafaudage initial de ce projet, ce qui a apporté plusieurs avantages :

  • Prototypage rapide de la structure de base
  • Suggestions pour les choix de bibliothèques et les approches d'implémentation
  • Génération rapide de code de base

Cependant, le résultat était "médiocre" pour être clair :

  • Le code généré par l'IA semblait cohérent mais manquait de cohésion entre les composants
  • Certains tests générés ne vérifiaient pas correctement l'implémentation réelle
  • La structure du projet avait besoin d'être réalignée pour suivre les meilleures pratiques
  • Cela dit, Claude a été utile pour les parties JS, un peu trop verbeuses mais facilement corrigées.

Leçons apprises

Une application aussi simple que celle-ci nécessite déjà une grande attention aux détails. Si vous ne connaissez pas ces détails, vous ne pouvez pas suivre aveuglément une IA pour faire fonctionner votre application en production.

De nombreux exemples de "Vibe coding" (je n'aime pas ce terme) apparaissent et ils sont écrasés dès qu'ils entrent en production, apprenant à leurs dépens "the hard way".

Cette approche m'a appris plusieurs leçons importantes sur le développement assisté par l'IA :

  • Les outils d'IA sont bons pour générer des modèles standard (parfois en utilisant de vieux paradigmes)
  • L'expertise du développeur reste essentielle pour la conception et l'intégration du système (ce qui est rassurant)
  • L'évaluation critique du code généré est indispensable
  • L'IA n'a de valeur que lorsqu'elle est associée à de solides principes de génie logiciel

L'IA ne va pas éradiquer les développeurs. Nous avons besoin de développeurs plus instruits, de mathématiciens et d'experts en affaires. L'IA est un outil qui va éliminer les choses ennuyeuses.

Conclusion

En combinant la puissance des commandes Linux avec une interface web, vous pouvez créer une solution de surveillance puissante adaptée à vos besoins.

Cependant, il s'agit d'une approche relativement naïve pour obtenir les données, qui n'est pas vraiment efficace ni sécurisée puisque nous avons des connexions ssh constantes.

Votre devoir

Vous pouvez essayer de remanier le code et d'étendre la solution.

  1. Créez un gestionnaire SSH qui exécuterait toutes les commandes en un seul appel, par exemple.
  2. Ajoutez plus d'un serveur
  3. Comparez avec les produits commerciaux existants pour voir comment ils ont conçu leurs solutions.

Key Takeaways

  • FastAPI fournit un cadre puissant et moderne pour construire rapidement des API
  • Paramiko permet l'automatisation sécurisée de SSH à partir de Python
  • Chart.js offre des options flexibles de visualisation de données
  • Une solution de surveillance n'a pas besoin d'être complexe pour apporter de la valeur

L'objectif de cet exercice d'apprentissage était de montrer à quel point il est simple de construire une solution de base basée sur un article précédent que j'ai écrit sur les commandes linux.

Si vous considérez de construire un tel système pour mettre en production, cela peut devenir très complexe, alors accrochez-vous et n'abandonnez jamais. Continuez à apprendre !