Преобразование потока ввода Spark-kafka в массив[байт]
Я использую scala и потребляю данные из Kafka, используя следующий подход потоковой передачи Spark:
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
Выше переменная возвращает InputDStream, через который я могу видеть данные в raw / двоичном формате, используя код ниже: println (строка)
Но мне нужно применить формат avro (схема доступна) на raw/двоичном формате, чтобы увидеть данные в ожидаемом формате json. Чтобы применить формат avro, мне нужно преобразовать приведенный выше InputDStream в массив [байт], который используется avro.
Может ли кто-то пожалуйста, дайте мне знать, преобразовать InputDStream в массив [байт]?
Или
Если вы знаете какой-то лучший способ применить схему avro на InputDStream(потоковой передачи spark), пожалуйста, поделитесь.
1 ответ:
Две вещи, которые нужно сделать. Первый-использовать
DefaultDecoder
для Кафки, который дает вамArray[Byte]
для типа значения:val lines: DStream[(String, Array[Byte])] = KafkaUtils .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
И затем вам нужно применить свою логику десериализации Avro с помощью дополнительного
map
:lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }
Где
avroDeserializer
- это ваш произвольный класс, который знает, как создать ваш тип из байтов Avro.Я лично использую avro4s для получения десериализации класса case с помощью макросов.