Как работать с DataSet в Spark с помощью scala?


Я загружаю свой CSV с помощью DataFrame, а затем преобразую его в DataSet, но он показывает вот так

Несколько маркеров в этой строке:
- Невозможно найти кодер для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. д.) и типы продуктов (классы case) поддерживаются путем импорта
искра.скрытый._ Поддержка сериализации других типов будет добавлена в будущих выпусках.
- недостаточно аргументов для метода as: (неявное доказательство$2:
орг..апаш.искра.язык SQL.Кодировщик [DataSet.искра.aacsv]) орг.апаш.искра.язык SQL.Dataset [Набор Данных.искра.aacsv]. Неопределенное значение параметра evidence$2

Как решить эту проблему?. Мой код -

case class aaCSV(
    a: String, 
    b: String 
    )

object WorkShop {

  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("readCSV")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val customSchema = StructType(Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true)))

    val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv") 
    df.printSchema()
    df.show()
    val googleDS = df.as[aaCSV]
    googleDS.show()

  }

}

Теперь я изменил основную функцию следующим образом -

def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("readCSV")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
   val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
    sa.printSchema()
    sa.show()
}

Но он выдает ошибку - исключение в потоке "main" org.апаш.искра.язык SQL.AnalysisException: не удается разрешить' Adj_Close ' заданные входные столбцы: [_c1, _c2, _c5, _c4, _c6, _c3, _c0]; строка 1 pos 7. Что же мне делать ?

Теперь Я выполните мой метод с использованием на основе заданного интервала времени с помощью Spark scheduler. Но я ссылаюсь на эту ссылку - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application Пожалуйста, помогите нам.

2 2

2 ответа:

Есть ли у вас заголовки (имена столбцов) в ваших csv-файлах ? Если да, попробуйте добавить .option("header","true") в инструкции read. Пример: sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV].

В приведенном ниже блоге приведены различные примеры фреймов данных и наборов данных: http://technippet.blogspot.in/2016/10/different-ways-of-creating.html

Попробуйте добавить следующий импорт, прежде чем конвертировать DF в DS.

sc.implicits._

Или

sqlContext.implicits._

Подробнее о работе с набором данных https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets