Getting Started with Flink – a New York City Taxi Example

I have recently been learning Flink, with the goal of building a real-time recommender system with it. I found a good website (it also counts as the official site of Flink's creators) with hands-on Flink tutorials, which I used for practice. What follows are simply my notes.

1. The Dataset

The New York City Taxi & Limousine Commission provides a public dataset about taxi rides in New York City from 2009 to 2015.

For how to download the data, see # Taxi Data Streams. Do not decompress the files after downloading.

Our first dataset contains information about taxi rides in New York City. Each ride is represented by two events, START and END, marking the beginning and the end of the trip. Each event consists of eleven attributes:

rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime      : DateTime  // the start time of a ride
endTime        : DateTime  // the end time of a ride,
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location
startLat       : Float     // the latitude of the ride start location
endLon         : Float     // the longitude of the ride end location
endLat         : Float     // the latitude of the ride end location
passengerCnt   : Short     // number of passengers on the ride

The second dataset contains the fare information for each taxi ride, one record per ride:

rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
startTime      : DateTime  // the start time of a ride
paymentType    : String    // CSH or CRD
tip            : Float     // tip for this ride
tolls          : Float     // tolls for this ride
totalFare      : Float     // total fare collected

2. Generating the Data Streams

First we define the TaxiRide event, i.e. one record of the dataset.

We use a Flink source function (TaxiRideSource) to serve the TaxiRide stream; this source operates on event time. Likewise, the TaxiFare fare events are served by TaxiFareSource. To make the generated streams more realistic, events are served in proportion to their timestamps: two events that really happened ten minutes apart are also ten minutes apart in the stream. On top of that, a speed-up factor can be defined; with a factor of 60, one minute of real time is served in just one second, sixty times faster. Moreover, a maximum serving delay can be defined, which makes each event appear at a random point within that bound, so that the stream mimics the uncertainty of events arriving in the real world (i.e. out of order).

For this application, we set the speed-up factor to 600 (i.e. ten minutes of event time are served per second) and the maximum delay to 60 seconds.
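As a concrete sketch of the speed-up arithmetic, the following plain-Java method mirrors the servingTime formula that appears in the source code below; the standalone class and names are my own illustration, not part of the training code:

```java
public class ServingTimeDemo {

    // Map an event timestamp to a wall-clock serving timestamp,
    // compressing elapsed event time by the given speed-up factor.
    static long toServingTime(long servingStartTime, long dataStartTime,
                              long eventTime, long servingSpeed) {
        long dataDiff = eventTime - dataStartTime;           // elapsed event time
        return servingStartTime + (dataDiff / servingSpeed); // compressed wall-clock offset
    }

    public static void main(String[] args) {
        // With a speed-up factor of 600, 10 minutes (600 000 ms) of event time
        // are served in 1 second (1000 ms) of wall-clock time.
        long serving = toServingTime(0L, 0L, 600_000L, 600L);
        System.out.println(serving); // prints 1000
    }
}
```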

All exercises should be implemented using event time (as opposed to processing time).

Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.

Event time decouples the program semantics from the serving speed, and guarantees consistent results even for historic data or data delivered out of order. Put simply, processing does not depend on when an event shows up in the stream, only on when the event actually happened.
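As a tiny illustration of this point (plain Java, not Flink API; the class and method names are mine), bucketing records by their event timestamps yields identical results no matter in which order the records arrive:

```java
import java.util.*;

public class EventTimeDemo {

    // Sum values into hour buckets keyed by event time, ignoring arrival order.
    // Each event is a pair {eventTimeMs, value}.
    static Map<Long, Long> sumByEventHour(List<long[]> events) {
        Map<Long, Long> buckets = new TreeMap<>();
        for (long[] e : events) {
            buckets.merge(e[0] / 3_600_000, e[1], Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<long[]> inOrder = List.of(
                new long[]{0, 1}, new long[]{1_000, 2}, new long[]{3_600_000, 3});
        List<long[]> outOfOrder = List.of(
                new long[]{3_600_000, 3}, new long[]{0, 1}, new long[]{1_000, 2});
        // identical hour buckets regardless of arrival order
        System.out.println(sumByEventHour(inOrder).equals(sumByEventHour(outOfOrder))); // prints true
    }
}
```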

The core of TaxiRideSource is its generateUnorderedStream method:

private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {

  // wall-clock time at which serving starts
  long servingStartTime = Calendar.getInstance().getTimeInMillis();

  // event time of the first ride in the data
  long dataStartTime;

  Random rand = new Random(7452);

  // priority queue used for emission, ordered by delayed event time
  PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
        32,
        new Comparator<Tuple2<Long, Object>>() {
          @Override
          public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
            return o1.f0.compareTo(o2.f0);
          }
        });

  // read the first ride and insert it into the schedule
  String line;
  TaxiRide ride;
  if (reader.ready() && (line = reader.readLine()) != null) {
    // read first ride
    ride = TaxiRide.fromString(line);
    // extract starting timestamp
    dataStartTime = getEventTime(ride);
    // the delayed time is dataStartTime plus a bounded random delay,
    // which simulates the out-of-order arrival seen in the real world
    long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);

    // schedule the first ride
    emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
    // set the watermark time
    long watermarkTime = dataStartTime + watermarkDelayMSecs;
    // the watermark carries the timestamp watermarkTime - maxDelayMsecs - 1,
    // which one can show is always smaller than dataStartTime
    Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
    // put the watermark into the schedule; the priority queue orders it before the ride
    emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));

  } else {
      return;
  }

  // peek at the next ride from the file
  if (reader.ready() && (line = reader.readLine()) != null) {
      ride = TaxiRide.fromString(line);
  }

  // read rides one-by-one and emit a random ride from the buffer each time
  while (emitSchedule.size() > 0 || reader.ready()) {

      // insert all events into schedule that might be emitted next
      // delayed timestamp of the next event in the schedule
      long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
      // event time of the ride just read from the file
      long rideEventTime = ride != null ? getEventTime(ride) : -1;
      // fill the schedule while it is empty or does not reach far enough into the future
      while (
            ride != null && ( // while there is a ride AND
            emitSchedule.isEmpty() || // no ride in schedule OR
            rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
            )
      {
          // insert event into emit schedule
          long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
          emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));

          // read next ride
          if (reader.ready() && (line = reader.readLine()) != null) {
                ride = TaxiRide.fromString(line);
                rideEventTime = getEventTime(ride);
          }
          else {
                ride = null;
                rideEventTime = -1;
          }
      }

      // take the first element of the schedule, called head
      Tuple2<Long, Object> head = emitSchedule.poll();
      // the (delayed) time at which the head should be emitted
      long delayedEventTime = head.f0;
      long now = Calendar.getInstance().getTimeInMillis();

      // servingTime = servingStartTime + (delayedEventTime - dataStartTime) / this.servingSpeed
      long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
      // how much longer should we wait before letting this ride happen?
      long waitTime = servingTime - now;
      // since we have to wait anyway, sleep through it
      Thread.sleep((waitTime > 0) ? waitTime : 0);
      // if the head is a TaxiRide, emit it with its event timestamp
      if (head.f1 instanceof TaxiRide) {
             TaxiRide emitRide = (TaxiRide) head.f1;
             // emit ride
             sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
       }
      // if the head is a watermark
      else if (head.f1 instanceof Watermark) {
             Watermark emitWatermark = (Watermark) head.f1;
             // emit watermark
             sourceContext.emitWatermark(emitWatermark);
             // and schedule the next watermark
             long watermarkTime = delayedEventTime + watermarkDelayMSecs;
             // again, its timestamp is guaranteed to be before the next ride's timestamp
             Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
             emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
      }
   }
}
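getNormalDelayMsecs itself is not shown in the listing above. A plausible sketch, assuming it draws a Gaussian delay and clips it to the range [0, maxDelayMsecs] (this is my guess at the behaviour, not the actual training code):

```java
import java.util.Random;

public class DelaySketch {

    // Hypothetical bounded random delay: a Gaussian centered at half the
    // maximum delay, redrawn until it falls inside [0, maxDelayMsecs].
    static long boundedNormalDelay(Random rand, long maxDelayMsecs) {
        long x = maxDelayMsecs / 2;
        long delay = -1;
        while (delay < 0 || delay > maxDelayMsecs) {
            delay = (long) (rand.nextGaussian() * x) + x;
        }
        return delay;
    }

    public static void main(String[] args) {
        Random rand = new Random(7452);
        for (int i = 0; i < 5; i++) {
            System.out.println(boundedNormalDelay(rand, 60_000L));
        }
    }
}
```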

Here is an example of how to run these sources in Java:

// get an ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
    new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));

In addition, some applications need checkpointing. A checkpoint is Flink's mechanism for recovering from failures; such applications use a CheckpointedTaxiRideSource to serve the stream instead.
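A sketch of what that could look like (the 10-second checkpoint interval is an arbitrary example value; the source constructor follows the usage shown later in this post):

```java
// enable a checkpoint every 10 seconds (example interval)
env.enableCheckpointing(10_000);

// use the checkpointed source so the generator itself can be restored after a failure
DataStream<TaxiRide> rides = env.addSource(
    new CheckpointedTaxiRideSource("/path/to/nycTaxiRides.gz", servingSpeed));
```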

3. Data Cleaning 🛀

3.1 Joining the Data 🔗

Since our application studies taxi rides within New York City, we have to exclude locations outside the city, using this filter:

private static class NYCFilter implements FilterFunction<TaxiRide> {  
  @Override  
  public boolean filter(TaxiRide taxiRide) throws Exception {  
      return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&  
             GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);  
  }  
}

Applying the filter:

// start the data generator  
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));  
  
DataStream<TaxiRide> filteredRides = rides  
  // filter out rides that do not start or stop in NYC  
  .filter(new NYCFilter());

Now we need to join the TaxiRide and TaxiFare records, which means processing the streams from both sources at the same time. A few transformation functions used here:

  • FlatMap: maps one input record to zero, one, or more output records
  • Filter: evaluates a predicate and forwards the record only if the result is true
  • KeyBy: groups records by a key; the data is repartitioned by that key before the records are sent on to the next operator
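As an analogy only (plain Java streams, not the Flink API; class and method names are mine), the semantics of these three operations can be sketched like this:

```java
import java.util.*;
import java.util.stream.*;

public class TransformSemanticsDemo {

    // FlatMap analogue: each input line yields zero or more words.
    static List<String> splitWords(List<String> lines) {
        return lines.stream()
                .flatMap(l -> Arrays.stream(l.split(" ")).filter(w -> !w.isEmpty()))
                .collect(Collectors.toList());
    }

    // Filter analogue: keep a record only when the predicate holds.
    static List<String> keepA(List<String> words) {
        return words.stream().filter("a"::equals).collect(Collectors.toList());
    }

    // KeyBy analogue: partition records by their key.
    static Map<String, List<String>> byKey(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(w -> w));
    }

    public static void main(String[] args) {
        List<String> words = splitWords(List.of("a b", "", "a c"));
        System.out.println(words);                        // prints [a, b, a, c]
        System.out.println(keepA(words));                 // prints [a, a]
        System.out.println(byKey(words).get("a").size()); // prints 2
    }
}
```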

Since we cannot control whether a ride or its fare arrives first, we store whichever record comes first until its match arrives. This requires stateful computation.

public class RidesAndFaresExercise extends ExerciseBase {  
   public static void main(String[] args) throws Exception {  
  
  ParameterTool params = ParameterTool.fromArgs(args);  
  final String ridesFile = params.get("rides", pathToRideData);  
  final String faresFile = params.get("fares", pathToFareData);  
 
  final int delay = 60; // at most 60 seconds of delay  
  final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second  
 
  // set up streaming execution environment  
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
  env.setParallelism(ExerciseBase.parallelism);  
  
  DataStream<TaxiRide> rides = env  
            .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))  
            .filter((TaxiRide ride) -> ride.isStart)  
            .keyBy("rideId");  
  
  DataStream<TaxiFare> fares = env  
            .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))  
            .keyBy("rideId");  
  
  DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides  
            .connect(fares)  
            .flatMap(new EnrichmentFunction());  
  
  printOrTest(enrichedRides);  
  
  env.execute("Join Rides with Fares (java RichCoFlatMap)");  
  }  
  
   public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {  
  
   // keyed, managed state  
   private ValueState<TaxiRide> rideState;  
   private ValueState<TaxiFare> fareState;  
  
   @Override  
   public void open(Configuration config) throws Exception {  
         rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));  
         fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));  
  }  
  
   @Override  
   public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
         TaxiFare fare = fareState.value();  
         if (fare != null) {  
            fareState.clear();  
            out.collect(new Tuple2(ride, fare));  
          } else {  
            rideState.update(ride);  
          }  
      }  
  
    @Override  
    public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
         TaxiRide ride = rideState.value();  
         if (ride != null){  
            rideState.clear();  
            out.collect(new Tuple2(ride, fare));  
          } else {  
            fareState.update(fare);  
          }  
      }  
   }  
}

Running this, we can see that the generated records join ride and fare together:

3> (196965,START,2013-01-01 11:54:08,1970-01-01 00:00:00,-73.99048,40.75611,-73.98388,40.767143,2,2013007021,2013014447,196965,2013007021,2013014447,2013-01-01 11:54:08,CSH,0.0,0.0,6.5)
1> (197311,START,2013-01-01 11:55:44,1970-01-01 00:00:00,-73.98894,40.72127,-73.95267,40.771126,1,2013008802,2013012009,197311,2013008802,2013012009,2013-01-01 11:55:44,CRD,2.7,0.0,16.2)
2> (196608,START,2013-01-01 11:53:00,1970-01-01 00:00:00,-73.97817,40.761055,-73.98574,40.75613,2,2013004060,2013014162,196608,2013004060,2013014162,2013-01-01 11:53:00,CSH,0.0,0.0,5.5)

3.2 Clearing Cached State

Now we want to make the join above more robust. In real-world data, records are sometimes lost, which means we may receive only one of TaxiRide and TaxiFare while the other never arrives; the record that arrived first would then occupy memory forever. To solve this, we clear the unmatched state inside a CoProcessFunction.

This functionality is defined in the class ExpiringStateExercise:

First we produce input with missing data: here we drop all END events, and one out of every 1000 START events. 😯

DataStream<TaxiRide> rides = env  
      .addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor)))  
      .filter((TaxiRide ride) -> (ride.isStart && (ride.rideId % 1000 != 0)))  
      .keyBy(ride -> ride.rideId);

SingleOutputStreamOperator processed = rides  
      .connect(fares)  
      // Applies the given ProcessFunction on the input stream, thereby creating a transformed output stream.  
      // The function will be called for every element in the input streams and can produce zero or more output elements.  
      .process(new EnrichmentFunction());  

We use a CoProcessFunction to implement the operation described above. For a stream with two inputs, the following quote vividly describes the three methods we need to override:

For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.

processElement1(...) and processElement2(...) are called for the two input streams. onTimer() defines the action that discards records for which no match was found.

@Override  
// Called when a timer set using TimerService fires.  
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
   if (fareState.value() != null) {  
      ctx.output(unmatchedFares, fareState.value());  
      fareState.clear();  
  }  
   if (rideState.value() != null) {  
      ctx.output(unmatchedRides, rideState.value());  
      rideState.clear();  
  }  
}  
  
@Override  
// A Context that allows querying the timestamp of the element,  
// querying the TimeDomain of the firing timer and getting a TimerService for registering timers and querying the time.  
// The context is only valid during the invocation of this method, do not store it.  
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
   // the current element is a ride; if a matching fare is already stored, emit the pair
   // (both streams were keyed by rideId earlier, so state only ever holds records with the same rideId)
   TaxiFare fare = fareState.value();
   if (fare != null) {
       fareState.clear();
       out.collect(new Tuple2(ride, fare));
  } else { // otherwise, store the ride
       rideState.update(ride);
       // once the watermark passes the ride's timestamp, we stop waiting for the matching fare
       // Registers a timer to be fired when the event time watermark passes the given time.
       context.timerService().registerEventTimeTimer(ride.getEventTime());
  }
}

The output looks as follows; note that the ride ids in the output differ by exactly 1000, consistent with the filter defined above.

1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3
3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5
4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38

3.3 Windows

Now we want to determine, for each hour, the driver who earned the most tips (each fare record has a tip field). The simplest approach takes two steps: first use hour-long time windows to compute each driver's total tips per hour, then find the driver with the highest total in each hour from that windowed stream.
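Before turning to the Flink code, the two-step plan can be sketched in plain Java (an analogy only; class and names are mine): step 1 plays the role of the keyed one-hour window summing tips per driver, and step 2 plays the role of maxBy over each hour:

```java
import java.util.*;

public class HourlyTipsSketch {

    // Input: (hourEnd, driverId, tip) triples.
    // Step 1: sum tips per (hour, driver); step 2: pick the top driver per hour.
    static List<long[]> topDriverPerHour(long[][] fares) {
        Map<Long, Map<Long, Long>> hourly = new TreeMap<>();
        for (long[] f : fares) {
            hourly.computeIfAbsent(f[0], h -> new HashMap<>())
                  .merge(f[1], f[2], Long::sum);          // windowed sum per driver
        }
        List<long[]> result = new ArrayList<>();
        for (Map.Entry<Long, Map<Long, Long>> e : hourly.entrySet()) {
            Map.Entry<Long, Long> max = e.getValue().entrySet().stream()
                    .max(Map.Entry.comparingByValue()).get(); // maxBy(total tips)
            result.add(new long[]{e.getKey(), max.getKey(), max.getValue()});
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] fares = {{1, 100, 5}, {1, 100, 3}, {1, 200, 7}, {2, 200, 4}};
        for (long[] r : topDriverPerHour(fares)) {
            System.out.println(r[0] + "," + r[1] + "," + r[2]); // prints 1,100,8 then 2,200,4
        }
    }
}
```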

The code below uses the following concepts:

AggregateFunction: this interface has a method that adds an input element to an accumulator. It also has methods to create an initial accumulator, to merge two accumulators into one, and to extract an output from an accumulator.

ProcessWindowFunction: receives an Iterable containing all elements of the window, together with a Context object that gives access to time and state, which allows it to provide richer functionality.

public class HourlyTipsExercise extends ExerciseBase {  
  
   public static void main(String[] args) throws Exception {  
  
   // read parameters  
   ParameterTool params = ParameterTool.fromArgs(args);  
   final String input = params.get("input", ExerciseBase.pathToFareData);  
  
   final int maxEventDelay = 60; // events are out of order by max 60 seconds  
   final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second  
  
   // set up streaming execution environment  
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
   env.setParallelism(ExerciseBase.parallelism);  
  
   // start the data generator  
   DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));  
  
   // compute tips per hour for each driver  
   DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares  
            // key the stream by driverId  
            .keyBy((TaxiFare fare) -> fare.driverId)  
            // set a window size of one hour  
            .timeWindow(Time.hours(1))  
            // AddTips() is the aggregate function, WrapWithWindowInfo() the window function  
            .aggregate(new AddTips(), new WrapWithWindowInfo());  
  
   // find the highest total tips in each hour  
   DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips  
            .timeWindowAll(Time.hours(1))  
            .maxBy(2);  
  
   printOrTest(hourlyMax);  
  
   // execute the transformation pipeline  
   env.execute("Hourly Tips (java)");  
  }  
    
 /* Adds up the tips. */  
 public static class AddTips implements AggregateFunction<  
            TaxiFare, // input type  
            Float, // accumulator type  
            Float     // output type  
            >         
   {  
      @Override  
      public Float createAccumulator() {  
             return 0F;  
      }  
  
      @Override  
      public Float add(TaxiFare fare, Float aFloat) {  
             return fare.tip + aFloat;  
      }  
  
      @Override  
      public Float getResult(Float aFloat) {  
             return aFloat;  
      }  
  
      @Override  
      public Float merge(Float aFloat, Float accumulator) {  
             return aFloat + accumulator;  
      }  
   }  
  
 /* Wraps the pre-aggregated result into a tuple along with the window's timestamp and key. */
 public static class WrapWithWindowInfo extends ProcessWindowFunction<  
            Float, Tuple3<Long, Long, Float>, Long, TimeWindow> {  
      @Override  
      public void process(Long key, Context context, Iterable<Float> elements,  Collector<Tuple3<Long, Long, Float>> out) throws Exception {  
      Float sumOfTips = elements.iterator().next();  
      out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));  
      }  
   }  
}

The output:

1> (1357002000000,2013000493,54.45)
2> (1357005600000,2013010467,64.53)
3> (1357009200000,2013010589,104.75)

4. Broadcast State

Broadcast state: a mechanism that supports use cases where data needs to be broadcast from an upstream task to all downstream tasks.

This article explains broadcast state in detail: # A Practical Guide to Broadcast State in Apache Flink

In this mechanism we divide the system into an actions stream and a pattern stream. The actions stream is the normal data stream, i.e. the rides in our example. The pattern stream is the data we broadcast; think of it as a query we want to run against the rides: we send a pattern into broadcast state, and the operator then prints the records of the actions stream that match this pattern.

Here, our pattern is an integer n, a number of minutes. At the moment we send this pattern, we want to print all rides that started at least n minutes ago and have not yet ended.

Next comes the application code.

First, in this simple example, we need a broadcast state descriptor, even though we do not actually store anything in it:

final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(  
      "dummy",  
      BasicTypeInfo.LONG_TYPE_INFO,  
      BasicTypeInfo.LONG_TYPE_INFO  
);

Then we set up a socket to receive the pattern:

BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)  
      .assignTimestampsAndWatermarks(new QueryStreamAssigner())  
      .broadcast(dummyBroadcastState);

Once we have the rides stream keyed by taxiId, and the broadcast stream carrying the minute threshold n returned from the socket, we connect the two streams and hand them to QueryFunction for processing. QueryFunction matches the pattern (the number of minutes n from the socket) against the rides and emits the rides that match.

DataStream<TaxiRide> reports = rides  
      .keyBy((TaxiRide ride) -> ride.taxiId)  
      .connect(queryStream)  
      .process(new QueryFunction());

public static class QueryFunction extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {  
    private ValueStateDescriptor<TaxiRide> taxiDescriptor =  new ValueStateDescriptor<>("saved ride", TaxiRide.class);  
    private ValueState<TaxiRide> taxiState;  
  
  @Override  
  public void open(Configuration config) {  
      // get the state that stores each taxi's most recent event  
      taxiState = getRuntimeContext().getState(taxiDescriptor);  
  }  
  
  @Override  
  public void processElement(TaxiRide ride, ReadOnlyContext ctx, Collector<TaxiRide> out) throws Exception {  
     // For every taxi, let's store the most up-to-date information.  
     // TaxiRide implements Comparable to make this easy.  
     TaxiRide savedRide = taxiState.value();  
     if (ride.compareTo(savedRide) > 0) {  
         taxiState.update(ride);  
      }  
   }  
  
  @Override  
  public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) throws Exception {  
      DateTimeFormatter timeFormatter =  
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();  
  
      Long thresholdInMinutes = Long.valueOf(msg);  
      Long wm = ctx.currentWatermark();  
      System.out.println("QUERY: " + thresholdInMinutes + " minutes at " + timeFormatter.print(wm));  
  
      // Collect to the output all ongoing rides that started at least thresholdInMinutes ago.  
      ctx.applyToKeyedState(taxiDescriptor, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {  
         @Override  
         public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception {  
             TaxiRide ride = taxiState.value();  
             if (ride.isStart) {  
                 long minutes = (wm - ride.getEventTime()) / 60000;  
                 if (minutes >= thresholdInMinutes) {  
                     out.collect(ride);  
                  }  
             }  
         }  
      });  
   }  
}

References:

  1. data Artisans
  2. 《Flink基础教程》
  3. 《Learning Apache Flink》