当先锋百科网

首页 1 2 3 4 5 6 7

sum

在对datastream keyby后使用sum函数聚合

package com.stanley.wordcount

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
  * Created by admin on 2020/7/2.
  */
object SumWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //并行度设置为1
    env.setParallelism(1)
    //读取文本流数据
    val inputDataStream:DataStream[String] = env.socketTextStream("node1",9999)
    val outputDataStream:DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    outputDataStream.print("sum_wordcount")
    env.execute("wc test")
  }
}

processfunction

调用最底层processfunction,将count保存成一个keyedstate

package com.stanley.wordcount

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
/**
  * Created by admin on 2020/7/2.
  */
object ProcessWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputDataStream:DataStream[String] = env.socketTextStream("node1",9999)
    val outputDataStraem:DataStream[(String,Int)] = inputDataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      //调用新建的MyProcessFunction
      .process(new MyProcessFunction)
    outputDataStraem.print("process_wordcount")
    env.execute("wc test")
  }
}

class MyProcessFunction extends KeyedProcessFunction[Tuple,(String,Int),(String,Int)]{
  //创建一个countState
  private var countState:ValueState[Int] = _
  override def open(parameters: Configuration): Unit = {
    //初始化countState
    countState = getRuntimeContext.getState[Int](new ValueStateDescriptor[Int]("count",classOf[Int]))
  }

  override def processElement(i: (String, Int), context: KeyedProcessFunction[Tuple, (String, Int), (String, Int)]#Context, collector: Collector[(String, Int)]): Unit = {
    //取出count
    var count = countState.value()
    count+=1
    //更新countState
    countState.update(count)
    collector.collect((i._1,count))
  }
}

RichMapFunction

RichMapFunction和ProcessFunction一样都是实现了AbstractRichFunction,所以同样拥有生命周期方法和运行时上下文,以及keyed state

package com.stanley.wordcount

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
  * Created by admin on 2020/7/2.
  */
object RichWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputDataStream:DataStream[String] = env.socketTextStream("node1",9999)
    val outputDataStraem:DataStream[(String,Int)] = inputDataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      //调用MyRichMapFunction
      .map(new MyRichMapFunction)
    outputDataStraem.print("rich_wordcount")
    env.execute("wc test")
  }
}

class MyRichMapFunction extends RichMapFunction[(String,Int),(String,Int)]{
  private var countState:ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    countState = getRuntimeContext.getState[Int](new ValueStateDescriptor[Int]("count",classOf[Int]))
  }

  override def map(in: (String,Int)): (String, Int) = {
    var count = countState.value()
    count+=1
    countState.update(count)
    (in._1,count)
  }
}

总结

sum方法适合在比较简单的逻辑的计算中使用,ProcessFunction和RichMapFunction在实际应用环境中可以通过将状态保存到状态后端,如果出现故障通过checkpoint来恢复。