sql >> Databáze >  >> NoSQL >> Redis

Návrh aplikace s Redis jako úložiště dat. Co? Proč?

1) Úvod

Ahoj všichni! Mnoho lidí ví, co je Redis, a pokud to nevíte, oficiální stránky vám mohou přinést aktuální informace.
Pro většinu Redis je mezipaměť a někdy i fronta zpráv.
Co když se ale trochu zblázníme a zkusíme navrhnout celou aplikaci pouze s využitím Redis jako úložiště dat? Jaké úkoly můžeme vyřešit s Redis?
Na tyto otázky se pokusíme odpovědět v tomto článku.

Co zde neuvidíme?

  • Nebude zde uvedena každá podrobná datová struktura Redis. Pro jaké účely byste si měli přečíst speciální články nebo dokumentaci.
  • Zde také nebude žádný kód připravený k produkci, který byste mohli použít ve své práci.

Co zde uvidíme?

  • Použijeme různé datové struktury Redis k implementaci různých úkolů seznamovací aplikace.
  • Zde budou příklady kódu Kotlin + Spring Boot.

2) Naučte se vytvářet a dotazovat uživatelské profily.

  • Za prvé, pojďme se naučit, jak vytvořit uživatelské profily s jejich jmény, lajky atd.

    K tomu potřebujeme jednoduché úložiště párů klíč–hodnota. Jak to udělat?

  • Jednoduše. Redis má datovou strukturu - hash. V podstatě je to jen známá hash mapa pro nás všechny.

Příkazy dotazovacího jazyka Redis lze nalézt zde a zde.
Dokumentace má dokonce interaktivní okno pro provádění těchto příkazů přímo na stránce. A celý seznam příkazů najdete zde.
Podobné odkazy fungují pro všechny následující příkazy, které budeme uvažovat.

V kódu používáme RedisTemplate téměř všude. Toto je základní věc pro práci s Redis v ekosystému Spring.

Jediný rozdíl oproti mapě je v tom, že jako první argument předáme "pole". „Pole“ je název našeho hashe.

fun addUser(user: User) {
        val hashOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
        hashOps.put(Constants.USERS, user.name, user)
    }

fun getUser(userId: String): User {
        val userOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
        return userOps.get(Constants.USERS, userId)?: throw NotFoundException("Not found user by $userId")
    }

Výše je příklad toho, jak by to mohlo vypadat v Kotlinu pomocí knihoven Spring.

Všechny části kódu z tohoto článku najdete na Github.

3) Aktualizace lajků uživatelů pomocí seznamů Redis.

  • Skvělý!. Máme uživatele a informace o hodnoceních Líbí se.

    Nyní bychom měli najít způsob, jak aktualizovat to lajky.

    Předpokládáme, že události se mohou dít velmi často. Použijme tedy asynchronní přístup s nějakou frontou. A my budeme číst informace z fronty podle plánu.

  • Redis má datovou strukturu seznamu s takovou sadou příkazů. Seznamy Redis můžete používat jako frontu FIFO i jako zásobník LIFO.

Na jaře používáme stejný přístup k získávání ListOperations z RedisTemplate.

Musíme psát vpravo. Protože zde simulujeme FIFO frontu zprava doleva.

fun putUserLike(userFrom: String, userTo: String, like: Boolean) {
        val userLike = UserLike(userFrom, userTo, like)
        val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
        listOps.rightPush(Constants.USER_LIKES, userLike)
}

Teď budeme pracovat podle plánu.

Jednoduše přenášíme informace z jedné datové struktury Redis do druhé. To nám jako příklad stačí.

fun processUserLikes() {
        val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
        userLikes.forEach{updateUserLike(it)}
}

Aktualizace uživatelů je zde opravdu snadná. Pozdravte HashOperation z předchozího dílu.

private fun updateUserLike(userLike: UserLike) {
        val userOps: HashOperations<String, String, User> = userLikeRedisTemplate.opsForHash()
        val fromUser = userOps.get(Constants.USERS, userLike.fromUserId)?: throw UserNotFoundException(userLike.fromUserId)
        fromUser.fromLikes.add(userLike)
        val toUser = userOps.get(Constants.USERS, userLike.toUserId)?: throw UserNotFoundException(userLike.toUserId)
        toUser.fromLikes.add(userLike)

        userOps.putAll(Constants.USERS, mapOf(userLike.fromUserId to fromUser, userLike.toUserId to toUser))
    }

A nyní si ukážeme, jak získat data ze seznamu. Dostáváme to zleva. K získání hromady dat ze seznamu použijeme range metoda.
A je tu důležitý bod. Metoda rozsahu pouze získá data ze seznamu, ale nesmaže je.

K odstranění dat tedy musíme použít jinou metodu. trim Udělej to. (A můžete tam mít nějaké otázky).

private fun getUserLikesLast(number: Long): List<UserLike> {
        val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
        return (listOps.range(Constants.USER_LIKES, 0, number)?:mutableListOf()).filterIsInstance(UserLike::class.java)
            .also{
listOps.trim(Constants.USER_LIKES, number, -1)
}
}

A otázky jsou:

  • Jak získat data ze seznamu do několika vláken?
  • A jak zajistit, aby se data v případě chyby neztratila? Z krabice – nic. Musíte získat data ze seznamu v jednom vlákně. A všechny nuance, které se objeví, musíte zvládnout sami.

4) Odesílání push notifikací uživatelům pomocí pub/sub

  • Pokračuj v pohybu vpřed!
    Již máme uživatelské profily. Přišli jsme na to, jak naložit s proudem lajků od těchto uživatelů.

    Představte si ale případ, kdy chcete uživateli odeslat oznámení push ve chvíli, kdy se nám to líbí.
    Co budeš dělat?

  • Už máme asynchronní proces pro zpracování hodnocení Líbí se, takže do něj zabudujeme odesílání oznámení push. K tomuto účelu samozřejmě použijeme WebSocket. A můžeme to poslat přes WebSocket, kde dostaneme lajk. Ale co když chceme před odesláním spustit dlouhotrvající kód? Nebo co když chceme delegovat práci s WebSocket na jinou komponentu?
  • Znovu převezmeme a přeneseme naše data z jedné datové struktury (seznamu) Redis do jiné (hospoda/sub).
fun processUserLikes() {
        val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
                pushLikesToUsers(userLikes)
        userLikes.forEach{updateUserLike(it)}
}

private fun pushLikesToUsers(userLikes: List<UserLike>) {
  GlobalScope.launch(Dispatchers.IO){
        userLikes.forEach {
            pushProducer.publish(it)
        }
  }
}
@Component
class PushProducer(val redisTemplate: RedisTemplate<String, String>, val pushTopic: ChannelTopic, val objectMapper: ObjectMapper) {

    fun publish(userLike: UserLike) {
        redisTemplate.convertAndSend(pushTopic.topic, objectMapper.writeValueAsString(userLike))
    }
}

Vazba posluchače k ​​tématu se nachází v konfiguraci.
Nyní můžeme vzít našeho posluchače do samostatné služby.

@Component
class PushListener(val objectMapper: ObjectMapper): MessageListener {
    private val log = KotlinLogging.logger {}

    override fun onMessage(userLikeMessage: Message, pattern: ByteArray?) {
        // websocket functionality would be here
        log.info("Received: ${objectMapper.readValue(userLikeMessage.body, UserLike::class.java)}")
    }
}

5) Hledání nejbližších uživatelů pomocí geografických operací.

  • S lajky jsme skončili. Ale co schopnost najít nejbližší uživatele k danému bodu.

  • GeoOperations nám s tím pomůže. Uložíme páry klíč–hodnota, ale nyní je naší hodnotou souřadnice uživatele. K nalezení použijeme [radius](https://redis.io/commands/georadius) metoda. K nalezení předáme ID uživatele a samotný okruh vyhledávání.

Redis vrátí výsledek včetně našeho ID uživatele.

fun getNearUserIds(userId: String, distance: Double = 1000.0): List<String> {
    val geoOps: GeoOperations<String, String> = stringRedisTemplate.opsForGeo()
    return geoOps.radius(USER_GEO_POINT, userId, Distance(distance, RedisGeoCommands.DistanceUnit.KILOMETERS))
        ?.content?.map{ it.content.name}?.filter{ it!= userId}?:listOf()
}

6) Aktualizace polohy uživatelů prostřednictvím streamů

  • Realizovali jsme téměř vše, co jsme potřebovali. Ale nyní jsme opět v situaci, kdy musíme aktualizovat data, která by se mohla rychle změnit.

    Musíme tedy znovu použít frontu, ale bylo by hezké mít něco škálovatelnějšího.

  • Streamy Redis mohou pomoci vyřešit tento problém.
  • Pravděpodobně víte o Kafkovi a pravděpodobně dokonce víte o Kafkových streamech, ale není to totéž jako Redis streamy. Samotný Kafka je ale dost podobná věc jako Redis streamy. Je to také datová struktura se záznamem dopředu, která má skupinu spotřebitelů a offset. Jedná se o složitější datovou strukturu, ale umožňuje nám získávat data paralelně a pomocí reaktivního přístupu.

Podrobnosti najdete v dokumentaci ke streamu Redis.

Spring má ReactiveRedisTemplate a RedisTemplate pro práci s datovými strukturami Redis. Pro nás by bylo pohodlnější použít RedisTemplate pro zápis hodnoty a ReactiveRedisTemplate pro čtení. Pokud mluvíme o proudech. Ale v takových případech nebude fungovat nic.
Pokud někdo ví, proč to takhle funguje, kvůli Springu nebo Redis, napište do komentářů.

fun publishUserPoint(userPoint: UserPoint) {
    val userPointRecord = ObjectRecord.create(USER_GEO_STREAM_NAME, userPoint)
    reactiveRedisTemplate
        .opsForStream<String, Any>()
        .add(userPointRecord)
        .subscribe{println("Send RecordId: $it")}
}

Naše metoda posluchače bude vypadat takto:

@Service
class UserPointsConsumer(
    private val userGeoService: UserGeoService
): StreamListener<String, ObjectRecord<String, UserPoint>> {

    override fun onMessage(record: ObjectRecord<String, UserPoint>) {
        userGeoService.addUserPoint(record.value)
    }
}

Naše data pouze přesuneme do geografické datové struktury.

7) Počítejte jedinečné relace pomocí HyperLogLog.

  • A nakonec si představme, že potřebujeme spočítat, kolik uživatelů za den vstoupilo do aplikace.
  • Navíc mějme na paměti, že můžeme mít mnoho uživatelů. Jednoduchá možnost pomocí hash mapy pro nás tedy není vhodná, protože spotřebovává příliš mnoho paměti. Jak toho můžeme dosáhnout s použitím menšího množství zdrojů?
  • Tam vstupuje do hry pravděpodobnostní datová struktura HyperLogLog. Více si o tom můžete přečíst na stránce Wikipedie. Klíčovou vlastností je, že tato datová struktura nám umožňuje vyřešit problém s použitím výrazně menší paměti než možnost s hash mapou.


fun uniqueActivitiesPerDay(): Long {
    val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
    return hyperLogLogOps.size(Constants.TODAY_ACTIVITIES)
}

fun userOpenApp(userId: String): Long {
    val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
    return hyperLogLogOps.add(Constants.TODAY_ACTIVITIES, userId)
}

8) Závěr

V tomto článku jsme se podívali na různé datové struktury Redis. Včetně nepříliš oblíbených geo operací a HyperLogLogu.
Používali jsme je k řešení skutečných problémů.

Téměř jsme navrhli Tinder, poté je to možné ve FAANG)))
Také jsme zdůraznili hlavní nuance a problémy, se kterými se lze při práci s Redis setkat.

Redis je velmi funkční datové úložiště. A pokud jej již máte ve své infrastruktuře, může stát za to podívat se na Redis jako na nástroj, který s ním bez zbytečných komplikací vyřeší vaše další úkoly.

PS:
Všechny příklady kódu lze nalézt na github.

Napište do komentářů, pokud si všimnete chyby.
Zanechte níže komentář o takovém způsobu popisu použití nějaké technologie. Líbí se vám to nebo ne?

A sledujte mě na Twitteru:🐦@de____ro


  1. Jak efektivně vložit miliardu dat do Redis?

  2. Relace NodeJS + ExpressJS + RedisStore není definována

  3. Přidejte úvodní nuly v SQL

  4. MongoDB jako databáze časových řad