Как работать с DataSet в Spark с помощью scala?
Я загружаю свой CSV с помощью DataFrame, а затем преобразую его в DataSet, но он показывает вот так
Несколько маркеров в этой строке:
- Невозможно найти кодер для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. д.) и типы продуктов (классы case) поддерживаются путем импорта
искра.скрытый._ Поддержка сериализации других типов будет добавлена в будущих выпусках.
- недостаточно аргументов для метода as: (неявное доказательство$2:
орг..апаш.искра.язык SQL.Кодировщик [DataSet.искра.aacsv]) орг.апаш.искра.язык SQL.Dataset [Набор Данных.искра.aacsv]. Неопределенное значение параметра evidence$2
Как решить эту проблему?. Мой код -
case class aaCSV(
a: String,
b: String
)
object WorkShop {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv")
df.printSchema()
df.show()
val googleDS = df.as[aaCSV]
googleDS.show()
}
}
Теперь я изменил основную функцию следующим образом -
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
sa.printSchema()
sa.show()
}
Но он выдает ошибку - исключение в потоке "main" org.апаш.искра.язык SQL.AnalysisException: не удается разрешить' Adj_Close
' заданные входные столбцы: [_c1, _c2, _c5, _c4, _c6, _c3, _c0]; строка 1 pos 7. Что же мне делать ?
Теперь Я выполните мой метод с использованием на основе заданного интервала времени с помощью Spark scheduler. Но я ссылаюсь на эту ссылку - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application Пожалуйста, помогите нам.
2 ответа:
Есть ли у вас заголовки (имена столбцов) в ваших csv-файлах ? Если да, попробуйте добавить
.option("header","true")
в инструкции read. Пример:sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV]
.В приведенном ниже блоге приведены различные примеры фреймов данных и наборов данных: http://technippet.blogspot.in/2016/10/different-ways-of-creating.html
Попробуйте добавить следующий импорт, прежде чем конвертировать
DF
вDS
.sc.implicits._
Или
sqlContext.implicits._
Подробнее о работе с набором данных https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets