🐍 Pipelines de donnĂ©es fonctionnels avec Python¶

Implémenter des pipelines de traitement de données grùce aux concepts de programmation fonctionnelle inclus nativement avec Python

Romain Clement - Meetup Python Grenoble - 23/11/2023

đŸ€·â€â™‚ïž Contexte¶

  • Essor du Data Engineering
  • Paradigme de graphes orientĂ©s acycliques (DAGs)
  • Programmation fonctionnelle en Python
  • ModularitĂ©, dĂ©terminisme, testabilitĂ©

Pourquoi parler de pipelines de données fonctionnels en Python ?

⚠ Remarques¶

  • Proposition de patterns
  • Small / medium data
  • Programmation fonctionnelle light

Quelques remarques avant de commencer :

Ce que je vais vous monter est simplement une proposition de patterns que je trouve intéressants et applicables dans divers contextes.

Ces patterns étant sur le calcul en mémoire, il s'appliquent principalement sur du volume de données petit ou moyen, mais pas au big data (bien que certains concepts se recoupent).

Enfin, nous allons parler de programmation fonctionnelle mais Ă  un niveau relativement haut, donc pas d'inquiĂ©tude si vous n'ĂȘtes pas expert !

⚙ DAG ?¶

Directed Acyclic Graph

  • Graphe : noeuds + arĂȘtes
  • OrientĂ© : ~arĂȘtes~ arcs
  • Acyclique : pas de circuits

Qu'est-ce qu'un DAG ou graphe orienté acyclique ?

⚙ DAG - ReprĂ©sentation¶

graph LR;
    A-->B;
    A-->C;
    B-->D;
    C-->D;

Voici un exemple de représentation de DAG. On remarque les propriétés énoncées précédemment :

  • Des tĂąches Ă  effectuer reprĂ©sentĂ©es par des noeuds
  • Les dĂ©pendances d'exĂ©cution entre les tĂąches sont matĂ©rialisĂ©es par des arcs
  • Il n'y a pas de dĂ©pendances cycliques entre les tĂąches

⚙ DAG - Avantages¶

  • DĂ©pendances et ordre d'exĂ©cution
  • ModularitĂ©, rĂ©utilisabilitĂ©, testabilitĂ©
  • Pipelines et algorithmes

Alors pourquoi s'embĂȘter Ă  exprimer un problĂšme en le reprĂ©sentant sous la forme de graphe orientĂ© acyclique ?

⚙ DAG - Exemple 1¶

graph LR;
    A[Compute some stuff];

Le DAG le plus simple que l'on puisse représenter !

Une seule tĂąche Ă  effectuer, sans dĂ©pendances. C'est un cas particulier mais il est quand mĂȘme bon de le noter.

⚙ DAG - Exemple 2 - Nettoyer un fichier CSV¶

graph LR;
    A[Load CSV] -- Dataframe --> B[Clean Dataframe];
    B -- Dataframe --> C[Save to CSV];

Un second exemple un peu plus représentatif :

  • Une premiĂšre tĂąche permet de charger un fichier CSV dans un Dataframe (Pandas)
  • Puis une seconde tĂąche nettoie ce Dataframe (ex : formatage des dates)
  • Enfin une troisiĂšme tĂąche sauvegarde ce Dataframe dans un nouveau fichier CSV

Pour les personnes dans le domaine de la Data, c'est le schéma typique de ce que l'on appelle un processus ETL (Extract Transform and Load).

⚙ DAG - Exemple 2bis - Nettoyer un fichier CSV¶

graph LR;
    subgraph F[Process]
        direction LR
            A[Load CSV] -- Dataframe --> B[Clean Dataframe];
            B -- Dataframe --> C[Save to CSV];
    end
    START[ ] -- Path --> A;
    C -- Path --> END[ ];
    style START fill:#FFFFFF00, stroke:#FFFFFF00;
    style END fill:#FFFFFF00, stroke:#FFFFFF00;

MĂȘme exemple que prĂ©cĂ©demment mais avec une petite subtilitĂ© : le graphe devient paramĂ©trable et devient par la mĂȘme occasion intĂ©grable comme une sous-tĂąche d'un plus grand systĂšme !

Cet exemple vous montre que la représentation en graphe peut s'appliquer à différents niveaux d'une architecture logicielle :

  • Algorithmique
  • Applicatif
  • SystĂšme
  • etc.

⚙ DAG - Exemple 3 - Web-scraping ETL¶

graph LR;
    A[Extract data] -- Dataframe --> B[Transform data];
    A -- Dataframe --> C[Compute metadata];
    B -- Dataframe --> D[Load data];
    C -- Dict[str, Any] --> D;

Dans cet exemple, on modélisation un processus de web-scraping.

Assez semblable au prĂ©cĂ©dent, on remarque des chemins parallĂšles cette fois-ci : la sortie d'une tĂąche peut ĂȘtre utilisĂ©e par plusieurs tĂąches. Gardez bien ce concept en tĂȘte pour la suite !

⚙ DAG - Exemple 4 - Traitement de fichiers (streaming)¶

graph LR;
    subgraph B[Per file process]
        direction LR
            B1[Read file] --> B2[Process file]
    end
    A[List files] -- files --> B1

Autre exemple un peu plus complexe : certains traitements nécessite un flux en streaming, c'est à dire que l'on traite des données au fur et à mesure, au lieu de charger toutes les données en mémoire puis de tout traiter d'un bloc.

C'est généralement utile lorsque les données sont volumineuses ou bien qu'une source de données est IO bound.

Avec ce type de modélisation en streaming, notre pipeline devient potentiellement compatible avec la mise en concurrence de tùches (ex : asyncio ou Python Threads).

⚙ DAG - Exemple 5 - Machine Learning¶

graph LR;
    A[Load dataset] --> B[Train / test split];
    B -- train set --> C[Train model];
    C -- model --> D[Evaluate model] & E[Register];
    B -- test set --> D;
    D -- metrics --> F[Log];

Terminons avec un dernier exemple avec lequel les Data Scientists seront déjà familiers : un entrainement de modÚle par apprentissage automatique (machine learning).

On retrouve les structures énoncées précédemment dans un exemple complet.

ƛ Programmation Fonctionnelle¶

Concepts utiles :

  • Tout est fonction
  • ImmutabilitĂ©
  • Composition
  • RĂ©utilisabilitĂ© (Curryfication)
  • Evaluation paresseuse

Passons rapidement en revue quelques concepts utiles de programmation fonctionnelle qui nous serons utiles pour la suite. Bien évidemment, le monde de la PF est bien plus important.

ƛ Concepts fonctionnels en Python¶

Disponible nativement :

  • Fonctions: def
  • Fonctions d'ordre supĂ©rieur: map(), filter(), itertools.reduce(), lambda
  • RĂ©utilisabilitĂ©: functools.partial()
  • Evaluation paresseuse: yield, itertools.tee()
  • Typage (faible): typing

Le langage Python est multi-paradigme : impérative, orienté-objet mais aussi fonctionnel !

D'ailleurs, il suffit de voir ce que le langage et la bibliothĂšque standard inclus nativement.

Egalement, contrairement aux croyances, le langage est typé ! Le typage dynamique bien connu est un effet de bord des références nommées mais derriÚre le rideau, les objets sont bel et bien fortement typés !

Remarque pour le typage faible : les annotation de type en Python ne sont pas évaluées à l'exécution ! La vérification de types doit s'effecteur en amont grùce à un validateur tel que mypy. Le typage reste faible car non appliqué à l'exécution mais

ƛ Concepts fonctionnels en Python¶

Non disponible nativement :

  • Composition de fonctions
  • Structures de donnĂ©es complexes immutables
  • Typage fort (~)

NĂ©anmoins, certains concepts qui pourraient nous ĂȘtre utiles ne sont pas inclus nativement et c'est bien dommage !

Immutabilité : couplé avec des annotations de type (Sequence, Mapping, frozenset, frozen dataclasses) on peut s'en rapprocher au moment de la validation !

đŸ‘šâ€đŸ’» Mise en pratique¶

Essayons de fusionner les concepts de DAGs et programmation fonctionnelle !

Examples and Code Walkthrough (5 minutes)

Provide practical examples of writing DAGs using functional programming concepts.
Walk through code snippets to illustrate the application of map, filter, reduce, functools.partial, generators, iterators, and lazy evaluation in building data pipelines.

Advantages and Drawbacks (2 minutes)

Summarize the advantages of using functional data pipelines in Python:
    Readability and maintainability of code.
    Improved composability and reusability of functions.
    Efficient handling of large datasets through lazy evaluation.
Discuss potential drawbacks or challenges:
    Learning curve for those unfamiliar with functional programming.
    Potential performance trade-offs in certain scenarios.

Conclusion (2 minutes)

Recap the key points discussed in the talk.
Emphasize the power and flexibility of functional programming in designing data pipelines.
Encourage attendees to explore and experiment with functional programming concepts in their own data processing workflows.

♻ RĂ©utilisabilité¶

  • Fonctions paramĂ©trables
  • Curryfication
  • functools.partial

Commençons avec la propriété de réutilisabilité car il sera réutilisée dans tous les autres exemples.

Le besoin général est de pouvoir définir des fonctions paramétrables utilisables dans plusieurs contextes : un bloc de traitement d'un graphe pourrait servir dans un autre graphe.

La PF permet ce genre de chose avec le concept de Curryfication (Currying) : définir une fonction pré-paramétrée pour quelle soit utilisée par la suite.

En Python, functools.partial permet ce type de construction. Voyons comment s'en servir avec un exemple.

In [2]:
def load_csv(filename: str, separator: str = ",") -> pd.DataFrame:
    return pd.read_csv(filename, sep=separator)
In [3]:
def simple_process_csv() -> None:
    load_csv("data.csv")
In [4]:
load_tsv = functools.partial(load_csv, separator="\t")
In [5]:
def simple_process_tsv() -> None:
    load_tsv("data.tsv")

⛓ Composabilité¶

  • ChaĂźnage de fonctions
  • functools.partial
  • functools.reduce

Peut ĂȘtre l'aspect le plus connu de la programmation fonctionnelle : pouvoir chainer les fonctions les unes aux autres !

En Python, la composabilité n'est pas supportée nativement mais voyons comment cela se présente :

In [6]:
def load_csv(filename: str, separator: str = ",") -> pd.DataFrame:
    return pd.read_csv(filename)

def clean_csv(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna()

def save_csv(filename: str, data: pd.DataFrame) -> None:
    data.to_csv(filename)
In [7]:
def pipeline_imperative(input_csv: Path, output_csv: Path) -> None:
    input_data = load_csv(input_csv)
    clean_data = clean_csv(input_data)
    save_csv(output_csv, clean_data)
In [8]:
def pipeline_functional(input_csv: Path, output_csv: Path) -> None:
    save_csv(output_csv, clean_csv(load_csv(input_csv)))
In [9]:
def compose(*functions):
    return functools.reduce(
        lambda f, g: lambda x: f(g(x)),
        functions,
        lambda x: x,
    )

def pipeline_compose(input_csv: Path, output_csv: Path) -> None:
    dag = compose(
        functools.partial(save_csv, output_csv),
        clean_csv,
        load_csv,
    )
    dag(input_csv)

đŸ’€ Evaluation paresseuse¶

  • Streaming
  • GĂ©nĂ©rateurs
  • map
  • itertools.tee
  • list

Dernier concept permettant de faire le lien entre tous et surement le plus puissant pour les graphes : l'évaluation paresseuse (lazy evaluation) ! Mais ce n'est pas sans problÚme ...

Evaluer une entité uniquement lorsque l'on en a besoin permet de mettre en place un flux de données en streaming. En Python, cela passe par l'utilisation des générateurs.

Le problÚme avec l'évaluation paresseuse est que l'on ne peut plus forcément chainer les fonctions entre elles telles quelles. Python fourni la fonction map qui permet d'appliquer une fonction à un itérable (un générateur est un itérable, mais l'inverse n'est pas forcément vrai).

Autre problÚme : comment utiliser un itérable par deux ou plusieurs tùches suivantes ? La fonction itertools.tee permet de dupliquer l'itérateur autant de fois que nécessaire.

Enfin, le problÚme de l'évaluation paresseuse est qu'il faut un déclencheur de sa matérialisation : généralement en Python la construction d'une liste final permet de déclencher la chaine d'évaluation.

In [10]:
def list_files() -> Iterable[Path]:
    return Path().glob("*.png")

def open_file(filepath: Path) -> bytes:
    return filepath.read_bytes()

def process_data(data: bytes) -> int:
    return len(data)
In [11]:
def streaming_imperative() -> None:
    files = list_files()
    files_bytes = map(open_file, files)
    files_len = map(process_data, files_bytes)
    print(list(files_len))
In [12]:
def streaming_functional() -> None:
    files_len = map(process_data, map(open_file, list_files()))
    print(list(files_len))
In [13]:
def streaming_multiple() -> None:
    files1, files2 = itertools.tee(list_files(), 2)
    list(map(process_data, map(open_file, files1)))
    list(map(print, files2))

👍 Avantages¶

  • Fonctions Python pures
  • Graphes de traitement avec style fonctionnel
  • Force une conception gĂ©nĂ©rique
  • UnitĂ©s de traitement paramĂ©trables et rĂ©utilisables
  • DonnĂ©es volumineuses bĂ©nĂ©ficient du streaming avec les gĂ©nĂ©rateurs
  • Traitement des gĂ©nĂ©rateurs par Ă©valuation paresseuse
  • Test facilitĂ©
  • Traitement concurrent / parallĂšle possible (concurrent.futures)

👎 Limitations¶

  • Courbe d'apprentissage
  • Balance de performance
  • Composition de fonctions
  • MatĂ©rialisation des gĂ©nĂ©rateurs
  • RĂ©sultats intermĂ©diaires
  • Introspection

🚀 Pour aller plus loin¶

Orchestrateurs:

  • Airflow
  • Dagster
  • Prefect
  • Spark

BibliothĂšques:

  • toolz
  • functional-pipeline

🏁 Conclusion¶

Expérimentez avec vos propres workflows !

Questions ?

📚 RĂ©fĂ©rences¶

  • Wikipedia - Directed Acyclic Graph
  • Function composition in Python
  • Mimicking Immutability in Python with Type Hints
  • itertools
  • functools