Three Types of Apache Flink APIs

This article demonstrates the Word Count application with Flink's three API types: the DataStream API, the DataSet API, and the Table API. Hopefully these three code examples will give you a better understanding of the three interfaces.

1. DataStream API

The DataStream API is Flink's stream-processing interface; it applies transformations such as filtering and aggregation to unbounded data streams.

1.1. Main Dependencies

The DataStream API mainly depends on the flink-streaming-java_2.11 jar.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

1.2. Code Implementation

First, create the streaming execution environment, StreamExecutionEnvironment env.
Then, obtain the source of the input stream, DataStreamSource source; here a few lines of text simulate the input.
Next, process the stream: split each line into individual words with the regular expression "\W+" and collect them as 2-tuples of the form (word, 1); then use keyBy( 0 ) and sum( 1 ) to group the tuples by the field at index 0 and sum the field at index 1.
Finally, print the result of the stream processing with print().
Most importantly, nothing built above actually runs until env.execute() is called to start the job.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStreaming {

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataStreamSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        source
                // split up the lines in pairs (2-tuples) containing: (word,1)
                .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
                    // emit the pairs
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( new Tuple2<>( token, 1 ) );
                        }
                    }
                } )
                // due to type erasure, we need to specify the return type
                .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
                // group by the tuple field "0"
                .keyBy( 0 )
                // sum up tuple on field "1"
                .sum( 1 )
                // print the result
                .print();

        // start the job
        env.execute();
    }

}
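The fromElements source above is bounded, so the streaming job finishes as soon as the four lines are processed. To drive the same pipeline with a genuinely unbounded stream, you could swap in a socket source. A minimal sketch, assuming a text server (for example, netcat started with nc -lk 9999) is listening on localhost port 9999; both host and port are placeholders:

// read an unbounded stream of lines from a socket instead of fromElements;
// the rest of the pipeline (flatMap/keyBy/sum/print) stays unchanged
DataStreamSource<String> source = env.socketTextStream( "localhost", 9999 );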

2. DataSet API

The DataSet API is Flink's batch-processing interface; it applies transformations to bounded data sets.

2.1. Main Dependencies

The DataSet API mainly depends on two jars: flink-java and flink-clients_2.11.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

2.2. Code Implementation

The overall structure mirrors the DataStream example; the differences are that the execution environment here is an ExecutionEnvironment env and the input is a data set, DataSet<String> text.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount {

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap( new LineSplitter() )
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy( 0 )
                        .aggregate( Aggregations.SUM, 1 );

        // emit result
        counts.print();
    }

}

The processing step constructs a LineSplitter class that does the same work as the lambda in the streaming example.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<String, Integer>( token, 1 ) );
            }
        }
    }
}
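The comment in the WordCount code above mentions env.readTextFile(...). A minimal sketch of reading the input from a file instead of fromElements; the path here is a placeholder for a real file on your machine:

// one element per line of the text file; the path is hypothetical
DataSet<String> text = env.readTextFile( "/path/to/hamlet.txt" );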

3. Table API

The Table API is Flink's SQL-like expression-language interface for data processing; it can be embedded in both batch and streaming applications.

3.1. Main Dependencies

The Table API mainly depends on the jars listed below. Note that every Scala-suffixed artifact in a project should use the same Scala version; the listing below consistently uses _2.11.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.1</version>
    <scope>provided</scope>
</dependency>

3.2. Code Implementation

The overall structure is again the same as in the previous two examples; the difference is that a BatchTableEnvironment table environment is created on top of the execution environment, and the processed data (the split words) is registered as a table named "words" whose single field is named "word".
Finally, an SQL query aggregates the word counts from the "words" table.

46
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class WordCountTable {

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // pre-1.9 style: TableEnvironment.getTableEnvironment(env)
        final BatchTableEnvironment tableEnv = BatchTableEnvironment.create( env, TableConfig.getDefault() );

        // get input data
        DataSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
                .flatMap( ( String value, Collector<String> out ) -> {
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( token );
                        }
                    }
                } )
                // with lambdas, we need to tell flink what type to expect
                .returns( String.class );

        // create a table named "words" from the dataset, with a single field "word"
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an SQL query
        Table results = tableEnv.sqlQuery( "select word, count(*) from words group by word" );
        tableEnv.toDataSet( results, Row.class ).print();
    }
}
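The SQL string is not the only way to express the aggregation; the same count can be written in the Table API's fluent expression style. A minimal sketch, assuming the same registered "words" table (cnt is just an alias chosen here):

// scan the registered table, group by the "word" field, and count per group
Table results = tableEnv
        .scan( "words" )
        .groupBy( "word" )
        .select( "word, word.count as cnt" );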

4. Running the Code

Start the program with the flink command-line tool (located in the bin folder of the Flink installation):

flink run -c your.package.WordCount target/your-jar.jar

The -c option lets you specify the class to run. It is not necessary if the jar is executable, i.e. already defines a main class in its manifest, as the sketch below shows.
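To make -c unnecessary, the main class can be recorded in the jar manifest at build time. A minimal sketch using the maven-shade-plugin; your.package.WordCount is a placeholder for your own entry class:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- write the entry class into META-INF/MANIFEST.MF -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>your.package.WordCount</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>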
The output looks like this:

(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
