简述storm的拓扑结构(storm拓扑原理)

技术storm怎么构建拓扑代码这篇文章主要讲解了“storm怎么构建拓扑代码”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“storm怎么构建拓扑代码”吧!1. 构建拓扑

这篇文章主要讲解了”风暴怎么构建拓扑代码”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习”风暴怎么构建拓扑代码”吧!

storm怎么构建拓扑代码

storm怎么构建拓扑代码

storm怎么构建拓扑代码

1.构建拓扑代码

packagedemo

导入回类型。暴风雨。拓扑结构。拓扑生成器;

导入回类型。暴风雨。元组。字段;

publicclassAreaAmtTopo {

publicationstativitmain(String[]args){ 0

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout(‘spout ‘),new orders baseboout(kafkapproperties .Order_topic),5);

builder.setBolt(‘filter ‘,newAreaFilterBolt(),5).无序分组(‘ spout ‘);

builder.setBolt(‘areabolt ‘,newAreaAmtBolt(),2).fieldsGrouping(‘filter ‘,新字段(‘ area _ id ‘);

builder.setBolt(‘rsltbolt ‘,newAreaRsltBolt(),1).无序分组(‘区域螺栓’);

}

}2.一级过滤螺栓

packagedemo

导入Java。乌提尔。地图;

导入回类型。暴风雨。任务。topologycontext

导入回类型。暴风雨。拓扑结构。basicoutputcollector

导入回类型。暴风雨。拓扑结构。ibasicbolt

导入回类型。暴风雨。拓扑结构。OutPutfields clarer

导入回类型。暴风雨。元组。字段;

导入回类型。暴风雨。元组。元组;

导入回类型。暴风雨。元组。价值观;

//一级的过滤螺栓

public class arefilterbolt implementsibasicbolt {

@覆盖

public void declareoutputfield(outputfield claredclarer){ 0

//TODOAuto-generatedmethodstub

庄家。声明(NewFields(‘ area _ id ‘,’ order_amt ‘,’ create _ time ‘);//元组里面每个价值的对应名字

}

@覆盖

nbsp; public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //order_id,order_amt,create_time,area_id
        String order=input.getString(0);//取出集合values中的第一个value
        if(order!=null){
            
            String orderArr[]=order.split("\\t");
            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time
            
        }

     }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}

3.局部汇总bolt(按日期和区域和汇总)

package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//局部汇总
public class AreaAmtBolt implements IBasicBolt {

    
    Map<String,Double> countsMap=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer declarer) {
        
        declarer.declare(new Fields("date_area","amt"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        // TODO Auto-generated method stub
         countsMap =new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector collector) {
        
        if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发
        {
        String area_id=input.getString(0);
        Double order_amt=input.getDouble(1);
        String  order_date=input.getStringByField("order_date");
        
        Double count=countsMap.get(area_id+"_"+order_date);
        if (count==null){
            count = 0.0;    
        }
        
        count+=order_amt;
        countsMap.put(area_id+"_"+order_date,count);
        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);
        collector.emit(new Values(area_id+"_"+order_date,count));
        }
    }

    @Override
    public void cleanup() {
        countsMap.clear();
    }

}

4. 最终结果写入Hbase

package demo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//结果定时写入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {

    Map<String,Double> countsMap=null;
    long beginTime=System.currentTimeMillis();
    long endTime=0L;
    HBaseDao dao=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer paramOutputFieldsDeclarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
         countsMap =new HashMap<String, Double>();
         dao=new HBaseDAOImp();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector paramBasicOutputCollector) {
        String date_areaid=input.getString(0);
        double  order_amt=input.getDouble(1); 
        countsMap.put(date_areaid,order_amt);
        endTime=System.currentTimeMillis();
        if (endTime-beginTime>=5*1000){
        

           for(String key:countsMap.keySet()){
              //put into hbase
            //2014-05-05_1,amt
              dao.insert("area_order","cf","order_amt",countsMap.get(key));
              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
            beginTime=System.currentTimeMillis();
        }
        
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

5. DateFmt代码

package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long="yyyy-MM-dd HH:mm:ss";
    public static final String date_short="yyyy-MM-dd";
    
    public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);
    public static String getCountDate(String date,String patton){
        SimpleDateFormat sdf=new SimpleDateFormat(patton);
        Calendar cal =Calendar.getInstance();
        if (date!=null){
            
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                
                e.printStackTrace();
            }
        }
        
        return sdf.format(cal.getTime());
        
    }
    
    public static Date parseDate(String dateStr) throws Exception{
        
        return sdf.parse(dateStr);
    }
    
    
    public static void main(String[] args) {
        
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}

感谢各位的阅读,以上就是“storm怎么构建拓扑代码”的内容了,经过本文的学习后,相信大家对storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156275.html

(0)

相关推荐

  • html中em是什么单位(em值是什么意思)

    技术css3中em指的是什么单位小编给大家分享一下css3中em指的是什么单位,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧! 在css3中,em是一个相对长度单位,相对于当前

    攻略 2021年12月19日
  • TP6管理后台实战第五天-文章管理

    技术TP6管理后台实战第五天-文章管理 TP6管理后台实战第五天-文章管理第五天目标:
    1、文章管理进入开发:
    1、文章分类管理1.1 文章分类列表 — ok搜索项: 分类名称 分类状态列表项:ID,

    礼包 2021年10月27日
  • css如何实现鼠标点击表格变色效果

    技术css如何实现鼠标点击表格变色效果这篇文章主要介绍了css如何实现鼠标点击表格变色效果,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

    攻略 2021年11月26日
  • golang自定义json化(golang 调用接口并更新本地数据)

    技术如何用golang源码分析json.Marshal如何用golang源码分析json.Marshal,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。G

    攻略 2021年12月13日
  • 张僧繇怎么读,张僧繇的点睛之笔指的是什么

    技术张僧繇怎么读,张僧繇的点睛之笔指的是什么张僧繇是梁朝著名的画师。有一次,皇帝命令他在金陵安乐寺的墙壁上画龙。不一会儿,两条栩栩如生的龙就出现在墙壁上了。这时皇帝发现这两条龙都没有眼睛,就问张僧繇这是为什么。张僧繇回答

    生活 2021年10月30日
  • MySQL varchar类型最大值是多少

    技术MySQL varchar类型最大值是多少本篇内容介绍了“MySQL varchar类型最大值是多少”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希

    攻略 2021年12月4日