flinksql读取kafka写入mysql(flink写数据到数据库)

技术flinksql怎么将数据写入到文件中本篇内容主要讲解“flinksql怎么将数据写入到文件中”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flinksql怎么将数据写入

本篇内容主要讲解"弗林克索尔怎么将数据写入到文件中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"弗林克索尔怎么将数据写入到文件中"吧!

打包。JD。数据输出;

导入com。JD。数据。传感器读数;

导入组织。阿帕奇。弗林克。API。常见。功能。MapFuncTion

导入组织。阿帕奇。弗林克。流媒体。API。数据流。数据流源;

导入组织。阿帕奇。弗林克。流媒体。API。数据流。SingleOutputStreamOperator

导入组织。阿帕奇。弗林克。流媒体。API。环境。流式执行环境;

导入组织。阿帕奇。弗林克。桌子。API。数据类型;

导入组织。阿帕奇。弗林克。桌子。API。桌子;

导入组织。阿帕奇。弗林克。桌子。API。桥。Java。streamtableenvironment

导入组织。阿帕奇。弗林克。桌子。描述符。文件系统;

导入组织。阿帕奇。弗林克。桌子。描述符。老CSV

导入组织。阿帕奇。弗林克。桌子。描述符。图式;

publicclassFlinkSqlOutputFile{

公共静态void main(String[]args)throwsException {

streamexecutionenvironmentev=streamexecutionenvironment。getexecutionenvironment();

环境。设置并行度(1);

DataStreamSourceStringstream=env。readtextfile('/Users/刘海静/Desktop/flinktestword/AAA。txt’);

//DataStreamSourceStream=env。SocketTextStream(' localhost ',8888);

singletoutstream operator sensorreadingmap=stream。地图(新地图函数字符串,SensorReading(){ 0

publicSensorReadingmap(字符串)throwsException{

String[]split=s.split(',');

returnnewSensorReading(拆分[0],拆分[1],拆分[2]);

}

(=国家统计局标准)国家标准局

p;       });

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//        使用 table api
        Table table = tableEnv.fromDataStream(map);
//        table.printSchema();
        Table select = table.select("a,b");
//        select.printSchema();

//        使用 sql api
//        tableEnv.createTemporaryView("test", map);
//        Table select = tableEnv.sqlQuery(" select a, b from test");
//        select.printSchema();

//        DataStream<SensorReading2> sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
//        sensorReading2DataStream.map(new MapFunction<SensorReading2, Object>() {
//            @Override
//            public Object map(SensorReading2 value) throws Exception {
//                System.out.println(value.a+"   "+ value.b);
//                return null;
//            }
//        });

//        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
//                .withFormat(new Csv())
//                .withSchema(
//                        new Schema()
//                                .field("a", DataTypes.STRING())
//                                .field("b", DataTypes.STRING()))
//                .inAppendMode()
//                .createTemporaryTable("outputTable");
//        select.insertInto("outputTable");

        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema()
                                .field("a", DataTypes.STRING())
                ).inAppendMode()
                .createTemporaryTable("outputTable");
        select.insertInto("outputTable");

        env.execute();

    }
}

到此,相信大家对“flinksql怎么将数据写入到文件中”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156282.html

(0)

相关推荐

  • 免费的Web压力测试工具有哪些

    技术免费的Web压力测试工具有哪些这篇文章主要介绍“免费的Web压力测试工具有哪些”,在日常操作中,相信很多人在免费的Web压力测试工具有哪些问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”

    攻略 2021年11月18日
  • python预计运行时间怎么做(python对时间和日期的处理)

    技术python如何对日期时间进行处理这篇文章给大家分享的是有关python如何对日期时间进行处理的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。开发中常用的日期操作有哪些?获取当前时间获取系统

    攻略 2021年12月19日
  • 下拉表格组件

    技术下拉表格组件 下拉表格组件封装下拉表格组件
    !-- my-selectv-model="changeForm.productname" //双向绑定的数据 (必传):arrData="cpNameO

    礼包 2021年11月1日
  • 27 视图集

    技术27 视图集 27 视图集视图集
    """视图集特点: 1,可以将一组相关的操作, 放在一个类中进行完成 2,不提供get,post方法, 使用retrieve, create方法来替代

    礼包 2021年11月4日
  • 如何将eclipse项目导入myeclipse(可以同时安装eclipse和myeclipse)

    技术如何进行MyEclipse6.5+Eclipse3.4的中文问题浅析今天就跟大家聊聊有关如何进行MyEclipse6.5+Eclipse3.4的中文问题浅析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了

    攻略 2021年12月18日
  • C++代码怎么优化

    技术C++代码怎么优化这篇文章主要讲解了“C++代码怎么优化”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“C++代码怎么优化”吧!使用模板的特化或者偏特化技术可以指定在使

    攻略 2021年11月30日