page contents

java flink使用详细教程

本文讲述了关于Java flink使用详细教程!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

attachments-2023-03-SEyJwf58640a94c6dbc09.jpg

本文讲述了关于Java flink使用详细教程!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

首先介绍Flink DataSet API实现统计单词频次程序,然后简要看下用于实时流式数据处理的DataStream API。

maven依赖

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-java</artifactId>

    <version>1.2.0</version>

</dependency>

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-test-utils_2.10</artifactId>

    <version>1.2.0</version>

    <scope>test<scope>

</dependency>

核心API概念

使用Flink时,选哟知道一些API相关的概念:

每个在分布式集合数据执行转换程序,需要使用多个转换数据函数,包括:filtering, mapping, joining, grouping, and aggregating。

Flink中sink操作触发流执行产生程序期望的结果,例如,将结果保存到文件系统或打印到标准输出。

Flink转换是懒执行,意味着知道sink操作执行才会真正执行。

Flink API支持两种模式——批处理和实时处理。对于有限数据源使用批模式,使用DataSet API;处理无界实时流数据,应该DataStream API。

DataSet API转换数据

Flink程序的入口点是ExecutionEnvironment 类的实例, 它定义了程序执行的上下文。下面创建ExecutionEnvironment对下并开始处理数据:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

注:当你在本地机器上启动程序,则仅在本地JVM上执行处理。如果需要在集群环境中启动处理,则应该在集群中每个服务器上按照Apache Flink并配置相应ExecutionEnvironment。

创建数据集(DataSet)

要执行数据转换,需要提供数据。下面使用ExecutionEnvironement创建DataSet class :

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

也可以从其他数据源创建数据集,如Apache Kafka、CSV文件或其他数据源。

过滤和归约

准备好数据集,就可以进行过滤和转换。假设我们需要根据某阈值进行过滤,然后对过滤后的数据进行累加。则可以使用 filter() 和 reduce() 函数实现:

int threshold = 30;

List<Integer> collect = amounts

  .filter(a -> a > threshold)

  .reduce((integer, t1) -> integer + t1)

  .collect();

assertThat(collect.get(0)).isEqualTo(90);

注:collect()方法是sink操作,它实际触发数据转换。

map映射

假设我们有Person对象数据集:

private static class Person {

    private int age;

    private String name;

    // standard constructors/getters/setters

}

接着创建该对象的数据集:

DataSet<Person> personDataSource = env.fromCollection(

  Arrays.asList(

    new Person(23, "Tom"),

    new Person(75, "Michael")));

如果我们仅需要每个对象的age属性,可以使用map转换方法实现:

List<Integer> ages = personDataSource

  .map(p -> p.age)

  .collect();

assertThat(ages).hasSize(2);

assertThat(ages).contains(23, 75);

join方法

可以对两个数据集基于ID字段进行关联操作,实现连接转换。下面创建用户的事务和地址数据集:

Tuple3<Integer, String, String> address

  = new Tuple3<>(1, "5th Avenue", "London");

DataSet<Tuple3<Integer, String, String>> addresses

  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction 

  = new Tuple2<>(1, "Transaction_1");

DataSet<Tuple2<Integer, String>> transactions 

  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

两个元组的第一个字段都是整型,这是连接两个数据集的ID字段。为了执行实际连接逻辑,需要实现地址和事务数据集的KeySelector接口:

private static class IdKeySelectorTransaction 

  implements KeySelector<Tuple2<Integer, String>, Integer> {

    @Override

    public Integer getKey(Tuple2<Integer, String> value) {

        return value.f0;

    }

}

private static class IdKeySelectorAddress 

  implements KeySelector<Tuple3<Integer, String, String>, Integer> {

    @Override

    public Integer getKey(Tuple3<Integer, String, String> value) {

        return value.f0;

    }

}

每个选择器只返回应该执行联接的字段。不幸的是不能使用lambda表达式简化实现,应该Flink需要泛型类型信息。

接着使用选择器实现合并逻辑:

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>

  joined = transactions.join(addresses)

  .where(new IdKeySelectorTransaction())

  .equalTo(new IdKeySelectorAddress())

  .collect();

assertThat(joined).hasSize(1);

assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

排序

首先准备一些实例数据,Tuple2类型集合:

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");

Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");

Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");

Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");

DataSet<Tuple2<Integer, String>> transactions = env.fromElements(

  fourthPerson, secondPerson, thirdPerson, firstPerson);

如何需要按Tuple2中第一个字段进行排序,需要使用sortPartition方法执行转换:

List<Tuple2<Integer, String>> sorted = transactions

  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)

  .collect();

assertThat(sorted)

  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

经典示例

单词计数是现实大数据处理框架的经典示例,主要对数据文本的内容处理计算单词频数。本节提供Flink实现版本。首先创建LineSplitter 类分割输入为单词,收集每个单词的Tuple2类型(key-value), key即输入中发现的每个单词,value为常数1。

该类实现FlatMapFunction接口,它接收字符串作为输入,产生 Tuple2<String, Integer>作为输出:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

        Stream.of(value.toLowerCase().split("\\W+"))

          .filter(t -> t.length() > 0)

          .forEach(token -> out.collect(new Tuple2<>(token, 1)));

    }

}

然后调用Collector类的collect方法,推送数据至处理流水线。接着按第一个元素(单词)对元组进行分组并执行sum聚集方法对元组的第二个元素进行求和计算单词的频数。

public static DataSet<Tuple2<String, Integer>> startWordCount(

  ExecutionEnvironment env, List<String> lines) throws Exception {

    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())

      .groupBy(0)

      .aggregate(Aggregations.SUM, 1);

}

我们使用了三种Flink转换类型:flatMap(), groupBy() 和 aggregate()。下面写完整测试是否与期望一致:

List<String> lines = Arrays.asList(

  "This is a first sentence",

  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();

assertThat(collect).containsExactlyInAnyOrder(

  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),

  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),

  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

DataStream API 转换数据

创建DataStream

Apache Flink 通过DataStream API支持事件流处理。首先需要使用StreamExecutionEnvironment 类消费事件:

StreamExecutionEnvironment executionEnvironment

 = StreamExecutionEnvironment.getExecutionEnvironment();

接着使用executionEnvironment从不同来源创建事件流,它可以是消息总线,如Apache Kafka,但我们简单创建一组字符串元素:

DataStream<String> dataStream = executionEnvironment.fromElements(

  "This is a first sentence", 

  "This is a second sentence with a one word");

和DataSet类一样,可以对DataStream中的元素应用转换:

SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);

为了触发执行,需要执行sink操作,如print()方法,把转换结果打印至控制台,接着执行StreamExecutionEnvironment 类的execute方法:

upperCase.print();

env.execute();

程序会产生下面输出结果:

1> THIS IS A FIRST SENTENCE

2> THIS IS A SECOND SENTENCE WITH A ONE WORD

窗口事件

当实时处理事件流时,可能需要把一些事件分为组,基于这些事件窗口进行计算。

假设事件流中每个事件发送至我们系统中,其中包括事件量和时间戳。我们可以容许事件无序到达,但前提是它们的延迟不超过20秒。对于这种场景首先创建一个流来模拟两个相隔几分钟的事件,并定义一个时间戳提取器来指定延迟阈值:

SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed

  = env.fromElements(

  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),

  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))

  .assignTimestampsAndWatermarks(

    new BoundedOutOfOrdernessTimestampExtractor

      <Tuple2<Integer, Long>>(Time.seconds(20)) {

 

        @Override

        public long extractTimestamp(Tuple2<Integer, Long> element) {

          return element.f1 * 1000;

        }

    });

接下来定义一个窗口操作,将事件分组到5秒的窗口中,并对这些事件应用转换:

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed

  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

  .maxBy(0, true);

reduced.print();

它将获得每5秒窗口的最后一个元素,因此它输出:

1> (15,1491221519)

请注意,我们没有看到第二个事件,因为它的到达时间晚于指定的延迟阈值。

总结

本文简要介绍了Apache Flink框架,并通过示例展示如何使用一些转换API,包括利用DataSet API实现单词频次计算,利用DataStream API 实现简单实时事件流转换。

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2023-03-2AoKIjPQ64014b4ad30a3.jpg

  • 发表于 2023-03-10 10:24
  • 阅读 ( 701 )
  • 分类:Java开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
轩辕小不懂
轩辕小不懂

2403 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1312 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章