Spark Streaming Integration with Kinesis

1: Concepts

Amazon Kinesis

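  Amazon Kinesis is a fully managed service for processing streaming data in real time at scale. It provides several core capabilities that let you process streaming data of any size cost-effectively, with a high degree of flexibility. With Amazon Kinesis you can ingest real-time data such as video, audio, application logs, website clickstreams, and IoT telemetry data for machine learning, analytics, and other applications. Amazon Kinesis lets you process and analyze data as soon as it arrives and respond immediately, instead of waiting until all the data has been collected before processing starts.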

Shard

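  A shard is the base throughput unit of an Amazon Kinesis data stream. One shard provides 1 MB/s of data input and 2 MB/s of data output, and supports up to 1,000 PUT records per second. When you create a data stream (in provisioned capacity mode), you must specify the number of shards it needs.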

Record

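  A record is the unit of data stored in an Amazon Kinesis data stream. A record consists of a sequence number, a partition key, and a data blob. The data blob is the payload that the data producer adds to the stream. The maximum size of a data blob (the data payload before Base64 encoding) is 1 megabyte (MB).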
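To make the 1 MB data-blob limit concrete, here is a minimal sketch that encodes a payload and rejects it before it is sent if it is too large. It assumes "1 MB" means 1 MiB (1,048,576 bytes) and that the payload is a UTF-8 string; `RecordSizeCheck` and `encode` are hypothetical names, not part of the AWS SDK.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: checks a payload against the data-blob size limit.
// Assumption: the limit is 1 MiB of raw bytes, measured before Base64 encoding.
object RecordSizeCheck {
  val MaxDataBlobBytes: Int = 1024 * 1024

  // Returns the UTF-8 bytes of the payload, or an error message if it exceeds the limit.
  def encode(data: String): Either[String, Array[Byte]] = {
    val bytes = data.getBytes(StandardCharsets.UTF_8)
    if (bytes.length <= MaxDataBlobBytes) Right(bytes)
    else Left(s"payload is ${bytes.length} bytes, limit is $MaxDataBlobBytes bytes")
  }
}
```

Note that the limit is on bytes, not characters: a Chinese character such as "中" takes 3 bytes in UTF-8.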

Partition Key

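  A partition key is used to group records and route them to the different shards of a data stream. The data producer specifies the partition key when it adds data to an Amazon Kinesis data stream.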
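Kinesis routes a record by hashing its partition key with MD5 into a 128-bit integer and picking the shard whose hash key range contains that value. The sketch below reproduces that mapping locally, under the assumption that the shards split the hash key space [0, 2^128) into equal ranges, roughly as for a newly created stream that has not been resharded; for a real stream, read the ranges from each shard's `HashKeyRange`. `PartitionKeyRouting` is a hypothetical helper.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper mirroring how Kinesis maps a partition key to a shard.
// Assumption: numShards shards cover [0, 2^128) in equal-width hash key ranges.
object PartitionKeyRouting {
  private val HashSpace: BigInt = BigInt(2).pow(128)

  // MD5 digest of the UTF-8 encoded key, read as an unsigned 128-bit integer.
  def hashKey(partitionKey: String): BigInt = {
    val digest = MessageDigest.getInstance("MD5")
      .digest(partitionKey.getBytes(StandardCharsets.UTF_8))
    BigInt(1, digest) // signum 1: treat the 16 bytes as a non-negative magnitude
  }

  // 0-based index of the equal-width range (shard) that contains the key's hash.
  def shardIndex(partitionKey: String, numShards: Int): Int = {
    require(numShards > 0, "numShards must be positive")
    (hashKey(partitionKey) * numShards / HashSpace).toInt
  }
}
```

Because the mapping depends only on the key, every record with the same partition key goes to the same shard. The producer in section 4 uses 1,000 distinct keys (`partitionKey-1` to `partitionKey-1000`), which spreads its records across shards once the stream has more than one.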

Sequence Number

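  A sequence number is the unique identifier of a record. Kinesis Data Streams assigns it when the record is written, and it is unique per partition key within its shard.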
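Sequence numbers are returned as strings of decimal digits, so comparing them as plain strings can misorder values of different lengths. A minimal sketch of numeric ordering, assuming you only compare sequence numbers from the same shard (where their order is meaningful); `SequenceNumbers` is a hypothetical helper and the values in the usage below are made up.

```scala
// Hypothetical helper: orders Kinesis sequence numbers numerically, not lexicographically.
object SequenceNumbers {
  // Parse each decimal string into an arbitrary-precision integer before comparing.
  val numericOrdering: Ordering[String] = Ordering.by((s: String) => BigInt(s))

  def sortNumerically(seqNums: Seq[String]): Seq[String] = seqNums.sorted(numericOrdering)
}
```

For example, a plain string sort puts "1000" before "99", while `sortNumerically(Seq("100", "99", "1000"))` returns them as 99, 100, 1000.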

 

2: Kinesis Data Streams limits

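  1. By default, a record can be accessed for up to 24 hours after it is added to the stream. Enabling extended data retention raises this limit to 7 days, and long-term retention can extend it to 365 days.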

  2. The data blob in a record can be at most 1 MB.

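  3. Each shard supports up to 1,000 PUT records per second and a maximum write throughput of 1 MB/s. These are two separate limits, and whichever one is reached first caps the shard's write rate.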
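The two per-shard write limits can be turned into a rough shard estimate: a stream needs enough shards for its record rate and enough for its byte rate, so the larger of the two wins. This is a minimal sketch, assuming "1 MB" is 1 MiB and ignoring read limits, bursts, and uneven partition-key distribution, so treat the result as a lower bound; `ShardEstimator` is a hypothetical helper.

```scala
// Hypothetical helper: lower-bound shard count from the per-shard write limits.
object ShardEstimator {
  val MaxRecordsPerShard: Long = 1000L      // PUT records per second per shard
  val MaxBytesPerShard: Long = 1024L * 1024L // assumed 1 MiB per second per shard

  // Integer ceiling of a / b for non-negative a and positive b.
  private def ceilDiv(a: Long, b: Long): Long = (a + b - 1) / b

  def requiredShards(recordsPerSecond: Long, avgRecordBytes: Long): Long = {
    require(recordsPerSecond >= 0 && avgRecordBytes >= 0, "inputs must be non-negative")
    val byRecords = ceilDiv(recordsPerSecond, MaxRecordsPerShard)
    val byBytes = ceilDiv(recordsPerSecond * avgRecordBytes, MaxBytesPerShard)
    math.max(1L, math.max(byRecords, byBytes)) // a stream has at least one shard
  }
}
```

For the producer in this article (1,000 records per second, 10 words per record, so at most 79 bytes each), `requiredShards(1000, 79)` gives 1, which matches the single shard used here. At 10,000 records per second of 100 bytes, the record limit dominates and the estimate is 10.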

  

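  This experiment follows the example on the official Spark website; if you are interested, see the original English documentation: https://spark.apache.org/docs/latest/streaming-kinesis-integration.html. It creates a Kinesis Data Stream named word-counts-kinesis with 1 shard.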

 

3: Create the Kinesis stream

 

 

 

4: Implement a producer that writes data to Kinesis

The code is as follows:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.log4j.{Level, Logger}

import scala.util.Random

object KinesisWordProducerASL {

  def main(args: Array[String]): Unit = {
    // Reduce log noise
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Kinesis stream name
    val stream = "word-counts-kinesis"
    // Kinesis endpoint (region us-east-1)
    val endpoint = "https://kinesis.us-east-1.amazonaws.com"
    // Target rate: 1000 records per second
    val recordsPerSecond = 1000
    // Each record contains 10 words
    val wordsPerRecord = 10

    val totals = generate(stream, endpoint, recordsPerSecond, wordsPerRecord)

    println("Totals for the words sent")
    totals.foreach(println(_))
  }

  private def generate(stream: String,
                       endpoint: String,
                       recordsPerSecond: Int,
                       wordsPerRecord: Int): Seq[(String, Int)] = {
    // Word list to sample from ("kinesis" appears twice, so it is drawn about twice as often)
    val randomWords = List("spark", "hadoop", "hive", "kinesis", "kinesis")
    val totals = scala.collection.mutable.Map[String, Int]()

    // Create the Kinesis client; aws_access_key_id and aws_secret_access_key are already
    // in the local credentials file, where DefaultAWSCredentialsProviderChain finds them
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpoint)

    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
      s" $recordsPerSecond records per second and $wordsPerRecord words per record")

    // Send 2 rounds; each round puts recordsPerSecond records of wordsPerRecord random words
    for (_ <- 1 to 2) {
      (1 to recordsPerSecond).foreach { recordNum =>
        val data = (1 to wordsPerRecord).map { _ =>
          val randomWord = randomWords(Random.nextInt(randomWords.size))
          // Keep a running count so the totals can be compared with the Spark output
          totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
          randomWord
        }.mkString(" ")

        // Partition key: determines which shard the record is routed to
        val partitionKey = s"partitionKey-$recordNum"
        // Build the PutRecordRequest
        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
          .withPartitionKey(partitionKey)
          .withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))
        // Put the record onto the stream (synchronous call)
        kinesisClient.putRecord(putRecordRequest)
      }

      // putRecord is synchronous, so the actual rate is at most recordsPerSecond
      Thread.sleep(1000)
      println(s"Sent $recordsPerSecond records")
    }

    totals.toSeq.sortBy(_._1)
  }
}

Run result

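Note: to run it locally you need an AWS credentials file (for example ~/.aws/credentials). DefaultAWSCredentialsProviderChain can also pick up credentials from environment variables such as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.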

 

5: Read data from Kinesis with Spark Streaming

Spark reads from the Kinesis Data Stream once every 2 seconds (the batch interval).

import java.nio.charset.StandardCharsets

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object KinesisWordCountASL {
  def main(args: Array[String]): Unit = {
    // Reduce log noise
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // KCL application name; the KCL also uses it as the name of its DynamoDB checkpoint table
    val appName = "WordCountsApp"
    // Kinesis stream name
    val streamName = "word-counts-kinesis"
    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"

    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null, "No AWS credentials found. Please specify credentials using one of the methods specified " +
      "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription.getShards().size()

    // Create one receiver (input DStream) per shard
    val numStreams = numShards

    // Spark Streaming batch interval: 2 seconds
    val batchInterval = Milliseconds(2000)
    // How often the KCL checkpoints its position in the stream to DynamoDB
    val kinesisCheckpointInterval = batchInterval
    val regionName = getRegionNameByEndpoint(endpointUrl)

    // Each receiver occupies one core, so local[*] must provide more cores than numStreams
    val sparkConf = new SparkConf().setAppName("KinesisWordCountASL").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .build()
    }

    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)

    // Each record payload is a space-separated list of words
    val words = unionStreams.flatMap(byteArray => new String(byteArray, StandardCharsets.UTF_8).split(" "))

    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Resolve the AWS region name (e.g. us-east-1) from the Kinesis endpoint URL
  def getRegionNameByEndpoint(endpoint: String): String = {
    import scala.collection.JavaConverters._
    val uri = new java.net.URI(endpoint)
    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
      .asScala
      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
      .map(_.getName)
      .getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
  }
}
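To see what each 2-second batch of the pipeline above computes, the same flatMap, map, and reduceByKey steps can be replayed on a plain Scala collection. This is a local sketch only: `BatchWordCount` is a hypothetical name, each element stands for one record payload (`Array[Byte]`) received during a batch, and a sequential `groupBy` plus sum stands in for Spark's distributed `reduceByKey(_ + _)`.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical local replay of one batch of the DStream word count.
object BatchWordCount {
  def countWords(records: Seq[Array[Byte]]): Map[String, Int] =
    records
      .flatMap(bytes => new String(bytes, StandardCharsets.UTF_8).split(" ")) // same as flatMap
      .map(word => (word, 1))                                                 // same as map
      .groupBy(_._1)                                                          // group by word...
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }              // ...and sum, like reduceByKey
}
```

For example, two records "spark hive" and "spark kinesis" in the same batch produce spark: 2, hive: 1, kinesis: 1. Summing the printed counts over all batches should match the totals printed by the producer, provided the consumer started (with `Latest`) before the producer began sending.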

Run result

6: Architecture

7: Summary

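Spark Streaming + Kinesis is easy to use and quick to get started with. With it you can quickly build a big-data web crawling system: on the front end, thousands of crawlers write data into Kinesis; on the back end, Spark Streaming distributes, filters, and analyzes the data. The stream needs enough shards for the crawlers' combined write rate, given the per-shard limits in section 2.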

Source code: https://github.com/mayflower-zc/spark-kinesis-sample-project