package customPartition0103

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Counts Tomcat access-log hits per requested page and prints them, most visited first.
 *
 * Usage: `getLogCount [inputPath]`. When no path is given, the original local sample file is read.
 */
object getLogCount {

  /** Log file read when no path is passed on the command line. */
  private val DefaultInputPath = "C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt"

  // Spark's documentation says applications should define main() instead of extending
  // scala.App ("subclasses of scala.App may not work correctly"), so this is a plain main.
  def main(args: Array[String]): Unit = {
    // Obtain Spark's SparkContext
    val conf = new SparkConf().setAppName("Logcount").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Read the file
      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val lineRdd: RDD[String] = sc.textFile(inputPath)

      // Malformed lines are skipped instead of throwing StringIndexOutOfBoundsException
      // and failing the whole job.
      val pageOnes: RDD[(String, Int)] =
        lineRdd.flatMap(line => parsePageName(line)).map(name => (name, 1))

      // Aggregate, then sort by hit count, descending
      val counts: RDD[(String, Int)] = pageOnes.reduceByKey(_ + _)
      val result: RDD[(String, Int)] = counts.sortBy(_._2, ascending = false)

      // Print. RDD.foreach runs on the executors, where neither the sort order nor the console
      // is the driver's. Collect first; the result has only one row per distinct page.
      result.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
   * Extracts the page name from one access-log line.
   *
   * Example: `192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713`
   * yields `Some("MyDemoWeb/head.jsp")`.
   *
   * Uses indexOf/substring rather than split, to save memory and time.
   *
   * @param line raw log line
   * @return the request path without its first '/', or None when the line has no
   *         `"METHOD path PROTOCOL"` section that can be cut apart
   */
  private[customPartition0103] def parsePageName(line: String): Option[String] = {
    // Text between the first and the last double quote
    val firstQuote = line.indexOf('"')
    val lastQuote = line.lastIndexOf('"')
    // No quote at all (both -1) or only a single one
    if (lastQuote <= firstQuote) None
    else {
      val request = line.substring(firstQuote + 1, lastQuote) // GET /MyDemoWeb/head.jsp HTTP/1.1
      // Text between the first and the last space
      val firstSpace = request.indexOf(' ')
      val lastSpace = request.lastIndexOf(' ')
      if (lastSpace <= firstSpace) None
      else {
        val path = request.substring(firstSpace + 1, lastSpace) // /MyDemoWeb/head.jsp
        // Page name: everything after the first '/' (the whole path if there is no '/')
        Some(path.substring(path.indexOf('/') + 1))
      }
    }
  }
}
自定义分区（按 JSP 页面名称将访问记录分到不同分区）：
package customPartition0103

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Custom partitioning: partitions the access-log records by page name,
 * so that each page's records end up in their own output partition.
 *
 * Usage: `selfPartition [inputPath] [outputPath]`. The original local paths are used when omitted.
 */
object selfPartition {

  private val DefaultInputPath = "C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt"
  private val DefaultOutputPath = "D:\\testdata\\streaming\\TomcatLogPartition"

  def main(args: Array[String]): Unit = {
    // Obtain Spark's SparkContext
    val conf = new SparkConf().setAppName("Logcount").setMaster("local")
    // val conf = new SparkConf().setAppName("Logcount").setMaster("spark://bigdata166:7077")
    val sc = new SparkContext(conf)
    try {
      val inputPath = if (args.length > 0) args(0) else DefaultInputPath
      val outputPath = if (args.length > 1) args(1) else DefaultOutputPath

      // Read the file.
      // Cluster variant: sc.textFile("/testdata/localhost_access_log.txt")
      // (that test failed; presumably the job has to be packaged as a jar first)
      val lineRdd: RDD[String] = sc.textFile(inputPath)

      // Malformed lines are skipped instead of failing the job with StringIndexOutOfBoundsException.
      val pageOnes: RDD[(String, Int)] =
        lineRdd.flatMap(line => parsePageName(line)).map(name => (name, 1))

      // Distinct page names, collected to the driver; the partitioner needs the full list.
      val jspList: Array[String] = pageOnes.map(_._1).distinct().collect()

      // Custom partitioning rule: one partition per page name
      val partitioned = pageOnes.partitionBy(new MyPartition(jspList))

      // Output
      partitioned.saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
   * Extracts the page name from one access-log line, e.g.
   * `... "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713` -> `Some("MyDemoWeb/head.jsp")`.
   *
   * Uses indexOf/substring rather than split, to save memory and time.
   *
   * @param line raw log line
   * @return the request path without its first '/', or None when the line has no
   *         `"METHOD path PROTOCOL"` section that can be cut apart
   */
  private[customPartition0103] def parsePageName(line: String): Option[String] = {
    // Text between the first and the last double quote
    val firstQuote = line.indexOf('"')
    val lastQuote = line.lastIndexOf('"')
    // No quote at all (both -1) or only a single one
    if (lastQuote <= firstQuote) None
    else {
      val request = line.substring(firstQuote + 1, lastQuote) // GET /MyDemoWeb/head.jsp HTTP/1.1
      // Text between the first and the last space
      val firstSpace = request.indexOf(' ')
      val lastSpace = request.lastIndexOf(' ')
      if (lastSpace <= firstSpace) None
      else {
        val path = request.substring(firstSpace + 1, lastSpace) // /MyDemoWeb/head.jsp
        // Page name: everything after the first '/' (the whole path if there is no '/')
        Some(path.substring(path.indexOf('/') + 1))
      }
    }
  }
}

/**
 * Partitioner that gives every distinct page name in `jspList` its own partition.
 * Names are assigned partitions 0, 1, 2, ... in first-occurrence order.
 *
 * @param jspList page names to partition by; duplicates are ignored
 */
class MyPartition(jspList: Array[String]) extends Partitioner {

  /** Page name -> partition index (dense: 0 until number of distinct names). */
  val partitionMap = new mutable.HashMap[String, Int]()
  // Deduplicate first. Otherwise a repeated name would keep bumping the index while the map
  // size (numPartitions) stays the same, producing partition ids >= numPartitions.
  jspList.distinct.zipWithIndex.foreach { case (jsp, index) => partitionMap.put(jsp, index) }

  /** One partition per distinct page; at least one, so an empty list is still a valid partitioner. */
  override def numPartitions: Int = math.max(1, partitionMap.size)

  /** Partition for a page name; null or unknown keys fall back to partition 0. */
  override def getPartition(key: Any): Int = key match {
    case null => 0
    case k    => partitionMap.getOrElse(k.toString, 0)
  }

  // Spark compares partitioners with equals (e.g. in partitionBy) to decide whether a shuffle
  // is needed, so two MyPartition instances with the same mapping should be equal.
  override def equals(other: Any): Boolean = other match {
    case that: MyPartition => that.partitionMap == partitionMap
    case _                 => false
  }

  override def hashCode(): Int = partitionMap.hashCode()
}
数据库写入（将各页面的访问量写入 MySQL）：
package customPartition0103

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.control.NonFatal

/**
 * Counts page hits in a Tomcat access log and writes the (page, count) rows into MySQL.
 * The target table has to be created in MySQL beforehand.
 *
 * Usage: `writeTomysql [inputPath]`. The original local sample file is read when omitted.
 */
object writeTomysql {

  private val DefaultInputPath = "C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt"

  // NOTE(review): host and credentials are hard-coded; consider passing them in via config.
  private val JdbcUrl =
    "jdbc:mysql://192.168.43.166:3306/company?serverTimezone=UTC&characterEncoding=utf-8"
  private val JdbcUser = "root"
  private val JdbcPassword = "000000"
  private val InsertSql = "insert into logcount0103 values(?,?)"

  def main(args: Array[String]): Unit = {
    // Obtain Spark's SparkContext
    val conf = new SparkConf().setAppName("count").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // 1. Read the file
      val lineRdd = sc.textFile(args.headOption.getOrElse(DefaultInputPath))

      // 2. Parse the log into page names, e.g.
      //    192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
      //    -> MyDemoWeb/head.jsp
      //    Malformed lines are skipped instead of failing the job.
      val pageOnes = lineRdd.flatMap(line => parsePageName(line)).map(name => (name, 1))

      // 3. Aggregate
      val counts = pageOnes.reduceByKey(_ + _)

      // 4. Sort by hit count, descending
      val result = counts.sortBy(_._2, ascending = false)

      // Opening the MySQL connection inside result.foreach { t => ... } would open one
      // connection per record and put heavy load on MySQL. foreachPartition opens one
      // connection per partition instead.
      result.foreachPartition(myconn)
    } finally {
      sc.stop()
    }
  }

  /**
   * Inserts every (page, count) pair of one partition into MySQL over a single connection.
   *
   * The connection and statement are local to the call. Partitions processed concurrently in
   * the same JVM (e.g. local[*]) therefore never share or close each other's connection.
   * Empty partitions do not open a connection.
   *
   * Best effort: a non-fatal error (e.g. SQLException) is printed and the rest of this
   * partition is skipped. Fatal errors propagate.
   *
   * @param t the partition's (page name, hit count) records
   */
  def myconn(t: Iterator[(String, Int)]): Unit = {
    if (t.hasNext) {
      try {
        // Obtain the connection first
        val conn: Connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
        try {
          val ppsm: PreparedStatement = conn.prepareStatement(InsertSql)
          try {
            t.foreach { case (name, count) =>
              ppsm.setString(1, name)
              ppsm.setInt(2, count)
              ppsm.executeUpdate()
            }
          } finally {
            ppsm.close()
          }
        } finally {
          // Runs even if closing the statement failed, so the connection is never leaked
          conn.close()
        }
      } catch {
        case NonFatal(e) => e.printStackTrace()
      }
    }
  }

  /**
   * Extracts the page name from one access-log line, e.g.
   * `... "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713` -> `Some("MyDemoWeb/head.jsp")`.
   *
   * @param line raw log line
   * @return the request path without its first '/', or None when the line has no
   *         `"METHOD path PROTOCOL"` section that can be cut apart
   */
  private[customPartition0103] def parsePageName(line: String): Option[String] = {
    // 1. Text between the first and the last double quote
    val firstQuote = line.indexOf('"')
    val lastQuote = line.lastIndexOf('"')
    // No quote at all (both -1) or only a single one
    if (lastQuote <= firstQuote) None
    else {
      val request = line.substring(firstQuote + 1, lastQuote) // GET /MyDemoWeb/head.jsp HTTP/1.1
      // 2. Text between the first and the last space
      val firstSpace = request.indexOf(' ')
      val lastSpace = request.lastIndexOf(' ')
      if (lastSpace <= firstSpace) None
      else {
        val path = request.substring(firstSpace + 1, lastSpace) // /MyDemoWeb/head.jsp
        // 3. Page name: everything after the first '/'
        Some(path.substring(path.indexOf('/') + 1))
      }
    }
  }
}