$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.
// the port to connect to finalint port; try { final ParameterTool params = ParameterTool.fromArgs(args); port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'"); return; }
// get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override publicvoidflatMap(String value, Collector<WordWithCount> out){ for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b){ returnnew WordWithCount(a.word, a.count + b.count); } });
// print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount"); }
// Data type for words with count publicstaticclassWordWithCount{
public String word; publiclong count;
publicWordWithCount(){}
publicWordWithCount(String word, long count){ this.word = word; this.count = count; }
@Override public String toString(){ return word + " : " + count; } }