Искра-уменьшить операцию, занимающую слишком много времени


Я делаю приложение с Spark, которое будет запускать некоторые алгоритмы извлечения темы. Для этого сначала мне нужно сделать некоторую предварительную обработку, извлекая матрицу документа-термина к концу. Я мог бы это сделать, но для большой коллекции документов (всего 2 тысячи 5 Мб) этот процесс занимает целую вечность.

Итак, отлаживая, я нашел, где программа как бы застревает,и это в операции сокращения. То, что я делаю в этой части кода, - это подсчитываю, сколько раз каждый термин встречается на коллекции, поэтому сначала я сделал "карту", отрисовав ее для каждого rdd, а затем" уменьшил " ее, сохранив результат внутри хэш-карты. Операция map выполняется очень быстро, но в reduce операция разбивается на 40 блоков, и каждый блок занимает 5~10 минут для обработки.

Поэтому я пытаюсь понять, что я делаю неправильно, или если сокращение операций настолько дорого.

SparkConf: автономный режим с использованием локального[2]. Я попытался использовать его как "spark: / / master: 7077", и он сработало, но все та же медлительность.

Код:

"filesIn" - это JavaPairRDD, где ключом является путь к файлу, а значением-содержимое файла. Итак, сначала карта, где я беру этот "файл", разбивает слова и подсчитывает их частоту (в таком случае не имеет значения, какой документ есть) А затем reduce, где я создаю HashMap (термин, freq).

JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() {

        @Override
        public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception {
            String[] allWords = t._2.split(" ");

            HashMap<String, Double> hashTermFreq = new HashMap<String, Double>();
            ArrayList<String> words = new ArrayList<String>();
            ArrayList<String> terms = new ArrayList<String>();
            HashMap<String, Integer> termDF = new HashMap<String, Integer>();

            for (String term : allWords) {

                if (hashTermFreq.containsKey(term)) {
                    Double freq = hashTermFreq.get(term);
                    hashTermFreq.put(term, freq + 1);
                } else {
                    if (term.length() > 1) {
                        hashTermFreq.put(term, 1.0);
                        if (!terms.contains(term)) {
                            terms.add(term);
                        }
                        if (!words.contains(term)) {
                            words.add(term);
                            if (termDF.containsKey(term)) {
                                int value = termDF.get(term);
                                value++;
                                termDF.put(term, value);
                            } else {
                                termDF.put(term, 1);
                            }
                        }
                    }
                }
            }
            return termDF;
        }
    });

 HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() {

        @Override
        public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception {
            HashMap<String, Integer> result = new HashMap<String, Integer>();

            Iterator iterator = t1.keySet().iterator();

            while (iterator.hasNext()) {
                String key = (String) iterator.next();
                if (result.containsKey(key) == false) {
                    result.put(key, t1.get(key));
                } else {
                    result.put(key, result.get(key) + 1);
                }

            }

            iterator = t2.keySet().iterator();

            while (iterator.hasNext()) {
                String key = (String) iterator.next();
                if (result.containsKey(key) == false) {
                    result.put(key, t2.get(key));
                } else {
                    result.put(key, result.get(key) + 1);
                }

            }

            return result;
        }
    });

Спасибо!

1 2

1 ответ:

Хорошо, так что просто с верхней части моей головы:

  • искровые преобразования ленивы. Это означает, что map не выполняется, пока вы не вызовете последующее reduce действие, поэтому то, что вы описываете как медленное reduce, скорее всего, медленное map + reduce
  • ArrayList.contains - Это O (N), поэтому все эти words.contains и terms.contains крайне неэффективны
  • Логика пахнет рыбой. Особенно:
    • Если термин уже был замечен, вы никогда не попадете в else ветвь
    • на первый взгляд words и terms должно иметь точно такое же содержание и должно быть эквивалентно ключам hashTermFreq или ключам termDF.
    • похоже, что значения в termDF могут принимать только значение 1. Если это то, что вы хотите, и вы игнорируете частоты, какой смысл создавать hashTermFreq?
  • Фаза, реализованная здесь, означает неэффективное линейное сканирование с растущим объектом над данными, в то время как вы действительно хотите reduceByKey.

Используя Scala в качестве псевдокода, весь ваш код может быть эффективно выражается следующим образом:

val termDF = filesIn.flatMap{
  case (_, text) => 
    text.split(" ") // Split
    .toSet // Take unique terms 
    .filter(_.size > 1) // Remove single characters
    .map(term => (term, 1))} // map to pairs
  .reduceByKey(_ + _) // Reduce by key

termDF.collectAsMap // Optionally

Наконец, похоже, что вы изобретаете колесо. По крайней мере, некоторые инструменты, которые вам нужны, уже реализованы в mllib.feature или ml.feature