как фильтровать поле MapType фрейма данных Spark


У меня есть фрейм данных Spark, где одно из полей имеет тип MapType....Я могу извлечь данные любого из ключей поля maptype, но не могу этого сделать, когда применяю фильтр для конкретного значения конкретного ключа...

val line = List (("Sanjay", Map("one" -> 1, "two" -> 2)), ("Taru", Map("one" -> 10, "two" -> 20)) )

Я создал RDD & DF из приведенного выше списка и пытаюсь извлечь в DF, сопоставить значения, где значение if >= 5 .....Но я получаю ниже исключение в Spark Repl.. Пожалуйста, помогите

val rowrddDFFinal = rowrddDF.select(rowrddDF("data.one").alias("data")).filter(rowrddDF("data.one").geq(5))

Орг.апаш.искра.язык SQL.AnalysisException: разрешенные атрибуты данных#1 пропал без вести // / g из данных#3 в операторе !Фильтр (данные#1 [один] как один#4 >= 5); // / в орг.апаш.искра.язык SQL.катализатор.анализ.CheckAnalysis$класс.failAnalys // / is (CheckAnalysis.скала: 38) // / в орг.апаш.искра.язык SQL.катализатор.анализ.Анализатор.failAnalysis (анализатор //| .скала: 42) // / в орг.апаш.искра.язык SQL.катализатор.анализ.CheckAnalysis$ $ anonfun$checkAn // / алисис$1.применить (CheckAnalysis.scala: 121) // / в орг.апаш.искра.язык SQL.катализатор.анализ.CheckAnalysis$ $ anonfun$checkAn // / алисис$1.применить (CheckAnalysis.scala: 50) // / в орг.апаш.искра.язык SQL.катализатор.деревья.объект TreeNode.foreachUp(узлов дерева.скала // | : 98) // / в орг.апаш.искра.язык SQL.катализатор.анализ.CheckAnalysis$класс.checkAnaly // / sis (CheckAnalysis.scala: 50) // / на орг..апаш.искра.язык SQL.катализатор.анализ.Анализатор.checkAnalysis (анализ // / р. скала: 42) // / в орг.апаш.искра.язык SQL.Объект Sqlcontext$QueryExecution.assertAnalyzed(SQLCont //| доб.scala: 931)

1 3

1 ответ:

Для доступа к значениям из столбца Array или Map можно использовать метод Column.getItem:

rowrddDF
 .where($"data".getItem("one").geq(5))
 .select($"data".getItem("one").alias("data"))

Если вы предпочитаете filter после select, вы больше не можете использовать rowrddDF.apply. Вместо этого вы должны получить доступ к колонке с псевдонимами напрямую:

df
  .select($"data".getItem("one").alias("data"))
  .filter($"data".geq(5))