Ve Sparku jsou funkce na RDD
s (jako map
zde) jsou serializovány a odeslány exekutorům ke zpracování. To znamená, že všechny prvky obsažené v těchto operacích by měly být serializovatelné.
Připojení Redis zde nelze serializovat, protože otevírá TCP připojení k cílové DB, která jsou navázána na počítač, kde bylo vytvořeno.
Řešením je vytvořit tato připojení na exekutorech v kontextu místního provádění. Existuje několik způsobů, jak to udělat. Dvěma, které mě napadnou, jsou:
rdd.mapPartitions
:umožňuje zpracovat celý oddíl najednou, a tím amortizovat náklady na vytváření připojení)- Správci připojení Singleton:Vytvořte připojení jednou pro každý spouštěcí program
mapPartitions
je jednodušší, protože vyžaduje pouze malou změnu struktury programu:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Správce připojení singleton lze modelovat pomocí objektu, který obsahuje líný odkaz na připojení (poznámka:bude fungovat i měnitelný ref).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Tento objekt lze poté použít k vytvoření instance 1 připojení na pracovní JVM a používá se jako Serializable
objekt v provozní uzávěrce.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Výhodou použití objektu singleton je menší režie, protože připojení vytváří pouze jednou JVM (na rozdíl od 1 na oddíl RDD)
Existují také některé nevýhody:
- vyčištění připojení je složité (vypínací hák/časovače)
- je třeba zajistit bezpečnost vláken sdílených zdrojů
(*) kód poskytnutý pro ilustrační účely. Není zkompilováno ani testováno.