百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

Flink教程-flink 1.11 使用sql将流式数据写入文件系统

wptr33 2024-12-23 14:05 29 浏览

  • 滚动策略
  • 分区提交
  • 完整示例
  • flink提供了一个file system connector,可以使用DDL创建一个table,然后使用sql的方法写入数据,支持的写入格式包括json、csv、avro、parquet、orc。

    一个最简单的DDL如下:

    CREATE TABLE fs_table ( 
     user_id STRING, 
     order_amount DOUBLE, 
     dt STRING, 
     h string, 
     m string   
    ) PARTITIONED BY (dt,h,m) WITH ( 
       'connector'='filesystem', 
       'path'='file:///tmp/abc', 
       'format'='orc' 
     );

    下面我们简单的介绍一下相关的概念和如何使用。

    滚动策略

    Key Default Type Description sink.rolling-policy.file-size 128MB MemorySize 分区文件的最大值,超过这个大小,将会启动一个新文件。 sink.rolling-policy.rollover-interval 30 m Duration 分区文件滚动的最大时间间隔,超过这个时间,将会新启动一个文件 sink.rolling-policy.check-interval 1 m Duration 一个时间间隔,定期去检查上面那个配置指定的策略下,文件是否应该滚动生成新文件.

    • 在写入列格式(比如parquet、orc)的时候,上述的配置和checkpoint的间隔一起来控制滚动策略,也就是说sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval、checkpoint间隔,这三个选项,只要有一个条件达到了,然后就会触发分区文件的滚动,结束上一个文件的写入,生成新文件。
    • 对于写入行格式的数据,比如json、csv,主要是靠sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval,也就是文件的大小和时间来控制写入数据的滚动策略.

    分区提交

    在往一个分区写完了数据之后,我们希望做一些工作来通知下游。比如在分区目录写一个SUCCESS文件,或者是对于hive来说,去更新metastore的数据,自动刷新一下分区等等。 分区的提交主要依赖于触发器和提交的策略:

    • 触发器:即什么时候触发分区的提交,
    • 提交策略:也就是分区写完之后我们做什么,目前系统提供了两种内置策略:1.往分区目录写一个空SUCCESS文件;2.更新元数据.

    分区提交触发器

    key default type 解释 sink.partition-commit.trigger process-time String 触发器的类型,目前系统提供了两种:process-time 和 partition-time,如果选择了process-time,则当系统时间大于processtime的时候触发提交,如果选择了partition-time,则需要先从分区字段里面抽取分区时间的开始时间,然后当水印大于这个分区时间的时候触发分区的提交. sink.partition-commit.delay 0 s Duration 提交分区的延迟时间

    1. process-time. 这种提交方式依赖于系统的时间,一旦遇到数据延迟等情况,会造成分区和分区的数据不一致。
    2. partition-time :这种情况需要从分区字段里抽取出来相应的pattern,具体可参考下一个段落分区的抽取。
    3. sink.partition-commit.delay:一旦这个数值设置不为0,则在process-time情况下,当系统时间大于分区创建时间加上delay延迟,会触发分区提交; 如果是在partition-time 情况下,则需要水印大于分区创建时间加上delay时间,会触发分区提交.



    第一个参数process-time、partition-time,我们不用做过多的解释,就类似于flink中的processtime和eventtime。

    第二个参数sink.partition-commit.delay我们用实际案例解释下: 比如我们配置的是分区是/yyyy-MM-dd/HH/,写入的是ORC列格式,checkpoint配置的间隔是一分钟,也就是默认情况下会每分钟生成一个orc文件,最终会在每个分区(/yyyy-MM-dd/HH/)下面生成60个orc文件。

    比如当前系统正在写入/day=2020-07-06/h=10/分区的数据,那么这个分区的创建时间是2020-07-06 10:00:00,如果这个delay配置采用的是默认值,也就是0s,这个时候当写完了一个ORC文件,也就是2020-07-06 10:01:00分钟的时候,就会触发分区提交,比如更新hive的元数据,这个时候我们去查询hive就能查到刚刚写入的文件;如果我们想/day=2020-07-06/h=10/这个分区的60个文件都写完了再更新分区,那么我们可以将这个delay设置成 1h,也就是等到2020-07-06 11:00:00的时候才会触发分区提交,我们才会看到/2020-07-06/10/分区下面的所有数据

    分区时间的抽取

    从分区值里抽取分区时间,我们可以理解为上面触发器参数配置为partition-time的时候,分区的创建时间,当水印大于这个时间+delay的时候触发分区的提交.

    Key Default Type 解释 partition.time-extractor.kind default String 抽取分区的方式,目前有default和custom两种,如果是default,需要配置partition.time-extractor.timestamp-pattern,如果是custom,需要配置自定义class partition.time-extractor.class null String 自定义class partition.time-extractor.timestamp-pattern null String 从分区值中抽取时间戳的模式,需要组织成yyyy-MM-dd HH:mm:ss格式,比如 对于上面我们提到的分区/yyyy-MM-dd/HH/,其中两个分区字段对应的字段名分为是dt和hour,那么我们这个timestamp-pattern 可以配置成'hour:00:00'

    自定义抽取分区时间的话,需要实现PartitionTimeExtractor接口:

    public interface PartitionTimeExtractor extends Serializable {
    
     String DEFAULT = "default";
     String CUSTOM = "custom";
    
     /**
      * Extract time from partition keys and values.
      */
     LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);
        ...................
    }

    分区提交策略

    定义了分区提交的策略,也就是写完分区数据之后做什么事情,目前系统提供了以下行为:

    • metastore,只支持hive table,也就是写完数据之后,更新hive的元数据.
    • success file: 写完数据,往分区文件写一个success file.
    • 自定义

    key Default Type 描述 sink.partition-commit.policy.kind null string 可选:metastore,success-file,custom,这个可以写一个或者多个,比如可以这样,'metastore,success-file' sink.partition-commit.policy.class null string 如果上述选择custom的话,这里指定相应的class sink.partition-commit.success-file.name null string 如果上述选择的是success-file,这里可以指定写入的文件名,默认是 _SUCCESS

    完整示例

    定义实体类

     public static class UserInfo implements java.io.Serializable{
      private String userId;
      private Double amount;
      private Timestamp ts;
    
      public String getUserId(){
       return userId;
      }
    
      public void setUserId(String userId){
       this.userId = userId;
      }
    
      public Double getAmount(){
       return amount;
      }
    
      public void setAmount(Double amount){
       this.amount = amount;
      }
    
      public Timestamp getTs(){
       return ts;
      }
    
      public void setTs(Timestamp ts){
       this.ts = ts;
      }
     }

    自定义source

     public static class MySource implements SourceFunction<UserInfo>{
    
      String userids[] = {
        "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
        "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
        "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
        "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
        "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
      };
    
      @Override
      public void run(SourceContext<UserInfo> sourceContext) throws Exception{
       while (true){
        String userid = userids[(int) (Math.random() * (userids.length - 1))];
        UserInfo userInfo = new UserInfo();
        userInfo.setUserId(userid);
        userInfo.setAmount(Math.random() * 100);
        userInfo.setTs(new Timestamp(new Date().getTime()));
        sourceContext.collect(userInfo);
        Thread.sleep(100);
       }
      }
    
      @Override
      public void cancel(){
    
      }
     }

    写入file

    通过sql的ddl创建一个最简单的基于process time的table,然后写入数据.

    在这个实例中,我们开启了checkpoint的时间间隔是10s,所以会每隔10s写入一个orc文件.

      StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      bsEnv.enableCheckpointing(10000);
      StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
      DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());
      String sql = "CREATE TABLE fs_table (\n" +
                   "  user_id STRING,\n" +
                   "  order_amount DOUBLE,\n" +
                   "  dt STRING," +
                   "  h string," +
                   "  m string  \n" +
                   ") PARTITIONED BY (dt,h,m) WITH (\n" +
                   "  'connector'='filesystem',\n" +
                   "  'path'='file:///tmp/abc',\n" +
                   "  'format'='orc'\n" +
                   ")";
      tEnv.executeSql(sql);
      tEnv.createTemporaryView("users", dataStream);
      String insertSql = "insert into  fs_table SELECT userId, amount, " +
                         " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
    
      tEnv.executeSql(insertSql);

    完整的代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteFile.java

    更多精彩内容,欢迎关注我的公众号【大数据技术与应用实战】

    相关推荐

    Python自动化脚本应用与示例(python办公自动化脚本)

    Python是编写自动化脚本的绝佳选择,因其语法简洁、库丰富且跨平台兼容性强。以下是Python自动化脚本的常见应用场景及示例,帮助你快速上手:一、常见自动化场景文件与目录操作...

    Python文件操作常用库高级应用教程

    本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。一、高级文件系统监控1.1watchdog库-实时文件系统监控安装与基本使用:...

    Python办公自动化系列篇之六:文件系统与操作系统任务

    作为高效办公自动化领域的主流编程语言,Python凭借其优雅的语法结构、完善的技术生态及成熟的第三方工具库集合,已成为企业数字化转型过程中提升运营效率的理想选择。该语言在结构化数据处理、自动化文档生成...

    14《Python 办公自动化教程》os 模块操作文件与文件夹

    在日常工作中,我们经常会和文件、文件夹打交道,比如将服务器上指定目录下文件进行归档,或将爬虫爬取的数据根据时间创建对应的文件夹/文件,如果这些还依靠手动来进行操作,无疑是费时费力的,这时候Pyt...

    python中os模块详解(python os.path模块)

    os模块是Python标准库中的一个模块,它提供了与操作系统交互的方法。使用os模块可以方便地执行许多常见的系统任务,如文件和目录操作、进程管理、环境变量管理等。下面是os模块中一些常用的函数和方法:...

    21-Python-文件操作(python文件的操作步骤)

    在Python中,文件操作是非常重要的一部分,它允许我们读取、写入和修改文件。下面将详细讲解Python文件操作的各个方面,并给出相应的示例。1-打开文件...

    轻松玩转Python文件操作:移动、删除

    哈喽,大家好,我是木头左!Python文件操作基础在处理计算机文件时,经常需要执行如移动和删除等基本操作。Python提供了一些内置的库来帮助完成这些任务,其中最常用的就是os模块和shutil模块。...

    Python 初学者练习:删除文件和文件夹

    在本教程中,你将学习如何在Python中删除文件和文件夹。使用os.remove()函数删除文件...

    引人遐想,用 Python 获取你想要的“某个人”摄像头照片

    仅用来学习,希望给你们有提供到学习上的作用。1.安装库需要安装python3.5以上版本,在官网下载即可。然后安装库opencv-python,安装方式为打开终端输入命令行。...

    Python如何使用临时文件和目录(python目录下文件)

    在某些项目中,有时候会有大量的临时数据,比如各种日志,这时候我们要做数据分析,并把最后的结果储存起来,这些大量的临时数据如果常驻内存,将消耗大量内存资源,我们可以使用临时文件,存储这些临时数据。使用标...

    Linux 下海量文件删除方法效率对比,最慢的竟然是 rm

    Linux下海量文件删除方法效率对比,本次参赛选手一共6位,分别是:rm、find、findwithdelete、rsync、Python、Perl.首先建立50万个文件$testfor...

    Python 开发工程师必会的 5 个系统命令操作库

    当我们需要编写自动化脚本、部署工具、监控程序时,熟练操作系统命令几乎是必备技能。今天就来聊聊我在实际项目中高频使用的5个系统命令操作库,这些可都是能让你效率翻倍的"瑞士军刀"。一...

    Python常用文件操作库使用详解(python文件操作选项)

    Python生态系统提供了丰富的文件操作库,可以处理各种复杂的文件操作需求。本教程将介绍Python中最常用的文件操作库及其实际应用。一、标准库核心模块1.1os模块-操作系统接口主要功能...

    11. 文件与IO操作(文件io和网络io)

    本章深入探讨Go语言文件处理与IO操作的核心技术,结合高性能实践与安全规范,提供企业级解决方案。11.1文件读写11.1.1基础操作...

    Python os模块的20个应用实例(python中 import os模块用法)

    在Python中,...