flink 获取kafka数据(使用flink将数据写入到kafka)

技术Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参

本文将详细解释如何在Flink中获取TableAPI、SQL和Kafka消息。这篇文章的内容质量很高,所以边肖会分享给大家参考。希望你看完这篇文章后有所了解。

使用TbaleSQL和Flink kafka连接器从kafka的消息队列中获取数据。

示例环境

Java . version :1 . 8 . xfrink . version :1 . 11 . 1 Kafka:2.11示例数据源(项目代码云下载)

Flink系统的建设开发环境和数据

示例(pom.xml)

Flink系统的TableAPI SQL和样例模块

SelectToKafka.java

package com . flink . examples . Kafka;

import org . Apache . flink . streaming . API . TiME Template;

import org . Apache . flink . streaming . API . datastream . datastream;

import org . Apache . flink . streaming . API . environment . streaming executionenvironment;

import org . Apache . flink . table . API . EnvironmentSettings;

import org . Apache . flink . table . API . table;

import org . Apache . flink . table . API . bridge . Java . streamtableenvironment;

import org . Apache . flink . types . row;

/**

*@Description使用TbaleSQL和Flinkkafka连接器从Kafka的消息队列中获取数据。

*/

publicclassSelectToKafka{

/**

官方参考:https://ci . Apache . org/project/flink/flink-docs-release-1.12/zh/dev/table/connectors/Kafka . html。

起始偏移位置

configscan.startup.mode选项指定Kafka用户的启动模式。的有效枚举为:

集团抵销:从特定消费群体在ZK/卡夫卡经纪公司的承诺抵销开始。

早偏移:从最早偏移开始。

最新偏移:从最新偏移开始。

时间戳:从每个分区的用户提供的时间戳开始。

特定偏移量:从每个分区的用户提供的特定偏移量开始。

默认选项值组-抵销表示上次从ZK/卡夫卡经纪人提交的抵销消费。

保证一致性

Sink.semantic选项可选择三种不同的操作模式:

无:弗林克不能保证任何事情。生成的记录可能会丢失或重复。

At _ lease _ once(默认设置):这确保不会丢失任何记录(尽管它们可以被复制)。

恰好_一次:Kafka事务将用于提供准确的语义一次。无论何时使用事务写入Kafka,请不要忘记为任何使用Kafka记录的应用程序设置所需的设置隔离级别(read_committed或read_uncommit)

ted-后者是默认值)。
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            "  `user_id` BIGINT,\n" +
            "  `item_id` BIGINT,\n" +
            "  `behavior` STRING,\n" +
            "  `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'user_behavior',\n" +
            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        //构建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //默认流时间方式
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //构建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //构建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        //注册kafka数据维表
        tEnv.executeSql(table_sql);

        String sql = "select user_id,item_id,behavior,ts from KafkaTable";
        Table table = tEnv.sqlQuery(sql);
        //打印字段结构
        table.printSchema();

        //table 转成 dataStream 流
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();

        env.execute();
    }
}

打印结果

root
 |-- user_id: BIGINT
 |-- item_id: BIGINT
 |-- behavior: STRING
 |-- ts: TIMESTAMP(3)

3> 1,1,normal,2021-01-26T10:25:44

关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/146800.html

(0)

相关推荐

  • 博士今义,“博士”一词的古今异义怎么讲

    技术博士今义,“博士”一词的古今异义怎么讲博士博士今义,古义为官名。秦汉时是掌管书籍文典、通晓史事的官职。 今义为学术上专通一经或精通一艺、从事教授生徒的官职。如明代初期,朱允炆曾封方孝孺为“文学博士”等。 “博士”最早

    生活 2021年10月25日
  • 雄伟的反义词,气势宏大雄伟的意思写词语

    技术雄伟的反义词,气势宏大雄伟的意思写词语气势磅礴雄伟的反义词、气吞山河、一泻千里、地动山摇、波澜壮阔、排山倒海、气贯长虹、浩浩荡荡、声势浩大、汹涌澎湃、千军万马、气吞虹霓。1、一泻千里【解释】:泻:水往下直注。形容江河

    生活 2021年10月26日
  • 拍身份证照片要求,拍摄身份证照片有哪些要求

    技术拍身份证照片要求,拍摄身份证照片有哪些要求不能穿着制式服装拍身份证照片要求,尽量穿深色、有领上衣。公民申请办理居民身份证时,相片为彩色正面免冠头像。申领人请勿穿着制式服装,尽量穿深色、有领上衣,但并不局限于黑色、深蓝

    生活 2021年10月24日
  • 如何解析Java 数据结构中时间复杂度与空间复杂度

    技术如何解析Java 数据结构中时间复杂度与空间复杂度这篇文章给大家介绍如何解析Java 数据结构中时间复杂度与空间复杂度,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。算法效率在使用当中,算法效率分

    攻略 2021年11月5日
  • 怎么进行Nginx服务器的性能分析

    技术怎么进行Nginx服务器的性能分析今天就跟大家聊聊有关怎么进行Nginx服务器的性能分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Nginx服务器一直在默

    攻略 2021年11月25日
  • 如何使用plink进行连锁不平衡分析

    技术如何使用plink进行连锁不平衡分析本篇文章为大家展示了如何使用plink进行连锁不平衡分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。plink是进行连锁不平衡分析的常用

    攻略 2021年11月10日