Scala log statistics: custom partitioning and database writes

June 16, 2022

 

package customPartition0103

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object getLogCount extends App {
  // Set up the Spark context
  val conf = new SparkConf().setAppName("Logcount").setMaster("local")
  val sc = new SparkContext(conf)
  // Read the log file
  private val linerdd = sc.textFile("C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt")

  val rdd1 = linerdd.map(line => {
    // Take the text between the first and last double quote
    val index1 = line.indexOf("\"")
    val index2 = line.lastIndexOf("\"")
    val line1 = line.substring(index1 + 1, index2)

    // Take the text between the first and last space
    val index3 = line1.indexOf(" ")
    val index4 = line1.lastIndexOf(" ")
    val line2 = line1.substring(index3 + 1, index4)

    // Extract the JSP page name
    val name = line2.substring(line2.indexOf("/") + 1)
    // substring is used instead of split because it saves memory and is faster (no intermediate array)
    (name, 1)
  })
  // Aggregate: count hits per page
  private val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _)
  // Sort by hit count, descending
  private val result: RDD[(String, Int)] = rdd2.sortBy(_._2, false)
  // Print the results
  result.foreach(println)
  sc.stop()
}
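To make the three substring steps concrete, here is a minimal REPL-style walk-through on the sample log line quoted in the JDBC example further down (the object name ParseDemo is just for illustration):

object ParseDemo extends App {
  // Sample line from localhost_access_log.txt
  val line = "192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] \"GET /MyDemoWeb/head.jsp HTTP/1.1\" 200 713"

  // 1. Text between the first and last double quote
  val line1 = line.substring(line.indexOf("\"") + 1, line.lastIndexOf("\""))
  println(line1) // GET /MyDemoWeb/head.jsp HTTP/1.1

  // 2. Text between the first and last space of that
  val line2 = line1.substring(line1.indexOf(" ") + 1, line1.lastIndexOf(" "))
  println(line2) // /MyDemoWeb/head.jsp

  // 3. Drop the leading slash to get the page name
  val name = line2.substring(line2.indexOf("/") + 1)
  println(name) // MyDemoWeb/head.jsp
}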

 

Custom partitioning:

 

package customPartition0103

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Custom partitioner:
 * partition the records by page name
 * */
object selfPartition {
  def main(args: Array[String]): Unit = {
    // Set up the Spark context
    val conf = new SparkConf().setAppName("Logcount").setMaster("local")
//    val conf = new SparkConf().setAppName("Logcount").setMaster("spark://bigdata166:7077")
    val sc = new SparkContext(conf)
    // Read the log file
    val linerdd = sc.textFile("C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt")
//    val linerdd = sc.textFile("/testdata/localhost_access_log.txt") // failed on the cluster; probably needs to be packaged into a jar first

    val rdd1 = linerdd.map(line => {
      // Take the text between the first and last double quote
      val index1 = line.indexOf("\"")
      val index2 = line.lastIndexOf("\"")
      val line1 = line.substring(index1 + 1, index2)

      // Take the text between the first and last space
      val index3 = line1.indexOf(" ")
      val index4 = line1.lastIndexOf(" ")
      val line2 = line1.substring(index3 + 1, index4)

      // Extract the JSP page name
      val name = line2.substring(line2.indexOf("/") + 1)
      // substring is used instead of split because it saves memory and is faster (no intermediate array)
      (name, 1)
    })
    // Collect the distinct page names into an array on the driver
    val jspList = rdd1.map(_._1).distinct().collect()
    // Build the custom partitioner from the page names (the rule lives in a separate class)
    val myPartition = new MyPartition(jspList)

    val rdd3 = rdd1.partitionBy(myPartition)
    // Write each partition out as a separate file
    rdd3.saveAsTextFile("D:\\testdata\\streaming\\TomcatLogPartition")
    sc.stop()
  }
}



class MyPartition(jspList: Array[String]) extends Partitioner {
  // Map from page name to partition id
  val partitionMap = new mutable.HashMap[String, Int]()
  // Next partition id to assign
  var indexId = 0

  for (jsp <- jspList) {
    partitionMap.put(jsp, indexId)
    indexId += 1
  }
  // Total number of partitions
  override def numPartitions: Int = partitionMap.size
  // Look up the partition for a page name; unknown keys fall back to partition 0
  override def getPartition(key: Any): Int = {
    partitionMap.getOrElse(key.toString, 0)
  }
}
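Since getPartition is plain Scala, the mapping can be sanity-checked without a SparkContext; the page names below are hypothetical stand-ins for what rdd1.map(_._1).distinct().collect() returns in the job:

object MyPartitionDemo extends App {
  // Hypothetical page names (in the job they come from the RDD)
  val pages = Array("head.jsp", "body.jsp", "oracle.jsp")
  val p = new MyPartition(pages)

  println(p.numPartitions)             // 3
  println(p.getPartition("body.jsp"))  // 1
  println(p.getPartition("other.jsp")) // 0 -- unseen keys fall back to partition 0
}

Note that the getOrElse fallback sends every unknown key to partition 0, where it would mix with the first page's records; that is harmless here because jspList is built from the same RDD that gets partitioned, so every key is known.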

 

Writing to the database:
 

package customPartition0103

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Write the results into MySQL.
 * Create the target table in MySQL first.
 * */
object writeTomysql {
  // Shared JDBC handles (fine in local mode; see the note after this block)
  var conn: Connection = null
  var ppsm: PreparedStatement = null

  def main(args: Array[String]): Unit = {
    // Set up the Spark context
    val conf = new SparkConf().setAppName("count").setMaster("local")
    val sc = new SparkContext(conf)

    // 1. Read the log file
    val linerdd = sc.textFile("C:\\Users\\xabcd\\Desktop\\localhost_access_log.txt")

    // 2. Parse the page name out of each log line
    /**
     * 192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
     * page name: MyDemoWeb/head.jsp
     * */
    val rdd1 = linerdd.map(line => {
      // 1. Take the text between the first and last double quote
      val index1 = line.indexOf("\"")
      val index2 = line.lastIndexOf("\"")
      val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/head.jsp HTTP/1.1

      // 2. Take the text between the first and last space
      val index3 = line1.indexOf(" ")
      val index4 = line1.lastIndexOf(" ")
      val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/head.jsp
      // 3. Extract the JSP page name
      val name = line2.substring(line2.indexOf("/") + 1)

      (name, 1)
    })

    // 3. Aggregate: count hits per page
    val rdd2 = rdd1.reduceByKey(_ + _)
    // 4. Sort by hit count, descending
    val result = rdd2.sortBy(_._2, false)

    // Naive approach: open one MySQL connection per record.
    // Problem: this puts far too much load on MySQL.
    /*result.foreach(
      t => {
        // 1. Open a MySQL connection
        conn = DriverManager.getConnection("jdbc:mysql://192.168.2.111:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "000000")
        ppsm = conn.prepareStatement("insert into mydata1 values(?,?)")
        // Write one row to MySQL
        ppsm.setString(1, t._1)
        ppsm.setInt(2, t._2)
        ppsm.executeUpdate()
      }
    )*/

    // Avoid that by opening one connection per partition instead
    result.foreachPartition(myconn)
    sc.stop()
  }


  def myconn(t: Iterator[(String, Int)]): Unit = {
    // Open the connection once for the whole partition
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.43.166:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "000000")
      ppsm = conn.prepareStatement("insert into logcount0103 values(?,?)")
      t.foreach(
        it => {
          ppsm.setString(1, it._1)
          ppsm.setInt(2, it._2)
          ppsm.executeUpdate()
        }
      )
    } catch {
      case t: Throwable => t.printStackTrace()
    } finally {
      if (ppsm != null) ppsm.close()
      if (conn != null) conn.close()
    }
  }
}
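One caveat about the version above: conn and ppsm are fields of the writeTomysql object, which only works because local mode runs everything in one JVM. On a cluster each executor deserializes its own copy of the object, and two partitions running on the same executor can clobber each other's handles. A safer sketch creates the connection inside the partition function and batches the inserts; the connection string and table name are carried over from the code above, and the column names in the schema comment are an assumption:

import java.sql.DriverManager

object WritePartitionSketch {
  // Assumed schema, per the "create the table first" note above:
  //   create table logcount0103(name varchar(200), cnt int);
  def writePartition(it: Iterator[(String, Int)]): Unit = {
    // One connection per partition, created on the executor itself
    val conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.43.166:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
      "root", "000000")
    val ppsm = conn.prepareStatement("insert into logcount0103 values(?,?)")
    try {
      it.foreach { case (name, count) =>
        ppsm.setString(1, name)
        ppsm.setInt(2, count)
        ppsm.addBatch() // queue the row instead of one round trip per record
      }
      ppsm.executeBatch() // one batched write for the whole partition
    } finally {
      ppsm.close()
      conn.close()
    }
  }
}

// usage: result.foreachPartition(WritePartitionSketch.writePartition _)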

 

 
