IDEA通过DS+Row查询数据
package day0106
import java.util.Properties
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
 * Loads tab-separated student records, sorts them by age and writes the result to MySQL.
 *
 * Each input line is split on tabs and read as `id`, `name`, `age`. The rows are
 * registered as the temporary view `student`, ordered by `age` descending with
 * Spark SQL, and written over JDBC to the `student` table using the "overwrite"
 * save mode.
 *
 * The entry point is an explicit `main` rather than `extends App`: the Spark
 * documentation warns that subclasses of `scala.App` may not work correctly.
 */
object Demo3 {

  /**
   * Runs the job end to end: read the file, build the DataFrame, sort it, write it to MySQL.
   *
   * Blank input lines are skipped. A non-blank line with fewer than three fields,
   * or with a non-numeric `id`/`age`, still fails the job with an exception.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Spark session (local master).
    val spark = SparkSession.builder().appName("DF").master("local").getOrCreate()

    // try/finally so the session is stopped even when reading, parsing or the
    // JDBC write throws.
    try {
      // Read the raw lines and split each one into its tab-separated fields.
      // Blank lines (e.g. a trailing newline at the end of the file) are dropped
      // first, because they would otherwise blow up on `toInt` below.
      val stuRDD = spark.sparkContext
        .textFile("D:\\testdata\\student.txt")
        .filter(_.trim.nonEmpty)
        .map(_.split("\t"))

      // Describe the columns explicitly with a StructType schema.
      val schema = StructType(
        List(
          StructField("id", IntegerType),
          StructField("name", StringType),
          StructField("age", IntegerType)
        )
      )

      // Map every field array onto a Row matching the schema, then build the DataFrame.
      val rowRDD = stuRDD.map(s => Row(s(0).toInt, s(1), s(2).toInt))
      val stuDF = spark.createDataFrame(rowRDD, schema)

      // Register the DataFrame as a temporary view so it can be queried with SQL.
      stuDF.createOrReplaceTempView("student")

      // Query: all students, oldest first.
      val result = spark.sql("select * from student order by age desc")

      // JDBC connection properties for the MySQL write.
      // NOTE(review): credentials are hard-coded; consider moving them to external
      // configuration before using this outside a local demo.
      val myPro = new Properties()
      myPro.setProperty("user", "root")
      myPro.setProperty("password", "000000")
      // NOTE(review): "com.mysql.jdbc.Driver" is the legacy Connector/J class name;
      // Connector/J 8 uses "com.mysql.cj.jdbc.Driver" -- confirm which connector
      // version is on the classpath.
      myPro.setProperty("driver", "com.mysql.jdbc.Driver")

      // Write the sorted result to the `student` table with the "overwrite" save mode.
      result.write
        .mode("overwrite")
        .jdbc(
          "jdbc:mysql://bigdata166:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
          "student",
          myPro
        )
    } finally {
      spark.stop()
    }
  }
}
IDEA通过case class 创建DF
IDEA数据写入MySQL
package day0106
import java.util.Properties
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
 * Loads tab-separated student records, sorts them by age and writes the result to MySQL.
 *
 * Each input line is split on tabs and read as `id`, `name`, `age`. The rows are
 * registered as the temporary view `student`, ordered by `age` descending with
 * Spark SQL, and written over JDBC to the `student` table using the "overwrite"
 * save mode.
 *
 * The entry point is an explicit `main` rather than `extends App`: the Spark
 * documentation warns that subclasses of `scala.App` may not work correctly.
 */
object Demo3 {

  /**
   * Runs the job end to end: read the file, build the DataFrame, sort it, write it to MySQL.
   *
   * Blank input lines are skipped. A non-blank line with fewer than three fields,
   * or with a non-numeric `id`/`age`, still fails the job with an exception.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Spark session (local master).
    val spark = SparkSession.builder().appName("DF").master("local").getOrCreate()

    // try/finally so the session is stopped even when reading, parsing or the
    // JDBC write throws.
    try {
      // Read the raw lines and split each one into its tab-separated fields.
      // Blank lines (e.g. a trailing newline at the end of the file) are dropped
      // first, because they would otherwise blow up on `toInt` below.
      val stuRDD = spark.sparkContext
        .textFile("D:\\testdata\\student.txt")
        .filter(_.trim.nonEmpty)
        .map(_.split("\t"))

      // Describe the columns explicitly with a StructType schema.
      val schema = StructType(
        List(
          StructField("id", IntegerType),
          StructField("name", StringType),
          StructField("age", IntegerType)
        )
      )

      // Map every field array onto a Row matching the schema, then build the DataFrame.
      val rowRDD = stuRDD.map(s => Row(s(0).toInt, s(1), s(2).toInt))
      val stuDF = spark.createDataFrame(rowRDD, schema)

      // Register the DataFrame as a temporary view so it can be queried with SQL.
      stuDF.createOrReplaceTempView("student")

      // Query: all students, oldest first.
      val result = spark.sql("select * from student order by age desc")

      // JDBC connection properties for the MySQL write.
      // NOTE(review): credentials are hard-coded; consider moving them to external
      // configuration before using this outside a local demo.
      val myPro = new Properties()
      myPro.setProperty("user", "root")
      myPro.setProperty("password", "000000")
      // NOTE(review): "com.mysql.jdbc.Driver" is the legacy Connector/J class name;
      // Connector/J 8 uses "com.mysql.cj.jdbc.Driver" -- confirm which connector
      // version is on the classpath.
      myPro.setProperty("driver", "com.mysql.jdbc.Driver")

      // Write the sorted result to the `student` table with the "overwrite" save mode.
      result.write
        .mode("overwrite")
        .jdbc(
          "jdbc:mysql://bigdata166:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
          "student",
          myPro
        )
    } finally {
      spark.stop()
    }
  }
}
