Есть ли какой-либо способ повысить эффективность работы PySpark outputs?
Я пытаюсь проверить способность PySpark перебирать некоторые очень большие (от 10s GBs до 1s TBs) данные. Для большинства скриптов PySpark имеет примерно такую же эффективность, как и Scala-код. В других случаях (как в приведенном ниже коде) я получаю серьезные проблемы со скоростью от 10 до 12 раз медленнее.
path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext
df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))
.flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))
.filter(lambda r: r[-1] != None)
.countByValue()
print([((x[0], x[1], x[2]), y) for x, y in rddx.items()])
Мы думаем, что мы изолировали проблему .countByValue () (который возвращает defaultdict), но применение countItems () или reduceByKey () дает почти те же результаты. Мы также на 99% уверены, что проблема не в ExtractDomain или CreateAVertexFromSourceUrlAndContent (не настоящие имена функций, а просто псевдокод, чтобы сделать его понятным).
Итак, мой вопрос первый
-
Есть ли что-нибудь в этом коде, что я могу сделать, чтобы сократить время?
- является PySpark фундаментально , что намного медленнее, чем его Scala двойник?
- Есть ли способ скопировать плоскую карту использование вместо этого фреймов данных PySpark (понимая, что фреймы данных-это вообще быстрее, чем RDD в Pyspark)?
1 ответ:
Самой большой проблемой здесь может быть коммуникация - Spark SQL (columnar format) -> plain Scala object -> pickle (Pyrolite) -> socket -> unpickle -> plain Python object. Это много копирования, преобразования и перемещения вещей.
Есть способ реплицировать flatmap, используя вместо этого фреймы данных PySpark
Да. Это называется
explode
- но справедливости ради надо сказать, что и он медлителен.Понимание того, что фреймы данных обычно быстрее, чем RDD в Пыспарк
Это обычно верно (Scala и Python оба), но вам может понадобиться
udf
для реализацииExtractDomain
илиCreateAVertexFromSourceUrlAndContent
- это еще одна медленная вещь. Просто из имен, которые вы могли бы использоватьparse_url_tuple
.Является ли PySpark фундаментально настолько медленнее, чем его аналог Scala?
Это несколько медленнее. Обычно не так медленнее на хорошо настроенном коде. Но детали реализации различны - один и тот же набор операций в Scala и Python может быть материализовался по-другому.
Есть ли что-нибудь в этом коде, что я могу сделать, чтобы сократить время?Я бы рекомендовал сначала профилировать. После того, как вы определили, какая часть ответственна (конверсии, слияния), вы можете попытаться нацелить ее.