package customPartition0103
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object getLogCount extends App {
  // Set up the SparkContext
  val conf = new SparkConf().setAppName("Logcount").setMaster("local")
  val sc = new SparkContext(conf)
  // Read the log file
  val linerdd = sc.textFile("C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt")
  val rdd1 = linerdd.map(line => {
    // Take the data between the two double quotes
    val index1 = line.indexOf("\"")
    val index2 = line.lastIndexOf("\"")
    val line1 = line.substring(index1 + 1, index2)
    // Take the data between the first and last space
    val index3 = line1.indexOf(" ")
    val index4 = line1.lastIndexOf(" ")
    val line2 = line1.substring(index3 + 1, index4)
    // Take the jsp name (drop the leading slash)
    val name = line2.substring(line2.indexOf("/") + 1)
    // This index-based approach is used instead of split because it saves
    // memory and is faster (no intermediate arrays are created)
    (name, 1)
  })
  // Aggregate: count the hits per page name
  val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _)
  // Sort by hit count, descending
  val result: RDD[(String, Int)] = rdd2.sortBy(_._2, ascending = false)
  // Print the result
  result.foreach(println)
  sc.stop()
}
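To sanity-check the substring-based parsing, here is a minimal standalone sketch (ParseDemo is a hypothetical helper; the sample line is the one quoted in the log-format comment of the MySQL example further below):

object ParseDemo extends App {
  val line = "192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] \"GET /MyDemoWeb/head.jsp HTTP/1.1\" 200 713"
  // Between the two double quotes: GET /MyDemoWeb/head.jsp HTTP/1.1
  val quoted = line.substring(line.indexOf("\"") + 1, line.lastIndexOf("\""))
  // Between the first and last space: /MyDemoWeb/head.jsp
  val path = quoted.substring(quoted.indexOf(" ") + 1, quoted.lastIndexOf(" "))
  // Drop the leading slash: MyDemoWeb/head.jsp
  val name = path.substring(path.indexOf("/") + 1)
  println(name) // prints MyDemoWeb/head.jsp
}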
Custom partitioner:
package customPartition0103
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Custom partitioner:
 * partition the records by page name
 * */
object selfPartition {
  def main(args: Array[String]): Unit = {
    // Set up the SparkContext
    val conf = new SparkConf().setAppName("Logcount").setMaster("local")
    // val conf = new SparkConf().setAppName("Logcount").setMaster("spark://bigdata166:7077")
    val sc = new SparkContext(conf)
    // Read the log file
    val linerdd = sc.textFile("C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt")
    // val linerdd = sc.textFile("/testdata/localhost_access_log.txt") // failed when tested; probably needs to be packaged as a jar first
    val rdd1 = linerdd.map(line => {
      // Take the data between the two double quotes
      val index1 = line.indexOf("\"")
      val index2 = line.lastIndexOf("\"")
      val line1 = line.substring(index1 + 1, index2)
      // Take the data between the first and last space
      val index3 = line1.indexOf(" ")
      val index4 = line1.lastIndexOf(" ")
      val line2 = line1.substring(index3 + 1, index4)
      // Take the jsp name (drop the leading slash)
      val name = line2.substring(line2.indexOf("/") + 1)
      // This index-based approach is used instead of split because it saves
      // memory and is faster (no intermediate arrays are created)
      (name, 1)
    })
    // Collect the distinct jsp names into a local array
    val jspList = rdd1.map(_._1).distinct().collect()
    // Custom partitioning: the partition rule lives in a separate class
    val myPartition = new MyPartition(jspList)
    val rdd3 = rdd1.partitionBy(myPartition)
    // Write one output file per partition
    rdd3.saveAsTextFile("D:\\testdata\\streaming\\TomcatLogPartition")
    sc.stop()
  }
}
class MyPartition(jspList: Array[String]) extends Partitioner {
  // Map from jsp name to partition id
  val partitionMap = new mutable.HashMap[String, Int]()
  // Running partition id
  var indexId = 0
  for (jsp <- jspList) {
    partitionMap.put(jsp, indexId)
    indexId += 1
  }
  // Total number of partitions
  override def numPartitions: Int = partitionMap.size
  // Return the partition for the given page name; unknown keys go to partition 0
  override def getPartition(key: Any): Int = {
    partitionMap.getOrElse(key.toString, 0)
  }
}
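A minimal sketch of how MyPartition assigns partition ids (the page names here are hypothetical; any key not in the list falls back to partition 0):

object PartitionDemo extends App {
  // Same package as MyPartition above, so it is directly in scope
  val p = new MyPartition(Array("MyDemoWeb/head.jsp", "MyDemoWeb/body.jsp"))
  println(p.numPartitions)                      // 2
  println(p.getPartition("MyDemoWeb/head.jsp")) // 0
  println(p.getPartition("MyDemoWeb/body.jsp")) // 1
  println(p.getPartition("unknown.jsp"))        // 0 (fallback)
}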
Writing to the database:
package customPartition0103
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Write the results into MySQL.
 * Create the target table in MySQL first.
 * */
object writeTomysql {
  def main(args: Array[String]): Unit = {
    // Set up the SparkContext
    val conf = new SparkConf().setAppName("count").setMaster("local")
    val sc = new SparkContext(conf)
    // Parse the data
    // 1. Read the log file
    val linerdd = sc.textFile("C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt")
    // 2. Parse each line into a page name
    /**
     * 192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
     * page name: MyDemoWeb/head.jsp
     * */
    val rdd1 = linerdd.map(line => {
      // 1. Get the data between the two double quotes
      val index1 = line.indexOf("\"")
      val index2 = line.lastIndexOf("\"")
      val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/head.jsp HTTP/1.1
      // 2. Get the data between the first and last space
      val index3 = line1.indexOf(" ")
      val index4 = line1.lastIndexOf(" ")
      val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/head.jsp
      // 3. Get the jsp name
      val name = line2.substring(line2.indexOf("/") + 1)
      (name, 1)
    })
    // 3. Aggregate
    val rdd2 = rdd1.reduceByKey(_ + _)
    // 4. Sort by hit count, descending
    val result = rdd2.sortBy(_._2, ascending = false)
    // Naive approach: open one MySQL connection per record.
    // Problem: this puts heavy pressure on MySQL.
    /*result.foreach(
      t => {
        // 1. Get a MySQL connection
        val conn = DriverManager.getConnection("jdbc:mysql://192.168.2.111:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "000000")
        val ppsm = conn.prepareStatement("insert into mydata1 values(?,?)")
        // Write the record to MySQL
        ppsm.setString(1, t._1)
        ppsm.setInt(2, t._2)
        ppsm.executeUpdate()
      }
    )*/
    // Avoid that by opening one connection per partition instead
    result.foreachPartition(myconn)
    sc.stop()
  }

  def myconn(t: Iterator[(String, Int)]): Unit = {
    // Keep the connection local to the task: shared object-level fields would
    // be overwritten when several partitions run concurrently
    var conn: Connection = null
    var ppsm: PreparedStatement = null
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.43.166:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "000000")
      ppsm = conn.prepareStatement("insert into logcount0103 values(?,?)")
      t.foreach(it => {
        ppsm.setString(1, it._1)
        ppsm.setInt(2, it._2)
        ppsm.executeUpdate()
      })
    } catch {
      case e: Throwable => e.printStackTrace()
    } finally {
      if (ppsm != null) ppsm.close()
      if (conn != null) conn.close()
    }
  }
}
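One further optimization, sketched here under the same connection and table assumptions as myconn above: use standard JDBC batching (addBatch/executeBatch) so each partition sends its rows together instead of issuing one executeUpdate per record. This is a drop-in alternative to myconn, using the same imports:

def myconnBatched(t: Iterator[(String, Int)]): Unit = {
  var conn: Connection = null
  var ppsm: PreparedStatement = null
  try {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.43.166:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "000000")
    ppsm = conn.prepareStatement("insert into logcount0103 values(?,?)")
    t.foreach(it => {
      ppsm.setString(1, it._1)
      ppsm.setInt(2, it._2)
      ppsm.addBatch() // queue the row instead of executing it immediately
    })
    ppsm.executeBatch() // send all queued rows in one batch
  } catch {
    case e: Throwable => e.printStackTrace()
  } finally {
    if (ppsm != null) ppsm.close()
    if (conn != null) conn.close()
  }
}

Note that MySQL Connector/J only turns a batch into a single multi-row insert when rewriteBatchedStatements=true is added to the JDBC URL; by default it still executes the queued statements one at a time.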
