Потребляйте сообщения Кафки Авро в go


Я пытаюсь использовать сообщения Кафки в формате avro, но я не могу декодировать сообщения из avro в JSON в Go.

Я использую платформу Confluent (3.0.1). Например, я создаю сообщения avro, такие как:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}

Теперь я потребляю сообщения с библиотекой go Kafka: sarama. Обычные текстовые сообщения работают нормально. Сообщение Avro должно быть расшифровано. Я нашел разные библиотеки: github.com/linkedin/goavro, github.com/elodina/go-avro

Но после декодирования я получаю json без значения (обе библиотеки):

{"f1":""}

Гоавро:

avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
log.Println(fmt.Sprintf("%s", decoded))

Go-avro:

schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
log.Println(decodedRecord.String())

Msg = сарама.ConsumerMessage

2 2

2 ответа:

Первый байт-это магический байт (0). Следующие 4 байта являются идентификатором схемы avro

, что действительно полезно только при использовании реестра схемы Confluent.

Только что узнал (сравнивая двоичные сообщения avro), что мне пришлось удалить первые 5 элементов массива байтов сообщений-теперь все работает:)

message = msg.Value[5:]

Может быть, кто-нибудь объяснит, почему