// set up the execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data DataStreamSource<String> source = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles" );
source // split up the lines in pairs (2-tuples) containing: (word,1) .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> { // emit the pairs for( String token : value.toLowerCase().split( "\\W+" ) ){ if( token.length() > 0 ){ out.collect( new Tuple2<>( token, 1 ) ); } } } ) // due to type erasure, we need to specify the return type .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) ) // group by the tuple field "0" .keyBy( 0 ) // sum up tuple on field "1" .sum( 1 ) // print the result .print();
// set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// input data // you can also use env.readTextFile(...) to get words DataSet<String> text = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles," );
DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap( new LineSplitter() ) // group by the tuple field "0" and sum up tuple field "1" .groupBy( 0 ) .aggregate( Aggregations.SUM, 1 );
publicvoidflatMap( String value, Collector<Tuple2<String, Integer>> out ){ // normalize and split the line into words String[] tokens = value.toLowerCase().split( "\\W+" );
// set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); final BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, TableConfig.getDefault());
// get input data DataSource<String> source = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles" );
// split the sentences into words FlatMapOperator<String, String> dataset = source .flatMap( ( String value, Collector<String> out ) -> { for( String token : value.toLowerCase().split( "\\W+" ) ){ if( token.length() > 0 ){ out.collect( token ); } } } ) // with lambdas, we need to tell flink what type to expect .returns(String.class);
// create a table named "words" from the dataset tableEnv.registerDataSet( "words", dataset, "word" );
// word count using an sql query Table results = tableEnv.sqlQuery( "select word, count(*) from words group by word" ); tableEnv.toDataSet( results, Row.class ).print(); } }
4. 代码执行
使用flink命令行工具(在flink安装的bin文件夹中)启动程序:
flink run -c your.package.WordCount target/your-jar.jar