Files
training.python.datascience/documentation/03.2-celery-basics.md
2025-10-24 22:29:15 +02:00

7.6 KiB

title, author
title author
Celery Steve Kossouho

Gestion de tâches avec Celery


Qu'est-ce que Celery ?

Celery est une bibliothèque Python open source qui est utilisée pour l'exécution asynchrone de tâches. Elle permet de distribuer l'exécution des tâches sur plusieurs travailleurs (workers), que l'on peut répartir sur plusieurs machines ou sur un même serveur.

Grâce à sa capacité de parallélisation, Celery est un bon choix pour exécuter sans supervision directe des tâches longues et coûteuses telles que des calculs longs, typiquement sur de gros volumes de données.

Logo de Celery{width=160px}


Configuration de Celery

Pour commencer à utiliser Celery, vous devez tout d'abord installer le paquet via pip :

pip install celery

Ensuite, vous aurez besoin d'un [broker]{.naming} de messages, qui est utilisé par Celery pour transmettre des messages de façon bidirectionnelle entre votre application principale et les processus exécutant des tâches. Celery prend en charge plusieurs brokers de messages, mais nous utiliserons ici Redis pour sa simplicité d'utilisation.

pip install redis

Une fois Redis installé, vous pouvez configurer Celery pour l'utiliser comme broker. Pour cela, vous devez créer une instance de Celery dans votre application :

from celery import Celery

CELERY_APP_NAME = 'myapp'
CELERY_BROKER_URL = 'redis://localhost:6379/0'

app = Celery(CELERY_APP_NAME, broker=CELERY_BROKER_URL)

Démarrer des tâches

Pour démarrer avec Celery, vous devez d'abord définir des [tâches]{.naming}. Une tâche se déclare sous la forme d'une fonction qui sera exécutée de manière asynchrone. Par exemple, imaginons que vous avez une fonction qui effectue un calcul sur un DataFrame Pandas :

import pandas as pd

def calculate(df: pd.DataFrame):
    return df.sum()

Vous pouvez transformer cette fonction en une tâche Celery en utilisant le décorateur app.task :

import pandas as pd
from celery import Celery

app = Celery(..., broker=...)

@app.task
def calculate(df: pd.DataFrame):
    return df.sum()

Vous pouvez désormais exécuter cette fonction de manière asynchrone en utilisant la méthode task.delay(){.python} :

result = calculate.delay(df)

La méthode .delay() renvoie un objet AsyncResult que vous pouvez utiliser pour obtenir le résultat de la tâche une fois qu'elle est terminée :

print(result.get())

Serveur de Celery

Pour demarrer le serveur de Celery, qui exécutera les tâches, vous devez utiliser la commande celery worker dans votre terminal.

celery worker

Planifier des tâches

Celery fournit également un moyen de planifier l'exécution des tâches. Pour cela, vous aurez besoin de l'extension Celery Beat.

Avec Celery Beat, vous pouvez définir des intervalles de temps réguliers pour l'exécution des tâches. Par exemple, vous pouvez configurer une tâche pour qu'elle soit exécutée toutes les 10 minutes.

Vous avez le choix entre deux types d'intervalle :

  • crontab : Utiliser une expression de crontab pour définir l'intervalle de temps.
  • schedule : Utiliser un objet datetime.timedelta pour définir l'intervalle de temps.

Voici comment vous pouvez configurer Celery Beat pour exécuter une tâche toutes les 10 minutes :

from celery.schedules import crontab

app.conf.beat_schedule = {
    'run-every-10-minutes': {
        'task': 'myapp.calculate',
        'schedule': crontab(minute='*/10'),
    },
}

Dans cet exemple, myapp.calculate est le chemin d'import pleinement qualifié de la tâche que vous souhaitez exécuter. L'objet crontab est utilisé pour définir l'intervalle de temps pour l'exécution de la tâche.


Serveur de Celery Beat

Pour demarrer le serveur de Celery Beat, vous devez utiliser la commande celery beat dans votre terminal.

celery beat

Alternatives à Celery


RQ (Redis Queue)

RQ est une bibliothèque Python simple pour la mise en file d'attente des tâches. Elle utilise Redis comme système de file d'attente. RQ est particulièrement apprécié pour sa simplicité et sa clarté, il est donc plus facile à apprendre et à mettre en œuvre que Celery.


Dramatiq

Dramatiq est une bibliothèque Python de mise en file d'attente de tâches distribuées avec un accent sur la simplicité, la fiabilité et la performance. Comme Celery, Dramatiq peut utiliser plusieurs brokers de messages, dont RabbitMQ et Redis.


TaskTiger

TaskTiger est une autre bibliothèque Python pour la gestion des tâches. Elle utilise également Redis comme système de file d'attente. TaskTiger offre des fonctionnalités uniques telles que la possibilité de gérer les tâches en batch et une interface d'administration intégrée.


Apache Airflow

Airflow est une plateforme utilisée pour programmer et surveiller des flux de travail. Créée par Airbnb, elle est utilisée pour gérer les processus ETL complexes. Bien qu'elle ne soit pas une bibliothèque Python à proprement parler, elle est écrite en Python et est couramment utilisée dans les projets de science des données.


Ces bibliothèques offrent chacune une approche unique de la mise en file d'attente des tâches en Python et sont toutes de bonnes alternatives à Celery selon les besoins spécifiques de votre projet.


Utilisation de RQ (Redis Queue)


Installation

Pour commencer à utiliser RQ, vous devez l'installer en exécutant la commande suivante :

pip install rq

Configuration

RQ nécessite un broker de messages pour fonctionner, et il utilise Redis par défaut. Assurez-vous d'avoir installé et démarré un serveur Redis sur votre machine. Une fois le serveur Redis en place, vous pouvez créer une connexion à Redis dans votre script Python :

from redis import Redis
from rq import Queue

# Se connecter à Redis
redis_conn = Redis()

# Créer une file d'attente
q = Queue(connection=redis_conn)

Envoi de tâches

Une fois que vous avez configuré votre file d'attente, vous pouvez commencer à y ajouter des tâches. Voici un exemple de fonction que nous pourrions vouloir exécuter en arrière-plan :

def say_hello(name):
    print(f'Hello, {name}!')

Pour ajouter cette tâche à notre file d'attente, nous utilisons la méthode enqueue() de notre objet Queue :

q.enqueue(say_hello, 'Alice')

Cette tâche sera alors ajoutée à la file d'attente et exécutée par un "travailleur" RQ dès qu'il sera disponible.


Démarrage d'un Worker

Pour démarrer un travailleur RQ qui va consommer des tâches depuis la file d'attente, vous pouvez utiliser la commande rq worker dans votre terminal. Assurez-vous d'être dans le même environnement que celui où votre serveur Redis est en cours d'exécution :

rq worker

Le travailleur va démarrer et commencer à traiter les tâches de la file d'attente.


L'utilisation de RQ est un moyen simple et efficace de gérer les tâches en arrière-plan en Python. Avec sa simplicité et sa facilité d'utilisation, RQ est un excellent choix pour les applications Python qui nécessitent la mise en file d'attente de tâches.