Chiffre.io anonymous visit counting for clients without JavaScript
Skip to content
import functools
import itertools
from pathlib import Path
from typing import Iterable

import pandas as pd

🐍 Pipelines de données fonctionnels avec Python#

> Implémenter des pipelines de traitement de données grâce aux concepts de programmation fonctionnelle inclus nativement avec Python

Romain Clement - Meetup Python Grenoble - 23/11/2023

🤷‍♂️ Contexte#

  • Essor du Data Engineering
  • Paradigme de graphes orientés acycliques (DAGs)
  • Programmation fonctionnelle en Python
  • Modularité, déterminisme, testabilité

Pourquoi parler de pipelines de données fonctionnels en Python ?

⚠️ Remarques#

  • Proposition de patterns
  • Small / medium data
  • Programmation fonctionnelle light

Quelques remarques avant de commencer :

Ce que je vais vous monter est simplement une proposition de patterns que je trouve intéressants et applicables dans divers contextes.

Ces patterns étant sur le calcul en mémoire, il s'appliquent principalement sur du volume de données petit ou moyen, mais pas au big data (bien que certains concepts se recoupent).

Enfin, nous allons parler de programmation fonctionnelle mais à un niveau relativement haut, donc pas d'inquiétude si vous n'êtes pas expert !

⚙️ DAG ?#

Directed Acyclic Graph

  • Graphe : noeuds + arêtes
  • Orienté : arêtes arcs
  • Acyclique : pas de circuits

Qu'est-ce qu'un DAG ou graphe orienté acyclique ?

⚙️ DAG - Représentation#

graph LR;
    A-->B;
    A-->C;
    B-->D;
    C-->D;

Voici un exemple de représentation de DAG. On remarque les propriétés énoncées précédemment :

  • Des tâches à effectuer représentées par des noeuds
  • Les dépendances d'exécution entre les tâches sont matérialisées par des arcs
  • Il n'y a pas de dépendances cycliques entre les tâches

⚙️ DAG - Avantages#

  • Dépendances et ordre d'exécution
  • Modularité, réutilisabilité, testabilité
  • Pipelines et algorithmes

Alors pourquoi s'embêter à exprimer un problème en le représentant sous la forme de graphe orienté acyclique ?

⚙️ DAG - Exemple 1#

graph LR;
    A[Compute some stuff];

Le DAG le plus simple que l'on puisse représenter !

Une seule tâche à effectuer, sans dépendances. C'est un cas particulier mais il est quand même bon de le noter.

⚙️ DAG - Exemple 2 - Nettoyer un fichier CSV#

graph LR;
    A[Load CSV] -- Dataframe --> B[Clean Dataframe];
    B -- Dataframe --> C[Save to CSV];

Un second exemple un peu plus représentatif : - Une première tâche permet de charger un fichier CSV dans un Dataframe (Pandas) - Puis une seconde tâche nettoie ce Dataframe (ex : formatage des dates) - Enfin une troisième tâche sauvegarde ce Dataframe dans un nouveau fichier CSV

Pour les personnes dans le domaine de la Data, c'est le schéma typique de ce que l'on appelle un processus ETL (Extract Transform and Load).

⚙️ DAG - Exemple 2bis - Nettoyer un fichier CSV#

graph LR;
    subgraph F[Process]
        direction LR
            A[Load CSV] -- Dataframe --> B[Clean Dataframe];
            B -- Dataframe --> C[Save to CSV];
    end
    START[ ] -- Path --> A;
    C -- Path --> END[ ];
    style START fill:#FFFFFF00, stroke:#FFFFFF00;
    style END fill:#FFFFFF00, stroke:#FFFFFF00;

Même exemple que précédemment mais avec une petite subtilité : le graphe devient paramétrable et devient par la même occasion intégrable comme une sous-tâche d'un plus grand système !

Cet exemple vous montre que la représentation en graphe peut s'appliquer à différents niveaux d'une architecture logicielle : - Algorithmique - Applicatif - Système - etc.

⚙️ DAG - Exemple 3 - Web-scraping ETL#

graph LR;
    A[Extract data] -- Dataframe --> B[Transform data];
    A -- Dataframe --> C[Compute metadata];
    B -- Dataframe --> D[Load data];
    C -- Dict[str, Any] --> D;

Dans cet exemple, on modélisation un processus de web-scraping.

Assez semblable au précédent, on remarque des chemins parallèles cette fois-ci : la sortie d'une tâche peut être utilisée par plusieurs tâches. Gardez bien ce concept en tête pour la suite !

⚙️ DAG - Exemple 4 - Traitement de fichiers (streaming)#

graph LR;
    subgraph B[Per file process]
        direction LR
            B1[Read file] --> B2[Process file]
    end
    A[List files] -- files --> B1

Autre exemple un peu plus complexe : certains traitements nécessite un flux en streaming, c'est à dire que l'on traite des données au fur et à mesure, au lieu de charger toutes les données en mémoire puis de tout traiter d'un bloc.

C'est généralement utile lorsque les données sont volumineuses ou bien qu'une source de données est IO bound.

Avec ce type de modélisation en streaming, notre pipeline devient potentiellement compatible avec la mise en concurrence de tâches (ex : asyncio ou Python Threads).

⚙️ DAG - Exemple 5 - Machine Learning#

graph LR;
    A[Load dataset] --> B[Train / test split];
    B -- train set --> C[Train model];
    C -- model --> D[Evaluate model] & E[Register];
    B -- test set --> D;
    D -- metrics --> F[Log];

Terminons avec un dernier exemple avec lequel les Data Scientists seront déjà familiers : un entrainement de modèle par apprentissage automatique (machine learning).

On retrouve les structures énoncées précédemment dans un exemple complet.

ƛ Programmation Fonctionnelle#

Concepts utiles :

  • Tout est fonction
  • Immutabilité
  • Composition
  • Réutilisabilité (Curryfication)
  • Evaluation paresseuse

Passons rapidement en revue quelques concepts utiles de programmation fonctionnelle qui nous serons utiles pour la suite. Bien évidemment, le monde de la PF est bien plus important.

ƛ Concepts fonctionnels en Python#

Disponible nativement :

  • Fonctions: def
  • Fonctions d'ordre supérieur: map(), filter(), itertools.reduce(), lambda
  • Réutilisabilité: functools.partial()
  • Evaluation paresseuse: yield, itertools.tee()
  • Typage (faible): typing

Le langage Python est multi-paradigme : impérative, orienté-objet mais aussi fonctionnel !

D'ailleurs, il suffit de voir ce que le langage et la bibliothèque standard inclus nativement.

Egalement, contrairement aux croyances, le langage est typé ! Le typage dynamique bien connu est un effet de bord des références nommées mais derrière le rideau, les objets sont bel et bien fortement typés !

Remarque pour le typage faible : les annotation de type en Python ne sont pas évaluées à l'exécution ! La vérification de types doit s'effecteur en amont grâce à un validateur tel que mypy. Le typage reste faible car non appliqué à l'exécution mais

ƛ Concepts fonctionnels en Python#

Non disponible nativement :

  • Composition de fonctions
  • Structures de données complexes immutables
  • Typage fort (~)

Néanmoins, certains concepts qui pourraient nous être utiles ne sont pas inclus nativement et c'est bien dommage !

Immutabilité : couplé avec des annotations de type (Sequence, Mapping, frozenset, frozen dataclasses) on peut s'en rapprocher au moment de la validation !

👨‍💻 Mise en pratique#

Essayons de fusionner les concepts de DAGs et programmation fonctionnelle !

Examples and Code Walkthrough (5 minutes)

Provide practical examples of writing DAGs using functional programming concepts.
Walk through code snippets to illustrate the application of map, filter, reduce, functools.partial, generators, iterators, and lazy evaluation in building data pipelines.

Advantages and Drawbacks (2 minutes)

Summarize the advantages of using functional data pipelines in Python:
    Readability and maintainability of code.
    Improved composability and reusability of functions.
    Efficient handling of large datasets through lazy evaluation.
Discuss potential drawbacks or challenges:
    Learning curve for those unfamiliar with functional programming.
    Potential performance trade-offs in certain scenarios.

Conclusion (2 minutes)

Recap the key points discussed in the talk.
Emphasize the power and flexibility of functional programming in designing data pipelines.
Encourage attendees to explore and experiment with functional programming concepts in their own data processing workflows.

♻ Réutilisabilité#

  • Fonctions paramétrables
  • Curryfication
  • functools.partial

Commençons avec la propriété de réutilisabilité car il sera réutilisée dans tous les autres exemples.

Le besoin général est de pouvoir définir des fonctions paramétrables utilisables dans plusieurs contextes : un bloc de traitement d'un graphe pourrait servir dans un autre graphe.

La PF permet ce genre de chose avec le concept de Curryfication (Currying) : définir une fonction pré-paramétrée pour quelle soit utilisée par la suite.

En Python, functools.partial permet ce type de construction. Voyons comment s'en servir avec un exemple.

def load_csv(filename: str, separator: str = ",") -> pd.DataFrame:
    return pd.read_csv(filename, sep=separator)
def simple_process_csv() -> None:
    load_csv("data.csv")
load_tsv = functools.partial(load_csv, separator="\t")
def simple_process_tsv() -> None:
    load_tsv("data.tsv")

⛓ Composabilité#

  • Chaînage de fonctions
  • functools.partial
  • functools.reduce

Peut être l'aspect le plus connu de la programmation fonctionnelle : pouvoir chainer les fonctions les unes aux autres !

En Python, la composabilité n'est pas supportée nativement mais voyons comment cela se présente :

def load_csv(filename: str, separator: str = ",") -> pd.DataFrame:
    return pd.read_csv(filename)

def clean_csv(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna()

def save_csv(filename: str, data: pd.DataFrame) -> None:
    data.to_csv(filename)
def pipeline_imperative(input_csv: Path, output_csv: Path) -> None:
    input_data = load_csv(input_csv)
    clean_data = clean_csv(input_data)
    save_csv(output_csv, clean_data)
def pipeline_functional(input_csv: Path, output_csv: Path) -> None:
    save_csv(output_csv, clean_csv(load_csv(input_csv)))
def compose(*functions):
    return functools.reduce(
        lambda f, g: lambda x: f(g(x)),
        functions,
        lambda x: x,
    )

def pipeline_compose(input_csv: Path, output_csv: Path) -> None:
    dag = compose(
        functools.partial(save_csv, output_csv),
        clean_csv,
        load_csv,
    )
    dag(input_csv)

💤 Evaluation paresseuse#

  • Streaming
  • Générateurs
  • map
  • itertools.tee
  • list

Dernier concept permettant de faire le lien entre tous et surement le plus puissant pour les graphes : l'évaluation paresseuse (lazy evaluation) ! Mais ce n'est pas sans problème ...

Evaluer une entité uniquement lorsque l'on en a besoin permet de mettre en place un flux de données en streaming. En Python, cela passe par l'utilisation des générateurs.

Le problème avec l'évaluation paresseuse est que l'on ne peut plus forcément chainer les fonctions entre elles telles quelles. Python fourni la fonction map qui permet d'appliquer une fonction à un itérable (un générateur est un itérable, mais l'inverse n'est pas forcément vrai).

Autre problème : comment utiliser un itérable par deux ou plusieurs tâches suivantes ? La fonction itertools.tee permet de dupliquer l'itérateur autant de fois que nécessaire.

Enfin, le problème de l'évaluation paresseuse est qu'il faut un déclencheur de sa matérialisation : généralement en Python la construction d'une liste final permet de déclencher la chaine d'évaluation.

def list_files() -> Iterable[Path]:
    return Path().glob("*.png")

def open_file(filepath: Path) -> bytes:
    return filepath.read_bytes()

def process_data(data: bytes) -> int:
    return len(data)
def streaming_imperative() -> None:
    files = list_files()
    files_bytes = map(open_file, files)
    files_len = map(process_data, files_bytes)
    print(list(files_len))
def streaming_functional() -> None:
    files_len = map(process_data, map(open_file, list_files()))
    print(list(files_len))
def streaming_multiple() -> None:
    files1, files2 = itertools.tee(list_files(), 2)
    list(map(process_data, map(open_file, files1)))
    list(map(print, files2))

👍 Avantages#

  • Fonctions Python pures
  • Graphes de traitement avec style fonctionnel
  • Force une conception générique
  • Unités de traitement paramétrables et réutilisables
  • Données volumineuses bénéficient du streaming avec les générateurs
  • Traitement des générateurs par évaluation paresseuse
  • Test facilité
  • Traitement concurrent / parallèle possible (concurrent.futures)

👎 Limitations#

  • Courbe d'apprentissage
  • Balance de performance
  • Composition de fonctions
  • Matérialisation des générateurs
  • Résultats intermédiaires
  • Introspection

🚀 Pour aller plus loin#

Orchestrateurs: - Airflow - Dagster - Prefect - Spark

Bibliothèques: - toolz - functional-pipeline

🏁 Conclusion#

Expérimentez avec vos propres workflows !

Questions ?