„Wenn ein Arbeiter seine Arbeit gut machen will, muss er zuerst seine Werkzeuge schärfen.“ – Konfuzius, „Die Gespräche des Konfuzius. Lu Linggong“
Titelseite > Programmierung > Probabilistischer früher Ablauf in Go

Probabilistischer früher Ablauf in Go

Veröffentlicht am 09.11.2024
Durchsuche:223

Über Cache-Stampedes

Ich komme oft in Situationen, in denen ich dieses oder jenes zwischenspeichern muss. Häufig werden diese Werte für einen bestimmten Zeitraum zwischengespeichert. Sie kennen das Muster wahrscheinlich. Sie versuchen, einen Wert aus dem Cache abzurufen. Wenn Ihnen das gelingt, geben Sie ihn an den Aufrufer zurück und beenden ihn. Wenn der Wert nicht vorhanden ist, rufen Sie ihn ab (höchstwahrscheinlich aus der Datenbank) oder berechnen ihn und legen ihn im Cache ab. In den meisten Fällen funktioniert das hervorragend. Wenn jedoch häufig auf den Schlüssel zugegriffen wird, den Sie für Ihren Cache-Eintrag verwenden, und die Berechnung der Daten eine Weile dauert, kommt es zu einer Situation, in der bei mehreren parallelen Anforderungen gleichzeitig ein Cache-Fehler auftritt. Alle diese Anfragen laden unabhängig voneinander die Quelle und speichern den Wert im Cache. Dies führt zur Verschwendung von Ressourcen und kann sogar zu einem Denial-of-Service führen.

Lassen Sie mich anhand eines Beispiels veranschaulichen. Ich verwende Redis für den Cache und einen einfachen Go-HTTP-Server darüber. Hier ist der vollständige Code:

package main

import (
    "errors"
    "log"
    "net/http"
    "time"

    "github.com/redis/go-redis/v9"
)

type handler struct {
    rdb *redis.Client
    cacheTTL time.Duration
}

func (ch *handler) simple(w http.ResponseWriter, r *http.Request) {
    cacheKey := "my_cache_key"
    // we'll use 200 to signify a cache hit & 201 to signify a miss
    responseCode := http.StatusOK
    cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result()
    if err != nil {
        if !errors.Is(err, redis.Nil) {
            log.Println("could not reach redis", err.Error())
            http.Error(w, "could not reach redis", http.StatusInternalServerError)
            return
        }

        // cache miss - fetch & store
        res := longRunningOperation()
        responseCode = http.StatusCreated

        err = ch.rdb.Set(r.Context(), cacheKey, res, ch.cacheTTL).Err()
        if err != nil {
            log.Println("failed to set cache value", err.Error())
            http.Error(w, "failed to set cache value", http.StatusInternalServerError)
            return
        }
        cachedData = res
    }
    w.WriteHeader(responseCode)
    _, _ = w.Write([]byte(cachedData))
}

func longRunningOperation() string {
    time.Sleep(time.Millisecond * 500)
    return "hello"
}

func main() {
    ttl := time.Second * 3
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    handler := &handler{
        rdb: rdb,
        cacheTTL: ttl,
    }

    http.HandleFunc("/simple", handler.simple)
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatalf("Could not start server: %s\n", err.Error())
    }
}

Lassen Sie uns den /simple-Endpunkt etwas belasten und sehen, was passiert. Ich werde dafür Vegeta verwenden.

Ich führe vegeta attack -duration=30s -rate=500 -targets=./targets_simple.txt > res_simple.bin aus. Am Ende stellt Vegeta 30 Sekunden lang jede Sekunde 500 Anfragen. Ich stelle sie als Histogramm von HTTP-Ergebniscodes mit Buckets dar, die jeweils 100 ms umfassen. Das Ergebnis ist die folgende Grafik.

Probabilistic Early Expiration in Go

Wenn wir das Experiment starten, ist der Cache leer – wir haben dort keinen Wert gespeichert. Wir erleben den ersten Ansturm, als eine Reihe von Anfragen unseren Server erreichen. Alle überprüfen den Cache, finden dort nichts, rufen die longRunningOperation auf und speichern sie im Cache. Da die longRunningOperation ca. 500 ms benötigt, um alle in den ersten 500 ms gestellten Anforderungen abzuschließen, wird am Ende longRunningOperation aufgerufen. Sobald es einer der Anfragen gelingt, den Wert im Cache zu speichern, rufen ihn alle folgenden Anfragen aus dem Cache ab und wir sehen Antworten mit dem Statuscode 200. Das Muster wiederholt sich dann alle 3 Sekunden, wenn der Ablaufmechanismus auf Redis greift.

In diesem Spielzeugbeispiel verursacht dies keine Probleme, aber in einer Produktionsumgebung kann dies zu einer unnötigen Belastung Ihrer Systeme, einer beeinträchtigten Benutzererfahrung oder sogar einem selbstverursachten Denial-of-Service führen. Wie können wir das also verhindern? Nun, es gibt ein paar Möglichkeiten. Wir könnten eine Sperre einführen – jeder Cache-Fehler würde dazu führen, dass der Code versucht, eine Sperre zu erreichen. Verteiltes Sperren ist keine triviale Sache und oft gibt es subtile Randfälle, die eine sorgfältige Handhabung erfordern. Wir könnten den Wert auch in regelmäßigen Abständen mithilfe eines Hintergrundjobs neu berechnen. Dies erfordert jedoch die Ausführung eines zusätzlichen Prozesses, wodurch ein weiteres Rädchen entsteht, das in unserem Code verwaltet und überwacht werden muss. Dieser Ansatz ist möglicherweise auch nicht umsetzbar, wenn Sie über dynamische Cache-Schlüssel verfügen. Es gibt einen anderen Ansatz, der als probabilistischer früher Ablauf bezeichnet wird und den ich gerne weiter erforschen würde.

Probabilistischer früher Ablauf

Diese Technik ermöglicht es, den Wert basierend auf einer Wahrscheinlichkeit neu zu berechnen. Beim Abrufen des Werts aus dem Cache berechnen Sie anhand einer Wahrscheinlichkeit auch, ob Sie den Cache-Wert neu generieren müssen. Je näher Sie am Ablauf des bestehenden Wertes sind, desto höher ist die Wahrscheinlichkeit.

Ich stütze die spezifische Implementierung auf XFetch von A. Vattani, F.Chierichetti und K. Lowenstein in Optimal Probabilistic Cache Stampede Prevention.

Ich werde einen neuen Endpunkt auf dem HTTP-Server einführen, der ebenfalls die teure Berechnung durchführt, dieses Mal jedoch XFetch beim Caching verwendet. Damit XFetch funktioniert, müssen wir speichern, wie lange der teure Vorgang gedauert hat (das Delta) und wann der Cache-Schlüssel abläuft. Um das zu erreichen, führe ich eine Struktur ein, die diese Werte sowie die Nachricht selbst enthält:

type probabilisticValue struct {
    Message string
    Expiry time.Time
    Delta time.Duration
}

Ich füge eine Funktion hinzu, um die ursprüngliche Nachricht mit diesen Attributen zu verpacken und sie zum Speichern in Redis zu serialisieren:

func wrapMessage(message string, delta, cacheTTL time.Duration) (string, error) {
    bts, err := json.Marshal(probabilisticValue{
        Message: message,
        Delta: delta,
        Expiry: time.Now().Add(cacheTTL),
    })
    if err != nil {
        return "", fmt.Errorf("could not marshal message: %w", err)
    }

    return string(bts), nil
}

Lassen Sie uns auch eine Methode schreiben, um den Wert in Redis neu zu berechnen und zu speichern:

func (ch *handler) recomputeValue(ctx context.Context, cacheKey string) (string, error) {
    start := time.Now()
    message := longRunningOperation()
    delta := time.Since(start)

    wrapped, err := wrapMessage(message, delta, ch.cacheTTL)
    if err != nil {
        return "", fmt.Errorf("could not wrap message: %w", err)
    }
    err = ch.rdb.Set(ctx, cacheKey, wrapped, ch.cacheTTL).Err()
    if err != nil {
        return "", fmt.Errorf("could not save value: %w", err)
    }
    return message, nil
}

Um festzustellen, ob wir den Wert basierend auf der Wahrscheinlichkeit aktualisieren müssen, können wir probabilisticValue eine Methode hinzufügen:

func (pv probabilisticValue) shouldUpdate() bool {
    // suggested default param in XFetch implementation
    // if increased - results in earlier expirations
    beta := 1.0
    now := time.Now()
    scaledGap := pv.Delta.Seconds() * beta * math.Log(rand.Float64())
    return now.Sub(pv.Expiry).Seconds() >= scaledGap
}

Wenn wir alles zusammenfügen, erhalten wir den folgenden Handler:

func (ch *handler) probabilistic(w http.ResponseWriter, r *http.Request) {
    cacheKey := "probabilistic_cache_key"
    // we'll use 200 to signify a cache hit & 201 to signify a miss
    responseCode := http.StatusOK
    cachedData, err := ch.rdb.Get(r.Context(), cacheKey).Result()
    if err != nil {
        if !errors.Is(err, redis.Nil) {
            log.Println("could not reach redis", err.Error())
            http.Error(w, "could not reach redis", http.StatusInternalServerError)
            return
        }

        res, err := ch.recomputeValue(r.Context(), cacheKey)
        if err != nil {
            log.Println("could not recompute value", err.Error())
            http.Error(w, "could not recompute value", http.StatusInternalServerError)
            return
        }
        responseCode = http.StatusCreated
        cachedData = res

        w.WriteHeader(responseCode)
        _, _ = w.Write([]byte(cachedData))
        return
    }

    pv := probabilisticValue{}
    err = json.Unmarshal([]byte(cachedData), &pv)
    if err != nil {
        log.Println("could not unmarshal probabilistic value", err.Error())
        http.Error(w, "could not unmarshal probabilistic value", http.StatusInternalServerError)
        return
    }

    if pv.shouldUpdate() {
        _, err := ch.recomputeValue(r.Context(), cacheKey)
        if err != nil {
            log.Println("could not recompute value", err.Error())
            http.Error(w, "could not recompute value", http.StatusInternalServerError)
            return
        }
        responseCode = http.StatusAccepted
    }

    w.WriteHeader(responseCode)
    _, _ = w.Write([]byte(cachedData))
}

Der Handler funktioniert ähnlich wie der erste, jedoch würfeln wir, wenn wir einen Cache-Treffer erhalten. Abhängig vom Ergebnis geben wir entweder einfach den gerade abgerufenen Wert zurück oder aktualisieren den Wert vorzeitig.

Wir verwenden die HTTP-Statuscodes, um zwischen den drei Fällen zu unterscheiden:

  • 200 – wir haben den Wert aus dem Cache zurückgegeben
  • 201 – Cache-Fehler, kein Wert vorhanden
  • 202 – Cache-Treffer, probabilistische Aktualisierung ausgelöst

Ich starte Vegeta noch einmal, diesmal gegen den neuen Endpunkt, und hier ist das Ergebnis:

Probabilistic Early Expiration in Go

Die kleinen blauen Kleckse dort zeigen an, wann wir den Cache-Wert tatsächlich vorzeitig aktualisiert haben. Nach der ersten Aufwärmphase sehen wir keine Cache-Fehler mehr. Um die anfängliche Spitze zu vermeiden, können Sie den zwischengespeicherten Wert vorab speichern, wenn dies für Ihren Anwendungsfall wichtig ist.

Wenn Sie beim Caching aggressiver vorgehen und den Wert häufiger aktualisieren möchten, können Sie mit dem Beta-Parameter experimentieren. So sieht das gleiche Experiment aus, wenn der Beta-Parameter auf 2 gesetzt ist:

Probabilistic Early Expiration in Go

Wir sehen jetzt viel häufiger probabilistische Aktualisierungen.

Alles in allem ist dies eine nette kleine Technik, die dabei helfen kann, Cache-Stampedes zu vermeiden. Beachten Sie jedoch, dass dies nur funktioniert, wenn Sie regelmäßig denselben Schlüssel aus dem Cache abrufen. Andernfalls werden Sie keinen großen Nutzen sehen.

Haben Sie eine andere Möglichkeit, mit Cache-Stampedes umzugehen? Ist Ihnen ein Fehler aufgefallen? Lass es mich unten in den Kommentaren wissen!

Freigabeerklärung Dieser Artikel wird unter: https://dev.to/vkuznecovas/probabilistic-early-expiration-in-go-48h?1 reproduziert. Wenn es zu Verstößen besteht, wenden Sie sich bitte an [email protected], um ihn zu löschen.
Neuestes Tutorial Mehr>

Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.

Copyright© 2022 湘ICP备2022001581号-3