spark自定义sql系统(sparksql运行流程)

技术如何将Spark SQL模型变为在线服务如何将Spark SQL模型变为在线服务,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Spark

对于如何将Spark SQL模型转化为在线服务,很多新手都不是很清楚。为了帮助大家解决这个问题,下面小编就详细讲解一下。需要的人可以从中学习,希望你能有所收获。

00-1010的第四范式已经在金融行业的反欺诈、媒体行业的新闻推荐、能源行业的管道检测等众多行业落地了数万个AI应用。SparkSQL在这些AI应用中起到了快速实现特征转换的重要作用。

如何将Spark  SQL模型变为在线服务

SparkSQL在特征转换中有几种主要类型。

多表场景,用于拼接表,如交易信息表拼接账表。

使用udf进行简单的特征转换,如时间戳的小时函数处理。

时间窗口和udaf用于时间序列特征处理,例如计算一个人最后一天的消费金额之和。

SparkSQL到目前为止很好地解决了离线模型训练的特征转换问题,但是随着AI应用的发展,人们对模型的期待不再仅仅是得到离线的研究成果,而是在真实的业务场景中发挥价值,这就是模型应用场景,需要高性能和实时推理。这时,我们会遇到以下问题。

如何将多表数据从离线映射到在线,即批量训练时输入大量的表,在线环境下这些表应该以什么形式存在,也会影响整个系统架构。做得好可以提高效率,做得不好会大大增加模型产生商业价值的成本。

将SQL转换为实时执行的成本很高,因为在线推理需要很高的性能,而数据科学家可能会制作成千上万个特征,每个特征都是由人肉转化而来的,这将大大增加工程成本。

离线功能很难与在线功能保持一致,手动转换会导致性能一致,往往很难保持一致。

线下效果很大,但线上效果无法满足业务需求。

在具体的反欺诈场景中,模型应用需要tp99 20ms毫秒来检测交易是否欺诈,因此模型应用的性能要求非常高。

SparkSQL在机器学习场景中应用

如何将Spark  SQL模型变为在线服务

特征工程数据库补充了SparkSQL的功能。

以数据库的形式,解决了离线表到在线表的映射问题。我们前面给出的答案是离线表是如何分布的,以及在线表是如何分布的。

通过同一套代码进行线下和线上的特征转换,保证了线上模型效果。

科学家和业务开发团队的合作以sql作为传输媒介,而不是手动转换代码,大大提高了模型迭代的效率。

与scala实现的spark2.x和3.x相比,llvm加速的sql在时序复杂特征场景下可以加速2 ~ 3倍,在线内存存储可以保证sql以极低的延迟返回结果。

第四范式特征工程数据库是如何解决这些问题

演示模型训练场景为了预测出租车行程结束所需的时间,这里我们将使用fedb、pyspark和lightgbm等工具最终构建一个http模型推理服务,这也将是spark在机器学习场景中的实践。

如何将Spark  SQL模型变为在线服务

整个demo200有200多行代码,生产时间不到半个小时。

Train_sql.py特征计算和训练,80行代码

Predict_server.py模型推理http服务,129行代码

快速将spark sql 模型变成实时服务demo

整个训练数据如下样子

抽样资料

id,vendor_id,皮卡_datetime,dropoff_datetime,乘客_计数,皮卡_经度,皮卡_纬度,drop off _经度,drop off _纬度,store_and_fwd_flag,trip_durationid3097625,1,2016-01-2216:013:00,2016-01-2216:15:16,2,-73 . 40600000001

9,40.7613525390625,-73.95573425292969,40.772396087646484,N,856  id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198  id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303  id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330  id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496  id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935  id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904  id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331  id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674 `

场景特征变换sql脚本

特征变换

select trip_duration, passenger_count,  sum `(pickup_latitude) over w as vendor_sum_pl,`  max `(pickup_latitude) over w as vendor_max_pl,`  min `(pickup_latitude) over w as vendor_min_pl,`  avg `(pickup_latitude) over w as vendor_avg_pl,`  sum `(pickup_latitude) over w2 as pc_sum_pl,`  max `(pickup_latitude) over w2 as pc_max_pl,`  min `(pickup_latitude) over w2 as pc_min_pl,`  avg `(pickup_latitude) over w2 as pc_avg_pl ,`  count `(vendor_id) over w2 as pc_cnt,`  count `(vendor_id) over w as vendor_cnt`  from {}  window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),  w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW) `

我们选择了vendor_id 和 passenger_count 两个纬度做时序特征

train_df = spark.sql(train_sql)  # specify your configurations as a dict  params = {  'boosting_type' `: 'gbdt' ,  'objective' `: 'regression' ,  'metric' `: { 'l2' , 'l1' },  'num_leaves' `: 31 ,  'learning_rate' `: 0.05 ,  'feature_fraction' `: 0.9 ,  'bagging_fraction' `: 0.8 ,  'bagging_freq' `: 5 ,  'verbose' `: 0`  }  print `( 'Starting training...' )`  gbm = lgb.train(params,  lgb_train,  num_boost_round `= 20 ,`  valid_sets `= lgb_eval,  early_stopping_rounds `= 5 )`  gbm.save_model( `'model.txt' )执行模型训练过程,最终产生model.txt

模型推理过程

导入数据代码

import  def insert_row(line):  row = line.split( `',' )  row[ `2 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 2 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`  row[ `3 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 3 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`  insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" `% tuple (row)  driver.executeInsert( `'db_test' , insert)  with open `( 'data/taxi_tour_table_train_simple.csv' , 'r' ) as fd:  idx = 0  for line in fd:  if idx = `= 0 :  idx = idx + 1  continue  insert_row(line.replace( `'n' , ''))  idx = idx + 1 `  注:train.csv为训练数据csv格式版本

模型推理逻辑

predict.py  def` `post( self ):  row = json.loads( `self .request.body)  ok, req = fedb_driver.getRequestBuilder( `'db_test' , sql)  if not ok or not req:  self `.write( "fail to get req" )`  return  input_schema = req.GetSchema()  if not input_schema:  self `.write( "no schema found" )`  return  str_length = 0  for i in range `(input_schema.GetColumnCnt()):`  if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) = `= 'string' :  str_length = str_length + len `(row.get(input_schema.GetColumnName(i), ''))`  req.Init(str_length)  for i in range `(input_schema.GetColumnCnt()):`  tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))  if tname = `= 'string' :  req.AppendString(row.get(input_schema.GetColumnName(i), ''))  elif tname = `= 'int32' :  req.AppendInt32( `int (row.get(input_schema.GetColumnName(i),` `0 )))`  elif tname = `= 'double' :  req.AppendDouble( `float (row.get(input_schema.GetColumnName(i),` `0 )))`  elif tname = `= 'timestamp' :  req.AppendTimestamp( `int (row.get(input_schema.GetColumnName(i),` `0 )))`  else `:`  req.AppendNULL()  if not req.Build():  self `.write( "fail to build request" )`  return  ok, rs = fedb_driver.executeQuery( `'db_test' , sql, req)  if not ok:  self `.write( "fail to execute sql" )`  return  rs. `Next ()  ins = build_feature(rs)  self `.write( "----------------ins---------------\n" )`  self `.write( str (ins) + "n" )  duration = bst.predict(ins)  self `.write( "---------------predict trip_duration -------------\n" )`  self `.write( "%s s" % str (duration[ 0 ]))``

最终执行效果

python3 predict.py  ----------------ins---------------  [[ 2. 40.774097 40.774097 40.774097 40.774097 40.774097 40.774097  40.774097 40.774097 1. 1. ]]  ---------------predict trip_duration -------------  859.3298781277192 s `

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/149954.html

(0)

相关推荐

  • 抹胸内衣,穿抹胸裙如何隐藏内衣带

    技术抹胸内衣,穿抹胸裙如何隐藏内衣带1抹胸内衣、露双肩抹胸上衣/裙装系列露双肩抹胸的衣服看起来恐怕只能佩戴无肩带文胸,可真真怕一不留神来个滑落走光。姑娘们,方法是有滴:取一条闲置的内衣肩带,在内衣下缘多绕一圈,妥妥解决!

    生活 2021年10月20日
  • js中indexof是什么(js中indexof怎么用)

    技术JS中includes()和indexOf()有哪些区别这篇文章给大家分享的是有关JS中includes()和indexOf()有哪些区别的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1、

    攻略 2021年12月20日
  • JavaScript怎么实现自定义日历效果

    技术JavaScript怎么实现自定义日历效果本篇内容主要讲解“JavaScript怎么实现自定义日历效果”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“JavaScript怎

    攻略 2021年11月8日
  • 怎么理解JavaScript中的语法和代码结构

    技术怎么理解JavaScript中的语法和代码结构本篇内容主要讲解“怎么理解JavaScript中的语法和代码结构”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么理解Jav

    攻略 2021年11月20日
  • Hibernate一级缓存是什么

    技术Hibernate一级缓存是什么本篇内容介绍了“Hibernate一级缓存是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够

    攻略 2021年12月4日
  • 退烧按摩手法图解法,小儿发热推拿方法有哪些

    技术退烧按摩手法图解法,小儿发热推拿方法有哪些小儿发烧推拿手法有什么小儿发热是指小儿体温超过正常范围,可见于多种急、慢性疾病过程中退烧按摩手法图解法。根据小儿发热病因可将其分为外感发热、肺胃实热和阴虚内热,临床可采用按摩

    生活 2021年10月22日