Olympipe Pipelines parallèles sans boilerplate
Chaînez des étapes multiprocessing en Python avec une API fluide. Contournez le GIL, saturez votre CPU ou votre GPU, sans gérer de Pool ni de Queue.
Installation
Démarrage rapide
from olympipe import Pipeline
results = (
Pipeline(range(1000))
.task(heavy_compute, count=8) # 8 workers parallèles
.filter(lambda x: x > 0)
.batch(32)
.wait_for_result()
)Pourquoi Olympipe ?
5 animations qui montrent, étape par étape, comment Olympipe transforme un pipeline séquentiel en machine parallèle hautement optimisée.
Traitement séquentiel — sans parallélisme
Sans olympipe, chaque image attend que la précédente soit entièrement terminée. Le GPU est idle pendant le chargement, le réseau bloque tout pendant l'upload. Le débit est limité par la somme de toutes les étapes.
# Séquentiel — une image à la fois
for image_path in images:
img = load(image_path)
img = preprocess(img)
img = augment(img)
pred = detect(img) # GPU idle le reste du temps
upload(pred) # bloque tout le pipeline4 workers — GPU sérialisé par CUDA
Sur GPU, les poids du modèle sont chargés une fois dans la shared memory (192 Ko sur A100), puis appliqués à tout le batch simultanément — passer de batch=1 à batch=4 est presque gratuit. Mais 4 processus séparés ne peuvent pas partager ce contexte CUDA : chaque processus attend son tour. Résultat : l'étape Detection dure 4× plus longtemps qu'en séquentiel, annulant tout le bénéfice du parallélisme.
# 4 inférences GPU séparées → CUDA sérialise
# batch=1 × 4 fois ≈ 4 × T_infer (lent)
# batch=4 × 1 fois ≈ 1.2 × T_infer (rapide !)
with Pool(4) as pool:
# Chaque worker lance sa propre inférence GPU :
# ils s'accumulent dans la file CUDA
pool.map(run_full_pipeline, images)multiprocessing.Pool — 4 workers complets
Même avec 4 workers, les coûts s'accumulent : RAM ×4 (chaque processus a son propre heap OS), CPU ×4, et l'étape GPU reste sérialisée (×4 plus lente). Le code de coordination est lourd, et ajouter une étape intermédiaire oblige à tout réécrire.
from multiprocessing import Pool
def full_pipeline(path):
img = load(path)
img = preprocess(img)
img = augment(img)
pred = detect(img) # ← sérialisé sur GPU, 4× plus lent
upload(pred)
with Pool(4) as pool:
pool.map(full_pipeline, images)Olympipe — pipeline parallelism
Chaque étape tourne dans son propre processus. Pendant que Load traite l'image n+1, Preprocess traite l'image n, Detection tourne en parallèle sur le GPU… Les étapes se chevauchent en permanence — le débit est limité par l'étape la plus lente, pas leur somme.
from olympipe import Pipeline Pipeline(images) .task(load) .task(preprocess) .task(augment) .task(detect) .task(upload) .wait_for_completion()
.batch(4) — grouper avant la détection GPU
Le GPU est plus efficace sur des batchs entiers que sur des images individuelles. Un simple .batch(4) avant l'inférence multiplie le débit de détection par 4 — en une seule ligne.
from olympipe import Pipeline Pipeline(images) .task(load) .task(preprocess) .task(augment) .batch(4) # ← groupe 4 images avant le GPU .task(detect_batch) # traite un batch entier à la fois .explode(lambda x: x) # redéploie les résultats .task(upload) .wait_for_completion()
.task(upload, count=3) — fan-out sur le goulot
L'upload réseau est lent et bloque tout le pipeline. Passer count=3 sur ce seul step crée 3 workers en parallèle uniquement là où c'est nécessaire — sans toucher au reste du pipeline.
from olympipe import Pipeline Pipeline(images) .task(load) .task(preprocess) .task(augment) .task(detect) .task(upload, count=3) # ← 3 workers en parallèle sur l'upload .wait_for_completion()
Performance locale
Chaque étape tourne dans son propre processus — pas de GIL, pas de coordination manuelle.
8x
speedup typique avec 8 workers
0
boilerplate multiprocessing
100x
plus rapide avec cache disque
Fonctionnalités
Tâches parallèles
Multipliez les workers d'un seul paramètre
Filtrage
Élaguez votre flux sans rompre le pipeline
Batching
Regroupez les items pour saturer votre GPU ou votre BDD
Split & Gather
Branchez votre pipeline en flux indépendants, fusionnez-les après
Workers avec état
Un modèle ML par worker, chargé une seule fois
Cache disque
Rejouer une étape coûteuse sans recalculer
Voir tous les exemples
Exemples prêts à copier-coller pour vos use cases concrets
