Как конвертировать JavaPairRDD в HashMap


У меня есть JavaPairDStream, содержащий пару ключ-значение. Мне нужно преобразовать его в хэш-карту.Я попытался сделать то же самое с обычным JavaPairRDD, вызвав функцию "collectAsMap()" на нем и его работе, но когда я пытаюсь сделать то же самое на DStream, это не удается.

Я пытаюсь достичь того же путем преобразования "JavaPairDStream" в "JavaPairRDD" с помощью функции "foreachRDD", а затем после этого я использую функцию "collectAsMap()" на JavaPairRDD.

Map<String,String> value= new HashMap<String,String>();
            value=line.collectAsMap();

//Here "line" is a "JavaPairRDD<String,String>".

Это так не дают никакой ошибки компиляции, но когда я запускаю программу, то она терпит неудачу и выдает ошибку, как показано ниже.

java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tuple2;
    at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:447)
    at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:464)
    at attempt1.CSV_Spark$3.call(CSV_Spark.java:109)
    at attempt1.CSV_Spark$3.call(CSV_Spark.java:1)
Я не уверен, что мой метод верен или нет. Есть ли разница между обычным " JavaPairRDD "и тем, который создается функцией" foreachRDD"? Почему тот же самый метод работает на обычном "JavaPairRDD", но терпит неудачу, когда я применяю его на "JavaPairRDD", созданном путем применения функции "foreachRDD" на JavaPairDStream. Если я где-нибудь ошибусь, пожалуйста, дайте мне знать. Также если есть есть ли какой-либо другой способ, то, пожалуйста, разместите его здесь. Спасибо.
2 8

2 ответа:

Во время компиляции вниз приведение принимается как Карта и Хэшмап находятся в одном и том же наследовании. Хотя мы не получаем никаких ошибок во время компиляции, мы получим ClassCastException во время выполнения. Чтобы избежать этой проблемы, вы можете попробовать следующее:

Код:

JavaPairRDD<K, V> javaRDDPair  = rddInstance.mapToPair(new PairFunction<T, K, V>() {
   @Override
    public Tuple2<K, V> call(final T value) {
    // statements 
    // operations on value
    return new Tuple2<K, V>(KTypeValue, VTypeValue);
    }
    });

    Map<K,V> map =  javaRDDPair.collectAsMap();
    HashMap<K,V> hmap = new HashMap<K,V>(map);

Примечание: rddInstance является объектом типаJavaRDD .

Предположим, что у нас есть JavaRDD , который содержит T введите в него значения. после преобразования на нем мы создаем JavaPairRDD , который содержит K, V> пары . Теперь требование заключается в том, что преобразования JavaPairRDD к HashMap объект для дальнейших вычислений в приложении. Используйте методcollectAsMap и назначьте его результат самому объектуMap . После этого вы можете создать HashMap, передав Map пример .

Вы можете попробовать

JavaPairDStream stream =... 
JavaPairRDD pairRdd=stream.compute(validTime);

, что эквивалентно своего рода букетированию вокруг допустимых моментов времени типа Time, когда вы рассуждаете о потоке.

Или, используя forEachRDD, затем оберните

JavaPairRDD<K,V> wrapRDD(RDD<scala.Tuple2<K,V>> rdd)

Оттуда collectAsMap.

java.util.Map<K,V>  collectAsMap()