Есть ли какой-либо способ повысить эффективность работы PySpark outputs?


Я пытаюсь проверить способность PySpark перебирать некоторые очень большие (от 10s GBs до 1s TBs) данные. Для большинства скриптов PySpark имеет примерно такую же эффективность, как и Scala-код. В других случаях (как в приведенном ниже коде) я получаю серьезные проблемы со скоростью от 10 до 12 раз медленнее.

path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext   

df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))
 .flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))
 .filter(lambda r: r[-1] != None)
 .countByValue()

print([((x[0], x[1], x[2]), y) for x, y in rddx.items()]) 

Мы думаем, что мы изолировали проблему .countByValue () (который возвращает defaultdict), но применение countItems () или reduceByKey () дает почти те же результаты. Мы также на 99% уверены, что проблема не в ExtractDomain или CreateAVertexFromSourceUrlAndContent (не настоящие имена функций, а просто псевдокод, чтобы сделать его понятным).

Итак, мой вопрос первый

    Есть ли что-нибудь в этом коде, что я могу сделать, чтобы сократить время?
  1. является PySpark фундаментально , что намного медленнее, чем его Scala двойник?
  2. Есть ли способ скопировать плоскую карту использование вместо этого фреймов данных PySpark (понимая, что фреймы данных-это вообще быстрее, чем RDD в Pyspark)?
1 3

1 ответ:

Самой большой проблемой здесь может быть коммуникация - Spark SQL (columnar format) -> plain Scala object -> pickle (Pyrolite) -> socket -> unpickle -> plain Python object. Это много копирования, преобразования и перемещения вещей.

Есть способ реплицировать flatmap, используя вместо этого фреймы данных PySpark

Да. Это называется explode - но справедливости ради надо сказать, что и он медлителен.

Понимание того, что фреймы данных обычно быстрее, чем RDD в Пыспарк

Это обычно верно (Scala и Python оба), но вам может понадобиться udf для реализации ExtractDomain или CreateAVertexFromSourceUrlAndContent - это еще одна медленная вещь. Просто из имен, которые вы могли бы использовать parse_url_tuple.

Является ли PySpark фундаментально настолько медленнее, чем его аналог Scala?

Это несколько медленнее. Обычно не так медленнее на хорошо настроенном коде. Но детали реализации различны - один и тот же набор операций в Scala и Python может быть материализовался по-другому.

Есть ли что-нибудь в этом коде, что я могу сделать, чтобы сократить время?

Я бы рекомендовал сначала профилировать. После того, как вы определили, какая часть ответственна (конверсии, слияния), вы можете попытаться нацелить ее.