1. Filtering the ERROR lines from an HDFS log file

val lines = sc.textFile("hdfs://...")            // lines is an org.apache.spark.rdd.MappedRDD over the HDFS file
val errors = lines.filter(_.startsWith("ERROR")) // errors is an org.apache.spark.rdd.FilteredRDD; filter is a lazy transformation
errors.cache()                                   // persist errors in memory for reuse
errors.count()                                   // action: triggers the computation and counts the ERROR lines

// Count the errors that mention MySQL (answered from the cached errors RDD)
errors.filter(_.contains("MySQL")).count()
// Collect one tab-separated field of the errors that mention HDFS
// (index 3 here is an assumed field position; the original elided it)
errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()

2. Spark SQL: the RDDRelation example

package org.apache.spark.examples.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

// One row of the "records" table: a simple key/value pair
case class Record(key: Int, value: String)

object RDDRelation {
    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("RDDRelation")
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)

        // Brings in the implicit conversions, e.g. rddToDataFrameHolder for .toDF()
        import sqlContext.implicits._

        // Build a DataFrame from an RDD of case-class instances
        val df = sc.parallelize(1 to 100).map(i => Record(i, s"val_$i")).toDF()

        // Register the DataFrame as a temporary table so it can be queried with SQL
        df.registerTempTable("records")
        sqlContext.sql("SELECT * FROM records").collect().foreach(println)

        sc.stop()
    }
}
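
The upstream Spark example continues past registerTempTable, querying the same table both with SQL strings and with the DataFrame DSL. A minimal sketch of that continuation, assuming it is placed inside main after the registration above:

// Aggregate through SQL; the single result value is pulled out with getLong(0)
val numRecords = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)
println(s"COUNT(*): $numRecords")

// The DataFrame DSL expresses the same kind of query without SQL strings;
// the $"..." column syntax comes from the sqlContext.implicits._ import
df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

Unlike the shell session in example 1, this is a standalone application: it is compiled into a jar and launched with spark-submit, which is why it builds its own SparkContext and calls sc.stop() at the end.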