Exécution de Céleri avec RabbitMQ

Il y a eu une explosion d’intérêt pour le traitement distribué.tâches plus longues sur de nombreux travailleurs différents. Les courtiers de messages tels que RabbitMQ assurent la communication entre les nœuds. L’exécution de vos clients Celeryclients, de vos travailleurs et de votre courtier associé dans le cloud donne à votre équipe le pouvoir de gérer et de faire évoluer facilement les processus backend, les tâches et les tâches administratives de base.

Qu’est-ce que le céleri ?

Celeryest une file d’attente de tâches distribuée qui simplifie la gestion de la distribution des tâches.Les développeurs divisent les ensembles de données en lots plus petits pour que le Céleri puisse les traiter dans un travail connu sous le nom de travail.

Vous déployez un ou plusieurs processus de travail qui se connectent à une file d’attente de messages (courtier basé sur anAMQP ou SQS). Les clients soumettent des tâches au courtier de messages et la file d’attente écoute les demandes de travail en utilisant Celery. Le courtier de messages distribue ensuite les demandes de travail aux travailleurs.Les travailleurs attendent les travaux de Céleri et exécutent les tâches.Une fois terminé, le travailleur envoie un résultat à une autre file d’attente pour le processus client.

Dans Celery, les clients et les travailleurs ne communiquent pas directement entre eux mais via des files d’attente de messages.Les clients envoient des messages à l’aide de tâches dans le système, tout comme un appel de procédure distant initie une fonction. Les clients n’ont pas besoin de comprendre le format actuel mais d’annoter les fonctions et de les enregistrer avec le framework:

@app.task def send_hello(x, y): return ‘Hello’ # overrides the broker from your configuration app = Celery('app_name', broker='...') app.tasks 

En utilisant la liaison Python officielle, nous avons créé une tâche et l’avons enregistrée avec le système. Vous pouvez utiliser la méthode apply_async(function, args, kwargs,...) pour envoyer la tâche à la file d’attente appropriée. Le client appelle get sur l’avenir renvoyé par apply_async pour bloquer et récupérer la réponse ou utilise la méthode d’état pour lancer un appel au courtier vérifiant l’état de la demande.

Un message acheminé vers une file d’attente attend dans la file d’attente jusqu’à ce que quelqu’un le consomme, et le message est supprimé de la file d’attente lorsqu’il a été accusé de réception.

Entraîne le Céleri

Il est possible de garder une trace des états d’une tâche. Le céleri peut aussienregistrer ou envoyer les états. Ainsi, au lieu d’utiliser la fonction get, il est possiblepour pousser les résultats vers un autre backend. Il existe plusieurs backends de résultats intégrés à choisir à partir d’includingSQLAlchemy, specificdatabasesandRPC (RabbitMQ).

Par défaut, le Céleri est configuré pour ne pas consommer les résultats des tâches. Définir l’option de configuration result_backend = 'rpc' indique au système d’envoyer une réponse à une file d’attente unique pour consommation. Il n’est pas conseillé d’utiliser le backend ampq. Cela pourrait entraîner une fuite de mémoire.

Vous trouverez plus d’informations sur les résultats ici.

Que sont les échanges dans le céleri?

Le céleri fonctionne à travers le routage de message et les modèles d’abonnement à la publication.Les éditeurs envoient des messages à un échange qui utilise une correspondance directe ou une carte de fil à une ou plusieurs files d’attente de travail dans le courtier. Le type d’échange définit commentles messages sont acheminés via l’échange.Vous configurez les types d’échange dans la configuration :

CELERY_QUEUES = { "my_queue”:{ "exchange”: "my_exchange”, "exchange_type”: "topic”, "binding_key”: "all_tasks.task_type” } } CELERY_DEFUALT_EXCHANGE=”my_exchange” CELERY_DEFAULT_EXCHANGE_TYPE=”direct” CELERY_DEFAULT_ROUTING_KEY=”all_tasks”

Dans cet exemple, vous déclarez une file d’attente par défaut comme la seule file d’attente connue avec l’échange par défaut, mais vous remplacez le type par topic avec les tâches clés de routage.type de tâche. L’échange de rubriques vous permet d’utiliser des correspondances de caractères génériques pour que les tâches soient envoyées à all_tasks.* sont toujours reçus. Cette configuration peut se produire dans le même module où vous démarrez votre application.

Types d’échanges

Les types d’échanges standard sont direct, topic, fanout et headers. Thesedictate comment les messages passent aux abonnés. Les clés de routage doivent correspondre exactement aux échanges directs ina. Les échanges de sortants envoient des messages à toutes les files d’attente attachées.Les sujets permettent la correspondance de caractères génériques. Les échanges d’en-têtes ne transmettent que les métadonnées.

En savoir plus sur les échanges et les clés de routage de Trabbitmq

Quand dois-je utiliser le Céleri ?

En tant que système de file d’attente de tâches, Celery fonctionne bien avec les processus de longue durée ou les petites tâches reproductibles travaillant sur des lots.Les types de problèmes que les poignées de Céleri sont des tâches asynchrones courantes. La mise à l’échelle d’images, l’encodage vidéo, l’ETL, l’envoi d’e-mails ou d’autres pipelines bénéficient de ce cadre pré-construit qui gère une grande partie du travail impliqué dans leur construction.

Quels sont les avantages de l’utilisation du céleri?

Celery fonctionne avec n’importe quelle langue via le protocole de message standardisé.Outre la version officielle de Python, d’autres API sont en développement pour e.g.Java , RustandNode.Il est possible de créer un système centralisé en utilisant n’importe quel langage avec l’API anAMQP ou SQS.

D’autres avantages incluent la réduction de la quantité de code requise pour distribuer les tâches et la possibilité de planifier des tâches périodiquement via celerybeat.Celerybeat est un planificateur qui lance des tâches à intervalles réguliers.Une surveillance autonome en temps réel pour les travailleurs du céleri est également disponible via celerymon.

Pourquoi devrais-je utiliser un courtier de messages cloud ?

En tant qu’intermédiaire pré-construit, Celery simplifie le développement et la gestion des pipelines.CloudAMQP élimine les besoins administratifs de votre backend avec des clusters prêts à l’emploi.

« Nous utilisons CloudAMQP-RabbitMQ en tant que courtier pour nos tâches asynchrones via Celery. Nous l’utilisons comme service asynchrone basé sur des événements. CloudAMQP prend en charge notre projet grâce à une surveillance adéquate, à une haute disponibilité et à des files d’attente scalables. »Shivam Arora, Delhivery

Vous n’avez plus à vous soucier de la mise à l’échelle de votre système pour répondre à la demande croissante.Nos instances sont livrées avec la console de gestion déjà en cours d’exécution.

Configuration de RabbitMQ avec CloudAMQP

CloudAMQP provisionne vos instances RabbitMQ immédiatement.Créez un compte et provisionnez une instance directement depuis l’interface de gestion web. La mise à l’échelle automatique intégrée permet à vos courtiers de travailler rapidement.Les bases CloudAMQP reposent entièrement sur la puissance de l’infrastructure provisionedinstance sous-jacente, et vous pouvez monter et descendre d’un plan à l’autre lorsqu’il est nécessaire. Notre niveau gratuit, little lemur, est une excellente option pour les tests.

Comment utiliser CloudAMQP comme courtier de messages Céleri ?

Une fois provisionné, votre broker CloudAMQP fonctionne de la même manière que votre système sur site. Tout ce dont vous avez besoin est une URL, un nom d’utilisateur et un mot de passe pour établirune connexion au courtier. Céleri gère le reste :

app = Celery('app_name', broker='amqps://user:password@host:port/host’)print(app.conf.broker_url)

Le framework Céleri stocke l’URL dans la configuration. Puisque le Céleri s’exécute séparément du courtier, vous obtenez le même degré de contrôle que si vous exécutiez l’ensemble de votre système sur site.

Puis-je faire fonctionner mes travailleurs de Céleri dans le cloud?

Bien que CloudAMQP fournisse un courtier de messages, il est également possible de déployer des workers sur AWS ou un autre service cloud. Les travailleurs ont seulement besoin de savoir où réside le courtier pour faire partie du système. Celery gère aqueue pour les événements et les notifications sans nœud de registre commun.

En raison de l’utilisation d’un courtier pour la gestion du système, vous pouvez exécuter vos tâches dans des conteneurs de stockage overKubernetes.Les conteneurs s’adaptent automatiquement à vos besoins, tandis que Kubernetes vous permet de définir des stratégies de mise à l’échelle et Flower fournit des fonctionnalités de surveillance.Un conteneur de céleri officiel peut être trouvé ici: https://hub.docker.com/_/celery.Celery est également entièrement pris en charge onHeroku.

L’exécution de vos clients et travailleurs céleri dans le cloud minimise le coût de développement et de déploiement des pipelines.

Pourquoi CloudAMQP sponsorise le projet Celery

Nous avons de nombreux clients exécutant des applications qui font confiance à Celery, et nous soutenons donc activement et mensuellement le développement du projet Celery (AMQP). Veuillez nous faire savoir s’il y a quelque chose dans le Célerythque vous souhaitez que nous mettions davantage l’accent sur.

Visitez notre page de planpour plus d’informations sur la mise en route du déploiement d’un cluster RabbitMQ.Nous vous conseillons également de lire nos paramètres recommandés pour le céleri sur CloudAMQP avant de commencer un pipeline de production, ainsi que d’apprécier le cadre de la gamme.

Vous aimez cet article ? N’oubliez pas de le partager avec les autres. 😉

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.