Искра-уменьшить операцию, занимающую слишком много времени
Я делаю приложение с Spark, которое будет запускать некоторые алгоритмы извлечения темы. Для этого сначала мне нужно сделать некоторую предварительную обработку, извлекая матрицу документа-термина к концу. Я мог бы это сделать, но для большой коллекции документов (всего 2 тысячи 5 Мб) этот процесс занимает целую вечность.
Итак, отлаживая, я нашел, где программа как бы застревает,и это в операции сокращения. То, что я делаю в этой части кода, - это подсчитываю, сколько раз каждый термин встречается на коллекции, поэтому сначала я сделал "карту", отрисовав ее для каждого rdd, а затем" уменьшил " ее, сохранив результат внутри хэш-карты. Операция map выполняется очень быстро, но в reduce операция разбивается на 40 блоков, и каждый блок занимает 5~10 минут для обработки.
Поэтому я пытаюсь понять, что я делаю неправильно, или если сокращение операций настолько дорого.SparkConf: автономный режим с использованием локального[2]. Я попытался использовать его как "spark: / / master: 7077", и он сработало, но все та же медлительность.
Код:
"filesIn" - это JavaPairRDD, где ключом является путь к файлу, а значением-содержимое файла. Итак, сначала карта, где я беру этот "файл", разбивает слова и подсчитывает их частоту (в таком случае не имеет значения, какой документ есть) А затем reduce, где я создаю HashMap (термин, freq).
JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception {
String[] allWords = t._2.split(" ");
HashMap<String, Double> hashTermFreq = new HashMap<String, Double>();
ArrayList<String> words = new ArrayList<String>();
ArrayList<String> terms = new ArrayList<String>();
HashMap<String, Integer> termDF = new HashMap<String, Integer>();
for (String term : allWords) {
if (hashTermFreq.containsKey(term)) {
Double freq = hashTermFreq.get(term);
hashTermFreq.put(term, freq + 1);
} else {
if (term.length() > 1) {
hashTermFreq.put(term, 1.0);
if (!terms.contains(term)) {
terms.add(term);
}
if (!words.contains(term)) {
words.add(term);
if (termDF.containsKey(term)) {
int value = termDF.get(term);
value++;
termDF.put(term, value);
} else {
termDF.put(term, 1);
}
}
}
}
}
return termDF;
}
});
HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception {
HashMap<String, Integer> result = new HashMap<String, Integer>();
Iterator iterator = t1.keySet().iterator();
while (iterator.hasNext()) {
String key = (String) iterator.next();
if (result.containsKey(key) == false) {
result.put(key, t1.get(key));
} else {
result.put(key, result.get(key) + 1);
}
}
iterator = t2.keySet().iterator();
while (iterator.hasNext()) {
String key = (String) iterator.next();
if (result.containsKey(key) == false) {
result.put(key, t2.get(key));
} else {
result.put(key, result.get(key) + 1);
}
}
return result;
}
});
Спасибо!
1 ответ:
Хорошо, так что просто с верхней части моей головы:
- искровые преобразования ленивы. Это означает, что
map
не выполняется, пока вы не вызовете последующееreduce
действие, поэтому то, что вы описываете как медленноеreduce
, скорее всего, медленноеmap
+reduce
ArrayList.contains
- Это O (N), поэтому все этиwords.contains
иterms.contains
крайне неэффективны- Логика пахнет рыбой. Особенно:
- Если термин уже был замечен, вы никогда не попадете в
else
ветвь- на первый взгляд
words
иterms
должно иметь точно такое же содержание и должно быть эквивалентно ключамhashTermFreq
или ключамtermDF
.- похоже, что значения в
termDF
могут принимать только значение 1. Если это то, что вы хотите, и вы игнорируете частоты, какой смысл создаватьhashTermFreq
?- Фаза, реализованная здесь, означает неэффективное линейное сканирование с растущим объектом над данными, в то время как вы действительно хотите
reduceByKey
.Используя Scala в качестве псевдокода, весь ваш код может быть эффективно выражается следующим образом:
val termDF = filesIn.flatMap{ case (_, text) => text.split(" ") // Split .toSet // Take unique terms .filter(_.size > 1) // Remove single characters .map(term => (term, 1))} // map to pairs .reduceByKey(_ + _) // Reduce by key termDF.collectAsMap // Optionally
Наконец, похоже, что вы изобретаете колесо. По крайней мере, некоторые инструменты, которые вам нужны, уже реализованы в
mllib.feature
илиml.feature