1. Processing error logs from a log file on HDFS
val lines = sc.textFile("hdfs://...") // lines is an org.apache.spark.rdd.MappedRDD
val errors = lines.filter(_.startsWith("ERROR")) // errors is an org.apache.spark.rdd.FilteredRDD
errors.cache() // persist the filtered RDD in memory for reuse
errors.count() // action: triggers the computation and returns the number of ERROR lines
errors.filter(_.contains("MySQL")).count() // count the ERROR lines that mention MySQL
errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect() // extract a tab-separated field; index 3 is illustrative, the original left it blank
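The snippet above assumes an interactive spark-shell where sc already exists. As a minimal sketch, the same pipeline as a self-contained application might look like this (the object and app name are hypothetical; the HDFS path is still a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

object ErrorLogStats { // hypothetical app name; the logic mirrors the shell snippet above
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ErrorLogStats"))

    // Transformations (textFile, filter) are lazy; no job runs until the first count() below
    val errors = sc.textFile("hdfs://...").filter(_.startsWith("ERROR"))
    errors.cache() // the first action materializes the RDD and keeps it in memory

    println(s"ERROR lines: ${errors.count()}") // triggers the read + filter, then caches
    println(s"MySQL errors: ${errors.filter(_.contains("MySQL")).count()}") // served from the cache

    sc.stop()
  }
}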
2. Spark SQL: the RDDRelation example
package org.apache.spark.examples.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

case class Record(key: Int, value: String)
object RDDRelation {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RDDRelation")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Build a DataFrame from a local collection of Record instances
    // (the range 1 to 100 follows the stock Spark example; the original left it blank)
    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()

    // Register the DataFrame as a temporary table so it can be queried with SQL
    df.registerTempTable("records")
    sqlContext.sql("SELECT key, value FROM records WHERE key < 10").collect().foreach(println)

    sc.stop()
  }
}
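Because sqlContext.implicits._ and org.apache.spark.sql.functions._ are already imported, the same data can also be queried through the DataFrame API directly, without SQL strings. A short sketch; these lines would sit inside main after df is defined, before sc.stop():

// Filter/sort/project expressed with the DataFrame API instead of SQL
df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

// Aggregations use org.apache.spark.sql.functions._ (the otherwise-unused import above)
df.agg(sum("key"), avg("key")).collect().foreach(println)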