"Si un ouvrier veut bien faire son travail, il doit d'abord affûter ses outils." - Confucius, "Les Entretiens de Confucius. Lu Linggong"
Page de garde > La programmation > Master Python Coroutines: Créez un outil asynchrone personnalisé pour de puissantes applications simultanées

Master Python Coroutines: Créez un outil asynchrone personnalisé pour de puissantes applications simultanées

Publié le 2025-04-29
Parcourir:800

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Les coroutines en python sont un outil puissant pour écrire du code asynchrone. Ils ont révolutionné la façon dont nous gérons les opérations simultanées, ce qui facilite la création d'applications évolutives et efficaces. J'ai passé beaucoup de temps à travailler avec les coroutines, et je suis ravi de partager quelques informations sur la création de primitives asynchrones personnalisées.

Commençons par les bases. Les coroutines sont des fonctions spéciales qui peuvent être interrompues et reprendre, permettant un multitâche coopératif. Ils sont le fondement de la syntaxe asynchrone / attente de Python. Lorsque vous définissez une coroutine, vous créez essentiellement une fonction qui peut rendre le contrôle à la boucle d'événement, permettant à d'autres tâches de s'exécuter.

Pour créer un objet attendable personnalisé, vous devez implémenter la méthode Await . Cette méthode doit renvoyer un itérateur. Voici un exemple simple:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

Cette classe CustomAwaitable peut être utilisée avec le mot-clé Await, tout comme les attentes intégrées. Lorsqu'il attend, il donne un contrôle une fois, puis renvoie sa valeur.

Mais que se passe-t-il si nous voulons créer des primitives asynchrones plus complexes? Examinons la mise en œuvre d'un sémaphore personnalisé. Les sémaphores sont utilisés pour contrôler l'accès à une ressource partagée par plusieurs coroutines:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 



Cette classe CustomSemaphore implémente les méthodes d'acquisition et de libération, ainsi que le protocole Async Context Manager ( aenter et aexit ). Il permet à un maximum de deux coroutines d'acquérir le sémaphore simultanément.

Maintenant, parlons de la création de boucles d'événements efficaces. Bien qu'Asyncio de Python fournit une implémentation de boucle d'événements robuste, il peut y avoir des cas où vous avez besoin d'un personnal. Voici un exemple de base d'une boucle d'événement personnalisée:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

Cette boucle d'événement personnalisée implémente les fonctionnalités de base comme l'exécution des tâches, la gestion des coroutines et même une fonction de sommeil simple. Ce n'est pas aussi riche en fonctionnalités que la boucle d'événement intégrée de Python, mais il démontre les concepts de base.

L'un des défis dans l'écriture du code asynchrone est de gérer les priorités des tâches. Alors que Asyncio de Python ne fournit pas de files d'attente de priorité intégrées pour les tâches, nous pouvons implémenter le nôtre:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock   delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())

Cette priorityEventloop utilise une file d'attente de tas pour gérer les tâches en fonction de leur temps d'exécution planifié. Vous pouvez attribuer des priorités en planifiant des tâches avec différents retards.

Gestion de l'annulation gracieusement est un autre aspect important du travail avec les coroutines. Voici un exemple de la façon d'implémenter des tâches annuables:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

Dans cet exemple, le CANCELLABLABLE_OPERATION ACTIRE L'ANCELLEDEDERROR, effectue tout nettoyage nécessaire, puis remonte l'exception. Cela permet une manipulation gracieuse de l'annulation tout en propageant le statut d'annulation.

Explorons la mise en œuvre des itérateurs asynchronisés personnalisés. Ceux-ci sont utiles pour créer des séquences qui peuvent être itérées sur asynchrone:

class AsyncRange:
    def __init__(self, start, stop, step=1):
        self.start = start
        self.stop = stop
        self.step = step

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.start >= self.stop:
            raise StopAsyncIteration
        value = self.start
        self.start  = self.step
        await asyncio.sleep(0.1)  # Simulate some async work
        return value

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())

Cette classe Asyncrange implémente le protocole d'itulator asynchronisé, ce qui lui permet d'être utilisé en asynchronisation pour les boucles.

Enfin, examinons la mise en œuvre des gestionnaires de contexte asynchrones personnalisés. Ceux-ci sont utiles pour gérer les ressources qui doivent être acquises et libérées de manière asynchrone:

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async acquisition
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async release

async def main():
    async with AsyncResource() as resource:
        print("Using resource")
        await asyncio.sleep(1)

asyncio.run(main())

Cette classe AsynCresource implémente les méthodes aenter et aexit , ce qui lui permet d'être utilisé avec l'async avec instruction.

En conclusion, le système Coroutine de Python fournit une base puissante pour construire des primitives asynchrones personnalisées. En comprenant les mécanismes et protocoles sous-jacents, vous pouvez créer des solutions sur mesure pour des défis asynchrones spécifiques, optimiser les performances dans des scénarios simultanés complexes et étendre les capacités asynchrones de Python. N'oubliez pas que si ces implémentations personnalisées sont idéales pour l'apprentissage et les cas d'utilisation spécifiques, la bibliothèque Asyncio intégrée de Python est très optimisée et devrait être votre choix pour la plupart des scénarios. Codage heureux!


Nos créations

Assurez-vous de consulter nos créations:

investisseur central | Smart Living | époques et échos | Mystères perplexes | hindutva | elite dev | js écoles


Nous sommes sur le milieu

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Mystères déroutants Medium | Science et époques Medium | Hindutva moderne

Déclaration de sortie Cet article est réimprimé à: https://dev.to/aaravjoshi/master-python-coroutines-create-constom-async-tools-for-powerful-cconcurrent-pps-1dpc?1 s'il y a une violation, veuillez contacter [email protected] pour le supprimer.
Dernier tutoriel Plus>

Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.

Copyright© 2022 湘ICP备2022001581号-3