博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
14-Flink-Table-&-SQL实战
阅读量:5891 次
发布时间:2019-06-19

本文共 5044 字,大约阅读时间需要 16 分钟。

简介

Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。

Flink SQL的编程模型

创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念,它主要负责:   1、在内部目录中注册一个Table   2、注册一个外部目录   3、执行SQL查询   4、注册一个用户自定义函数(标量、表及聚合)   5、将DataStream或者DataSet转换成Table   6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用 一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。 TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。

在目录中注册表

TableEnvironment允许通过各种源来注册一个表:

  1、一个已存在的Table对象,通常是Table API或者SQL查询的结果 Table projTable = tableEnv.scan("X").select(...);

  2、TableSource,可以访问外部数据如文件、数据库或者消息系统 TableSource csvSource = new CsvTableSource("/path/to/file", ...);

  3、DataStream或者DataSet程序中的DataStream或者DataSet //将DataSet转换为Table Table table= tableEnv.fromDataSet(tableset);

注册TableSink

注册TableSink可用于将 Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Apache [Parquet] ,Avro,ORC],......):   

TableSink csvSink = new CsvTableSink("/path/to/file", ...);   复制代码
  2、 String[] fieldNames = {
"a", "b", "c"}; TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);复制代码

实战案例一

基于Flink SQL的WordCount:

public class WordCountSQL {    public static void main(String[] args) throws Exception{        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);        List list  =  new ArrayList();        String wordsStr = "Hello Flink Hello TOM";        String[] words = wordsStr.split("\\W+");        for(String word : words){            WC wc = new WC(word, 1);            list.add(wc);        }        DataSet
input = env.fromCollection(list); tEnv.registerDataSet("WordCount", input, "word, frequency"); Table table = tEnv.sqlQuery( "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); DataSet
result = tEnv.toDataSet(table, WC.class); result.print(); }//main public static class WC { public String word;//hello public long frequency;//1 // public constructor to make it a Flink POJO public WC() {} public WC(String word, long frequency) { this.word = word; this.frequency = frequency; } @Override public String toString() { return "WC " + word + " " + frequency; } }}复制代码

输出如下:

WC TOM 1WC Hello 2WC Flink 1复制代码

实战案例二

本例稍微复杂,首先读取一个文件中的内容进行统计,并写入到另外一个文件中:

public class SQLTest {	public static void main(String[] args) throws Exception{		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();		BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);		env.setParallelism(1);		DataSource
input = env.readTextFile("test.txt"); input.print(); //转换成dataset DataSet
topInput = input.map(new MapFunction
() { @Override public Orders map(String s) throws Exception { String[] splits = s.split(" "); return new Orders(Integer.valueOf(splits[0]), String.valueOf(splits[1]),String.valueOf(splits[2]), Double.valueOf(splits[3])); } }); //将DataSet转换为Table Table order = tableEnv.fromDataSet(topInput); //orders表名 tableEnv.registerTable("Orders",order); Table tapiResult = tableEnv.scan("Orders").select("name"); tapiResult.printSchema(); Table sqlQuery = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc"); //转换回dataset DataSet
result = tableEnv.toDataSet(sqlQuery, Result.class); //将dataset map成tuple输出 /*result.map(new MapFunction
>() { @Override public Tuple2
map(Result result) throws Exception { String name = result.name; Double total = result.total; return Tuple2.of(name,total); } }).print();*/ TableSink sink = new CsvTableSink("SQLTEST.txt", "|"); //writeToSink /*sqlQuery.writeToSink(sink); env.execute();*/ String[] fieldNames = { "name", "total"}; TypeInformation[] fieldTypes = {Types.STRING, Types.DOUBLE}; tableEnv.registerTableSink("SQLTEST", fieldNames, fieldTypes, sink); sqlQuery.insertInto("SQLTEST"); env.execute(); } /** * 源数据的映射类 */ public static class Orders { /** * 序号,姓名,书名,价格 */ public Integer id; public String name; public String book; public Double price; public Orders() { super(); } public Orders(Integer id, String name, String book, Double price) { this.id = id; this.name = name; this.book = book; this.price = price; } } /** * 统计结果对应的类 */ public static class Result { public String name; public Double total; public Result() {} } }//复制代码

所有代码,我放在了我的公众号,回复Flink可以下载

  • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后可以下载~
  • 更多大数据技术欢迎和作者一起探讨~

转载地址:http://tafsx.baihongyu.com/

你可能感兴趣的文章
vs2008快捷键极其技巧 转载
查看>>
window 7上安装Visual Studio 2017失败的解决方法
查看>>
JavaScript 正整数正则表达式
查看>>
单元测试之Stub和Mock
查看>>
【转】Java泛型-类型擦除
查看>>
PredictionIO+Universal Recommender快速开发部署推荐引擎的问题总结(2)
查看>>
【232】◀▶ IDL显示地理图像
查看>>
【116】Windows 系统组合键
查看>>
学习进度表 04
查看>>
python---__getattr__\__setattr_重载'.'操作
查看>>
谈谈javascript中的prototype与继承
查看>>
时序约束优先级_Vivado工程经验与各种时序约束技巧分享
查看>>
nginx win 启动关闭_windows下nginx启动与关闭的批处理脚本
查看>>
minio 并发数_MinIO 参数解析与限制
查看>>
eap wifi 证书_用openssl为EAP-TLS生成证书(CA证书,服务器证书,用户证书)
查看>>
mysql 应用程序是哪个文件夹_Mysql 数据库文件存储在哪个目录?
查看>>
mysql半同步和无损复制_MySQL半同步复制你可能没有注意的点
查看>>
mysql能看见表显示表不存在_遇到mysql数据表不存在的问题
查看>>
使用mysql实现宿舍管理_JSP+Struts2+JDBC+Mysql实现的校园宿舍管理系统
查看>>
mysql alter 修改字段类型_MySQL ALTER命令:删除,添加或修改表字段、修改字段类型及名称等...
查看>>