Преобразование потока ввода Spark-kafka в массив[байт]


Я использую scala и потребляю данные из Kafka, используя следующий подход потоковой передачи Spark:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

Выше переменная возвращает InputDStream, через который я могу видеть данные в raw / двоичном формате, используя код ниже: println (строка)

Но мне нужно применить формат avro (схема доступна) на raw/двоичном формате, чтобы увидеть данные в ожидаемом формате json. Чтобы применить формат avro, мне нужно преобразовать приведенный выше InputDStream в массив [байт], который используется avro.

Может ли кто-то пожалуйста, дайте мне знать, преобразовать InputDStream в массив [байт]?

Или

Если вы знаете какой-то лучший способ применить схему avro на InputDStream(потоковой передачи spark), пожалуйста, поделитесь.

1 2

1 ответ:

Две вещи, которые нужно сделать. Первый-использовать DefaultDecoder для Кафки, который дает вам Array[Byte] для типа значения:

val lines: DStream[(String, Array[Byte])] = 
  KafkaUtils
   .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

И затем вам нужно применить свою логику десериализации Avro с помощью дополнительного map:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }

Где avroDeserializer - это ваш произвольный класс, который знает, как создать ваш тип из байтов Avro.

Я лично использую avro4s для получения десериализации класса case с помощью макросов.