Как определить разбиение фрейма данных?


Я начал использовать Spark SQL и DataFrames в Spark 1.4.0. Я хочу определить пользовательский разделитель на фреймах данных, в Scala, но не вижу, как это сделать.

одна из таблиц данных, с которыми я работаю, содержит список транзакций, по счету, silimar в следующем примере.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

по крайней мере, первоначально большинство вычислений будет происходить между транзакциями внутри счета. Поэтому я хотел бы, чтобы данные были разделены так, чтобы все транзакции для счета находятся в том же разделе Spark.

но я не вижу способа определить это. Класс DataFrame имеет метод с именем 'repartition (Int)', в котором можно указать количество создаваемых разделов. Но я не вижу никакого метода, доступного для определения пользовательского разделителя для фрейма данных, такого как можно указать для RDD.

исходные данные хранятся в Parquet. Я видел, что при написании фрейма данных для паркета вы можете указать столбец для разделения, поэтому, по-видимому, я мог бы сказать Parquet, чтобы разделить его данные по столбцу "учетная запись". Но там могут быть миллионы учетных записей, и если я правильно понимаю Parquet, он создаст отдельный каталог для каждой учетной записи, так что это не похоже на разумное решение.

есть ли способ заставить Spark разбить этот фрейм данных так, чтобы все данные для учетной записи находились в одном разделе?

5 98

5 ответов:

Искра >= 2.3.0

Искра-22614 предоставляет разбиение диапазона.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

Искра-22389 предоставляет внешний формат секционирования в источник данных API v2.

Искра >= 1.6.0

в Spark >= 1.6 можно использовать секционирование по столбцам для запроса и кэширования. Смотрите:Искра-11410 и Искра-4849 используя repartition метод:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

в отличие от RDDs Искра Dataset (включая Dataset[Row] а.к.а DataFrame) не может использовать пользовательский разделитель, как сейчас. Обычно вы можете решить эту проблему, создав искусственный столбец секционирования, но это не даст вам такой же гибкости.

Spark

одна вещь, которую вы можете сделать, это предварительно разбить входные данные перед созданием DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

С DataFrame создание RDD требуется только a простой этап карты существующий макет раздела должен быть сохранен*:

assert(df.rdd.partitions == partitioned.partitions)

таким же образом вы можете переделать существующие DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

так что, похоже, это не является невозможным. Остается вопрос, имеет ли это вообще смысл. Я буду утверждать, что большую часть времени это не так:

  1. перераспределение является дорогостоящим процессом. В типичном сценарии большая часть данных должна быть сериализована, перетасована и десериализована. С другой стороны количество операций, которые могут извлечь выгоду из предварительно разделенных данных, относительно невелико и дополнительно ограничено, если внутренний API не предназначен для использования этого свойства.

    • присоединяется в некоторых сценариях, но это потребует внутренней поддержки,
    • вызовы оконных функций с соответствующим разделителем. То же, что и выше, ограничено одним определением окна. Он уже разделен внутри, поэтому предварительное разделение может быть избыточным,
    • простой агрегации с GROUP BY - можно уменьшить объем памяти временных буферов**, но общая стоимость намного выше. Более или менее эквивалентно groupByKey.mapValues(_.reduce) (текущее поведение) vs reduceByKey (переразметке). Вряд ли пригодится на практике.
    • сжатие данных с SqlContext.cacheTable. Поскольку похоже, что он использует кодировку длины выполнения, применяя OrderedRDDFunctions.repartitionAndSortWithinPartitions может улучшить степень сжатия.
  2. производительность сильно зависит от a распределение ключей. Если он перекошен, это приведет к неоптимальному использованию ресурсов. В худшем случае будет невозможно закончить работу вообще.

  3. весь смысл использования декларативного API высокого уровня заключается в том, чтобы изолировать себя от деталей реализации низкого уровня. Как уже упоминалось @dwysakowicz и @RomiKuntsman оптимизация-это работа Катализатор Оптимизатор. Это довольно сложный зверь, и я действительно сомневаюсь, что вы можете легко улучшить это, не погружаясь намного глубже в его внутренности.

связанные понятия

разделение с источниками JDBC:

поддержка источников данных JDBC predicates аргумент. Его можно использовать следующим образом:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

он создает один раздел JDBC для каждого предиката. Имейте в виду, что если наборы, созданные с использованием отдельных предикатов, не являются непересекающиеся вы увидите дубликаты в итоговой таблице.

partitionBy метод DataFrameWriter:

Искра DataFrameWriter предоставляет partitionBy метод, который может быть использован для" разбиения " данных на записи. Он разделяет данные на запись с помощью предоставленного набора столбцов

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

это позволяет предикату нажимать на чтение для запросов на основе ключа:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

но это не эквивалентно DataFrame.repartition. В частности агрегации например:

val cnts = df1.groupBy($"k").sum()

все равно потребуется TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy метод DataFrameWriter (Искра >= 2.0):

bucketBy имеет аналогичные приложения, как partitionBy но он доступен только для таблиц (saveAsTable). Группируете информацию можно использовать для оптимизации соединения:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* By макет раздела я имею в виду только распределение данных. partitioned RDD больше не имеет разделителя. ** Самонадеянный никаких ранних прогнозов. Если агрегация охватывает только небольшое подмножество столбцов, вероятно, нет никакого выигрыша.

в Spark HiveContext, а не просто старый SqlContext можно использовать HiveQLDISTRIBUTE BY colX... (гарантирует, что каждый из N редукторов получает неперекрывающиеся диапазоны x) & CLUSTER BY colX... (ярлык для распределения и сортировки) например:

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

не уверен, как это вписывается в Spark DF api. Эти ключевые слова не поддерживаются в обычном SqlContext (обратите внимание, что вам не нужно иметь hive meta store, чтобы использовать HiveContext)

EDIT: Spark 1.6+ теперь имеет это в собственном API DataFrame

используйте фрейм данных, возвращаемый:

yourDF.orderBy(account)

нет явного способа использовать partitionBy на фрейме данных, только на PairRDD, но когда вы сортируете фрейм данных, он будет использовать это в своем LogicalPlan, и это поможет, когда вам нужно сделать вычисления для каждой учетной записи.

Я просто наткнулся на ту же самую проблему, с фреймом данных, который я хочу разделить по учетной записи. Я предполагаю, что когда вы говорите " хотите, чтобы данные были разделены так, чтобы все транзакции для учетная запись находится в том же разделе Spark", вы хотите его для масштабирования и производительности, но ваш код не зависит от него (например, с помощью mapPartitions() и т. д.), верно?

я смог сделать это с помощью RDD. Но я не знаю, является ли это приемлемым решением для вас. Если у вас есть DF, доступный как RDD, вы можете применить repartitionAndSortWithinPartitions для выполнения пользовательского разбиения данных.

вот пример, который я использовал:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)

Итак, чтобы начать с какого - то ответа:) - вы не можете

Я не эксперт, но насколько я понимаю DataFrames, они не равны rdd и DataFrame не имеет такого понятия, как разделитель.

Вообще идея DataFrame заключается в том, чтобы обеспечить другой уровень абстракции, который сам обрабатывает такие проблемы. Запросы на DataFrame преобразуются в логический план, который далее преобразуется в операции на RDDs. Разделение, которое вы предложили, вероятно, будет применено автоматически или, по крайней мере, должно быть.

Если вы не доверяете SparkSQL, что он обеспечит какую-то оптимальную работу, вы всегда можете преобразовать DataFrame в RDD[Row], как это предлагается в комментариях.