sql >> Databáze >  >> NoSQL >> HBase

Robustní serializace zpráv v Apache Kafka pomocí Apache Avro, část 1

V Apache Kafka zapisují Java aplikace zvané producenti strukturované zprávy do clusteru Kafka (složeného z brokerů). Podobně, Java aplikace zvané spotřebitelé čtou tyto zprávy ze stejného clusteru. V některých organizacích existují různé skupiny odpovědné za psaní a řízení výrobců a spotřebitelů. V takových případech může být jedním z hlavních problémů koordinace dohodnutého formátu zpráv mezi výrobci a spotřebiteli.

Tento příklad ukazuje, jak používat Apache Avro k serializaci záznamů, které jsou vytvářeny Apache Kafka, a zároveň umožňuje vývoj schémat a nesynchronní aktualizaci aplikací pro producenty a spotřebitele.

Serializace a deserializace

Záznam Kafka (dříve nazývaný zpráva) se skládá z klíče, hodnoty a záhlaví. Kafka si není vědom struktury dat v klíči a hodnotě záznamů. Zpracovává je jako bajtová pole. Ale systémy, které čtou záznamy z Kafky, se starají o data v těchto záznamech. Musíte tedy produkovat data v čitelném formátu. Formát dat, který používáte, by měl

  • Buďte kompaktní
  • Rychle kódujte a dekódujte
  • Povolit vývoj
  • Povolit upstream systémům (těm, které zapisují do clusteru Kafka) a downstream systémům (těm, které čtou ze stejného clusteru Kafka) upgradovat na novější schémata v různých časech

JSON je například samozřejmý, ale není kompaktním datovým formátem a jeho analýza je pomalá. Avro je rychlý serializační rámec, který vytváří relativně kompaktní výstup. Ale ke čtení záznamů Avro potřebujete schéma, se kterým byla data serializována.

Jednou z možností je uložit a přenést schéma se samotným záznamem. To je v pořádku v souboru, kde schéma uložíte jednou a použijete jej pro velký počet záznamů. Ukládání schématu do každého záznamu Kafka však zvyšuje značnou režii, pokud jde o úložný prostor a využití sítě. Další možností je mít dohodnutou sadu mapování identifikátor-schéma a odkazovat na schémata pomocí jejich identifikátorů v záznamu.

Od objektu ke Kafkově záznamu a zpět

Producentské aplikace nepotřebují převádět data přímo do bajtových polí. KafkaProducer je generická třída, která potřebuje, aby její uživatel specifikoval typy klíčů a hodnot. Poté producenti přijímají instance ProducerRecord které mají stejné parametry typu. Převod z objektu na pole bajtů se provádí pomocí serializátoru. Kafka poskytuje některé primitivní serializátory:například IntegerSerializer , ByteArraySerializer , StringSerializer . Na straně spotřebitele podobné deserializéry převádějí bajtová pole na objekt, se kterým si aplikace může poradit.

Má tedy smysl zapojit se na úrovni Serializer a Deserializer a umožnit vývojářům aplikací pro výrobce a spotřebitele používat pohodlné rozhraní poskytované společností Kafka. Ačkoli nejnovější verze Kafka umožňují ExtendedSerializers a ExtendedDeserializers pro přístup k záhlavím jsme se rozhodli zahrnout identifikátor schématu do klíče a hodnoty záznamů Kafka namísto přidávání záhlaví záznamů.

Avro Essentials

Avro je framework pro serializaci dat (a vzdálené volání procedur). K popisu datových struktur používá dokument JSON nazvaný schéma. Většina použití Avro je přes GenericRecord nebo podtřídy SpecificRecord. Třídy Java generované ze schémat Avro jsou podtřídami druhých, zatímco první lze použít bez předchozí znalosti datové struktury, se kterou se pracuje.

Když dvě schémata splňují sadu pravidel kompatibility, lze data zapsaná jedním schématem (nazývaným schéma zapisovače) číst, jako by byla zapsána pomocí druhého schématu (nazývaného schéma čtečky). Schémata mají kanonickou formu, která obsahuje všechny podrobnosti, které jsou pro serializaci irelevantní, jako jsou komentáře, odstraněné, aby se usnadnila kontrola ekvivalence.

VersionedSchema a SchemaProvider

Jak již bylo zmíněno, potřebujeme mapování jedna ku jedné mezi schématy a jejich identifikátory. Někdy je jednodušší odkazovat na schémata jmény. Když je vytvořeno kompatibilní schéma, může být považováno za další verzi schématu. Na schémata tedy můžeme odkazovat jménem, ​​dvojicí verzí. Nazvěme schéma, jeho identifikátor, název a verzi společně VersionedSchema . Tento objekt může obsahovat další metadata, která aplikace vyžaduje.

public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider objekty mohou vyhledat instance VersionedSchema .

public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

Jak je toto rozhraní implementováno, je popsáno v části „Implementing a Schema Store“ v budoucím příspěvku na blogu.

Serializace obecných dat

Při serializaci záznamu musíme nejprve zjistit, jaké schéma použít. Každý záznam má getSchema metoda. Zjištění identifikátoru ze schématu však může být časově náročné. Obecně je efektivnější nastavit schéma v době inicializace. To lze provést přímo identifikátorem nebo názvem a verzí. Kromě toho, když vytváříme pro více témat, můžeme chtít nastavit různá schémata pro různá témata a zjistit schéma z názvu tématu dodaného jako parametr pro metodu serialize(T, String) . Tato logika je v našich příkladech z důvodu stručnosti a jednoduchosti vynechána.

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

Se schématem v ruce je musíme uložit do naší zprávy. Serializace ID jako součásti zprávy nám poskytuje kompaktní řešení, protože všechna kouzla se odehrávají v Serializer/Deserializer. Umožňuje také velmi snadnou integraci s jinými frameworky a knihovnami, které již Kafka podporují, a umožňuje uživateli používat jejich vlastní serializátor (jako je Spark).

Pomocí tohoto přístupu nejprve zapíšeme identifikátor schématu na první čtyři bajty.

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

Poté můžeme vytvořit DatumWriter a serializovat objekt.

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

Když jsme to dali dohromady, implementovali jsme obecný datový serializátor.

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Deserializace obecných dat

Deserializace může pracovat s jediným schématem (data schématu byla zapsána), ale můžete zadat jiné schéma čtečky. Schéma čtečky musí být kompatibilní se schématem, se kterým byla data serializována, ale nemusí být ekvivalentní. Z tohoto důvodu jsme zavedli názvy schémat. Nyní můžeme určit, že chceme číst data s konkrétní verzí schématu. Při inicializaci čteme požadované verze schématu podle názvu schématu a ukládáme metadata do readerSchemasByName pro rychlý přístup. Nyní můžeme číst každý záznam zapsaný kompatibilní verzí schématu, jako by byl zapsán se zadanou verzí.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

Když je potřeba záznam deserializovat, nejprve načteme identifikátor schématu zapisovače. To umožňuje vyhledat schéma čtečky podle názvu. S oběma dostupnými schématy můžeme vytvořit GeneralDatumReader a přečtěte si záznam.

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

Zacházení s SpecificRecords

Více často než ne existuje jedna třída, kterou chceme použít pro naše záznamy. Tato třída je pak obvykle generována ze schématu Avro. Apache Avro poskytuje nástroje pro generování kódu Java ze schémat. Jedním z takových nástrojů je plugin Avro Maven. Vygenerované třídy mají za běhu k dispozici schéma, ze kterého byly vygenerovány. Díky tomu je serializace a deserializace jednodušší a efektivnější. Pro serializaci můžeme použít třídu ke zjištění identifikátoru schématu, který se má použít.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Nepotřebujeme tedy logiku k určení schématu z tématu a dat. K zápisu záznamů používáme schéma dostupné ve třídě záznamů.

Podobně pro deserializaci lze schéma čtečky zjistit ze samotné třídy. Logika deserializace se zjednoduší, protože schéma čtečky je v době konfigurace pevně dané a není třeba jej vyhledávat podle názvu schématu.

@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

Další čtení

Další informace o kompatibilitě schémat naleznete ve specifikaci Avro pro rozlišení schématu.

Další informace o kanonických formulářích naleznete ve specifikaci Avro pro analýzu kanonického formuláře pro schémata.

Příště…

Část 2 ukáže implementaci systému pro ukládání definic schématu Avro.


  1. vlastní řazení mongoose/mongodb

  2. Sloučení dvou kolekcí v MongoDB

  3. Zničení / odstranění Queue() v Redis Queue (rq) programově

  4. Vlastní deserializace