flinksql读取kafka写入mysql(flink写数据到数据库)

技术flinksql怎么将数据写入到文件中本篇内容主要讲解“flinksql怎么将数据写入到文件中”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flinksql怎么将数据写入

本篇内容主要讲解"弗林克索尔怎么将数据写入到文件中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"弗林克索尔怎么将数据写入到文件中"吧!

打包。JD。数据输出;

导入com。JD。数据。传感器读数;

导入组织。阿帕奇。弗林克。API。常见。功能。MapFuncTion

导入组织。阿帕奇。弗林克。流媒体。API。数据流。数据流源;

导入组织。阿帕奇。弗林克。流媒体。API。数据流。SingleOutputStreamOperator

导入组织。阿帕奇。弗林克。流媒体。API。环境。流式执行环境;

导入组织。阿帕奇。弗林克。桌子。API。数据类型;

导入组织。阿帕奇。弗林克。桌子。API。桌子;

导入组织。阿帕奇。弗林克。桌子。API。桥。Java。streamtableenvironment

导入组织。阿帕奇。弗林克。桌子。描述符。文件系统;

导入组织。阿帕奇。弗林克。桌子。描述符。老CSV

导入组织。阿帕奇。弗林克。桌子。描述符。图式;

publicclassFlinkSqlOutputFile{

公共静态void main(String[]args)throwsException {

streamexecutionenvironmentev=streamexecutionenvironment。getexecutionenvironment();

环境。设置并行度(1);

DataStreamSourceStringstream=env。readtextfile('/Users/刘海静/Desktop/flinktestword/AAA。txt’);

//DataStreamSourceStream=env。SocketTextStream(' localhost ',8888);

singletoutstream operator sensorreadingmap=stream。地图(新地图函数字符串,SensorReading(){ 0

publicSensorReadingmap(字符串)throwsException{

String[]split=s.split(',');

returnnewSensorReading(拆分[0],拆分[1],拆分[2]);

}

(=国家统计局标准)国家标准局

p;       });

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//        使用 table api
        Table table = tableEnv.fromDataStream(map);
//        table.printSchema();
        Table select = table.select("a,b");
//        select.printSchema();

//        使用 sql api
//        tableEnv.createTemporaryView("test", map);
//        Table select = tableEnv.sqlQuery(" select a, b from test");
//        select.printSchema();

//        DataStream<SensorReading2> sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
//        sensorReading2DataStream.map(new MapFunction<SensorReading2, Object>() {
//            @Override
//            public Object map(SensorReading2 value) throws Exception {
//                System.out.println(value.a+"   "+ value.b);
//                return null;
//            }
//        });

//        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
//                .withFormat(new Csv())
//                .withSchema(
//                        new Schema()
//                                .field("a", DataTypes.STRING())
//                                .field("b", DataTypes.STRING()))
//                .inAppendMode()
//                .createTemporaryTable("outputTable");
//        select.insertInto("outputTable");

        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema()
                                .field("a", DataTypes.STRING())
                ).inAppendMode()
                .createTemporaryTable("outputTable");
        select.insertInto("outputTable");

        env.execute();

    }
}

到此,相信大家对“flinksql怎么将数据写入到文件中”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156282.html

(0)

相关推荐

  • 中国标志性建筑,中国最具标志性的建筑有哪些

    技术中国标志性建筑,中国最具标志性的建筑有哪些每个国家都拥有一些标志性建筑,一看到它就能唤起对于它的记忆,标志性建筑也是一个国家的名片和象征。要说中国有哪些标志性建筑中国标志性建筑?不同的思考维度有不同的答案,如果从建筑

    生活 2021年10月27日
  • flask中orm增删改查操作

    技术flask中orm增删改查操作 flask中orm增删改查操作flask中orm增删改查操作
    一、创建表
    # model.py
    import datetime
    from sqlalchemy imp

    礼包 2021年11月7日
  • Python如何移植到IMX6ULL开发板上

    技术Python如何移植到IMX6ULL开发板上这篇文章主要讲解了“ Python如何移植到IMX6ULL开发板上”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“ Pyth

    攻略 2021年11月23日
  • 最全分布式锁解决方案详解

    技术最全分布式锁解决方案详解 最全分布式锁解决方案详解一. 概述
    1.1 锁的概念在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时

    礼包 2021年11月7日
  • 常用的gis计算方法有哪些

    技术常用的gis计算方法有哪些这篇文章主要为大家展示了“常用的gis计算方法有哪些”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“常用的gis计算方法有哪些”这篇文章吧。在系统

    攻略 2021年12月1日
  • 在线编辑器好吗

    技术在线编辑器好吗本篇内容介绍了“在线编辑器好吗”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!  在线编辑器是一种通过浏

    攻略 2021年11月5日