sql >> Databáze >  >> NoSQL >> Redis

Redis na Spark:Task nelze serializovat

Ve Sparku jsou funkce na RDD s (jako map zde) jsou serializovány a odeslány exekutorům ke zpracování. To znamená, že všechny prvky obsažené v těchto operacích by měly být serializovatelné.

Připojení Redis zde nelze serializovat, protože otevírá TCP připojení k cílové DB, která jsou navázána na počítač, kde bylo vytvořeno.

Řešením je vytvořit tato připojení na exekutorech v kontextu místního provádění. Existuje několik způsobů, jak to udělat. Dvěma, které mě napadnou, jsou:

  • rdd.mapPartitions :umožňuje zpracovat celý oddíl najednou, a tím amortizovat náklady na vytváření připojení)
  • Správci připojení Singleton:Vytvořte připojení jednou pro každý spouštěcí program

mapPartitions je jednodušší, protože vyžaduje pouze malou změnu struktury programu:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

Správce připojení singleton lze modelovat pomocí objektu, který obsahuje líný odkaz na připojení (poznámka:bude fungovat i měnitelný ref).

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Tento objekt lze poté použít k vytvoření instance 1 připojení na pracovní JVM a používá se jako Serializable objekt v provozní uzávěrce.

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

Výhodou použití objektu singleton je menší režie, protože připojení vytváří pouze jednou JVM (na rozdíl od 1 na oddíl RDD)

Existují také některé nevýhody:

  • vyčištění připojení je složité (vypínací hák/časovače)
  • je třeba zajistit bezpečnost vláken sdílených zdrojů

(*) kód poskytnutý pro ilustrační účely. Není zkompilováno ani testováno.



  1. MongoDB $acosh

  2. Konverze dokumentů BSON na JSON v Javě

  3. 'session' není definováno při použití express / redis pro úložiště relací

  4. [Infographic] Srovnání Cassandry vs. MongoDB