简述storm的拓扑结构(storm拓扑原理)

技术storm怎么构建拓扑代码这篇文章主要讲解了“storm怎么构建拓扑代码”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“storm怎么构建拓扑代码”吧!1. 构建拓扑

这篇文章主要讲解了"风暴怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"风暴怎么构建拓扑代码"吧!

storm怎么构建拓扑代码

storm怎么构建拓扑代码

storm怎么构建拓扑代码

1.构建拓扑代码

packagedemo

导入回类型。暴风雨。拓扑结构。拓扑生成器;

导入回类型。暴风雨。元组。字段;

publicclassAreaAmtTopo {

publicationstativitmain(String[]args){ 0

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout('spout '),new orders baseboout(kafkapproperties .Order_topic),5);

builder.setBolt('filter ',newAreaFilterBolt(),5).无序分组(' spout ');

builder.setBolt('areabolt ',newAreaAmtBolt(),2).fieldsGrouping('filter ',新字段(' area _ id ');

builder.setBolt('rsltbolt ',newAreaRsltBolt(),1).无序分组('区域螺栓');

}

}2.一级过滤螺栓

packagedemo

导入Java。乌提尔。地图;

导入回类型。暴风雨。任务。topologycontext

导入回类型。暴风雨。拓扑结构。basicoutputcollector

导入回类型。暴风雨。拓扑结构。ibasicbolt

导入回类型。暴风雨。拓扑结构。OutPutfields clarer

导入回类型。暴风雨。元组。字段;

导入回类型。暴风雨。元组。元组;

导入回类型。暴风雨。元组。价值观;

//一级的过滤螺栓

public class arefilterbolt implementsibasicbolt {

@覆盖

public void declareoutputfield(outputfield claredclarer){ 0

//TODOAuto-generatedmethodstub

庄家。声明(NewFields(' area _ id ',' order_amt ',' create _ time ');//元组里面每个价值的对应名字

}

@覆盖

nbsp; public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //order_id,order_amt,create_time,area_id
        String order=input.getString(0);//取出集合values中的第一个value
        if(order!=null){
            
            String orderArr[]=order.split("\\t");
            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time
            
        }

     }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}

3.局部汇总bolt(按日期和区域和汇总)

package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//局部汇总
public class AreaAmtBolt implements IBasicBolt {

    
    Map<String,Double> countsMap=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer declarer) {
        
        declarer.declare(new Fields("date_area","amt"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        // TODO Auto-generated method stub
         countsMap =new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector collector) {
        
        if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发
        {
        String area_id=input.getString(0);
        Double order_amt=input.getDouble(1);
        String  order_date=input.getStringByField("order_date");
        
        Double count=countsMap.get(area_id+"_"+order_date);
        if (count==null){
            count = 0.0;    
        }
        
        count+=order_amt;
        countsMap.put(area_id+"_"+order_date,count);
        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);
        collector.emit(new Values(area_id+"_"+order_date,count));
        }
    }

    @Override
    public void cleanup() {
        countsMap.clear();
    }

}

4. 最终结果写入Hbase

package demo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//结果定时写入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {

    Map<String,Double> countsMap=null;
    long beginTime=System.currentTimeMillis();
    long endTime=0L;
    HBaseDao dao=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer paramOutputFieldsDeclarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
         countsMap =new HashMap<String, Double>();
         dao=new HBaseDAOImp();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector paramBasicOutputCollector) {
        String date_areaid=input.getString(0);
        double  order_amt=input.getDouble(1); 
        countsMap.put(date_areaid,order_amt);
        endTime=System.currentTimeMillis();
        if (endTime-beginTime>=5*1000){
        

           for(String key:countsMap.keySet()){
              //put into hbase
            //2014-05-05_1,amt
              dao.insert("area_order","cf","order_amt",countsMap.get(key));
              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
            beginTime=System.currentTimeMillis();
        }
        
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

5. DateFmt代码

package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long="yyyy-MM-dd HH:mm:ss";
    public static final String date_short="yyyy-MM-dd";
    
    public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);
    public static String getCountDate(String date,String patton){
        SimpleDateFormat sdf=new SimpleDateFormat(patton);
        Calendar cal =Calendar.getInstance();
        if (date!=null){
            
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                
                e.printStackTrace();
            }
        }
        
        return sdf.format(cal.getTime());
        
    }
    
    public static Date parseDate(String dateStr) throws Exception{
        
        return sdf.parse(dateStr);
    }
    
    
    public static void main(String[] args) {
        
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}

感谢各位的阅读,以上就是“storm怎么构建拓扑代码”的内容了,经过本文的学习后,相信大家对storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156275.html

(0)

相关推荐

  • 狼原文及翻译,文言古诗《狼》的汉语翻译

    技术狼原文及翻译,文言古诗《狼》的汉语翻译有个屠户卖肉回家,天已到傍晚狼原文及翻译。忽然来了一只狼,望着(屠户)担子里的肉,馋得好像要流口水似的,跟着屠户走了好几里。屠户害怕了,把刀拿出来给它看,狼稍稍后退一点,等到屠户

    生活 2021年10月27日
  • CCFCSP 202012-2 期末预测之最佳阈值

    技术CCFCSP 202012-2 期末预测之最佳阈值 CCFCSP 202012-2 期末预测之最佳阈值题目背景
    考虑到安全指数是一个较大范围内的整数、小菜很可能搞不清楚自己是否真的安全,顿顿决定设置

    礼包 2021年11月15日
  • MySQL死锁举例分析

    技术MySQL死锁举例分析这篇文章主要介绍“MySQL死锁举例分析”,在日常操作中,相信很多人在MySQL死锁举例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”MySQL死锁举例分析”

    攻略 2021年11月18日
  • javascript如何将对象转化为数组

    技术javascript如何将对象转化为数组这篇文章将为大家详细讲解有关javascript如何将对象转化为数组,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

    攻略 2021年11月19日
  • 尼德兰,荷兰为什么改国名为尼德兰

    技术尼德兰,荷兰为什么改国名为尼德兰综合国外多家媒体报道,从2020年1月起,荷兰将把国名正式改称为尼德兰,改国名需花费32万美元尼德兰。好好的国名叫了这么多年,现在又花钱又麻烦,为什么非要改呢?(荷兰地图)原来,荷兰的

    生活 2021年10月22日
  • 11.23 EF

    技术11.23 EF 11.23 EF一、EF框架
    a、EF框架:全称EntityFramework ,它微软提供的一种ORM工具,支持面向数据的软件应用程序的技术。ORM让开发人员节省数据库访问的代码

    礼包 2021年11月24日