简述storm的拓扑结构(storm拓扑原理)

技术storm怎么构建拓扑代码这篇文章主要讲解了“storm怎么构建拓扑代码”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“storm怎么构建拓扑代码”吧!1. 构建拓扑

这篇文章主要讲解了"风暴怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"风暴怎么构建拓扑代码"吧!

storm怎么构建拓扑代码

storm怎么构建拓扑代码

storm怎么构建拓扑代码

1.构建拓扑代码

packagedemo

导入回类型。暴风雨。拓扑结构。拓扑生成器;

导入回类型。暴风雨。元组。字段;

publicclassAreaAmtTopo {

publicationstativitmain(String[]args){ 0

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout('spout '),new orders baseboout(kafkapproperties .Order_topic),5);

builder.setBolt('filter ',newAreaFilterBolt(),5).无序分组(' spout ');

builder.setBolt('areabolt ',newAreaAmtBolt(),2).fieldsGrouping('filter ',新字段(' area _ id ');

builder.setBolt('rsltbolt ',newAreaRsltBolt(),1).无序分组('区域螺栓');

}

}2.一级过滤螺栓

packagedemo

导入Java。乌提尔。地图;

导入回类型。暴风雨。任务。topologycontext

导入回类型。暴风雨。拓扑结构。basicoutputcollector

导入回类型。暴风雨。拓扑结构。ibasicbolt

导入回类型。暴风雨。拓扑结构。OutPutfields clarer

导入回类型。暴风雨。元组。字段;

导入回类型。暴风雨。元组。元组;

导入回类型。暴风雨。元组。价值观;

//一级的过滤螺栓

public class arefilterbolt implementsibasicbolt {

@覆盖

public void declareoutputfield(outputfield claredclarer){ 0

//TODOAuto-generatedmethodstub

庄家。声明(NewFields(' area _ id ',' order_amt ',' create _ time ');//元组里面每个价值的对应名字

}

@覆盖

nbsp; public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //order_id,order_amt,create_time,area_id
        String order=input.getString(0);//取出集合values中的第一个value
        if(order!=null){
            
            String orderArr[]=order.split("\\t");
            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time
            
        }

     }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}

3.局部汇总bolt(按日期和区域和汇总)

package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//局部汇总
public class AreaAmtBolt implements IBasicBolt {

    
    Map<String,Double> countsMap=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer declarer) {
        
        declarer.declare(new Fields("date_area","amt"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        // TODO Auto-generated method stub
         countsMap =new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector collector) {
        
        if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发
        {
        String area_id=input.getString(0);
        Double order_amt=input.getDouble(1);
        String  order_date=input.getStringByField("order_date");
        
        Double count=countsMap.get(area_id+"_"+order_date);
        if (count==null){
            count = 0.0;    
        }
        
        count+=order_amt;
        countsMap.put(area_id+"_"+order_date,count);
        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);
        collector.emit(new Values(area_id+"_"+order_date,count));
        }
    }

    @Override
    public void cleanup() {
        countsMap.clear();
    }

}

4. 最终结果写入Hbase

package demo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//结果定时写入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {

    Map<String,Double> countsMap=null;
    long beginTime=System.currentTimeMillis();
    long endTime=0L;
    HBaseDao dao=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer paramOutputFieldsDeclarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
         countsMap =new HashMap<String, Double>();
         dao=new HBaseDAOImp();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector paramBasicOutputCollector) {
        String date_areaid=input.getString(0);
        double  order_amt=input.getDouble(1); 
        countsMap.put(date_areaid,order_amt);
        endTime=System.currentTimeMillis();
        if (endTime-beginTime>=5*1000){
        

           for(String key:countsMap.keySet()){
              //put into hbase
            //2014-05-05_1,amt
              dao.insert("area_order","cf","order_amt",countsMap.get(key));
              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
            beginTime=System.currentTimeMillis();
        }
        
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

5. DateFmt代码

package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long="yyyy-MM-dd HH:mm:ss";
    public static final String date_short="yyyy-MM-dd";
    
    public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);
    public static String getCountDate(String date,String patton){
        SimpleDateFormat sdf=new SimpleDateFormat(patton);
        Calendar cal =Calendar.getInstance();
        if (date!=null){
            
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                
                e.printStackTrace();
            }
        }
        
        return sdf.format(cal.getTime());
        
    }
    
    public static Date parseDate(String dateStr) throws Exception{
        
        return sdf.parse(dateStr);
    }
    
    
    public static void main(String[] args) {
        
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}

感谢各位的阅读,以上就是“storm怎么构建拓扑代码”的内容了,经过本文的学习后,相信大家对storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156275.html

(0)

相关推荐

  • 骑马的英语,关于骑马的英语作文高中

    技术骑马的英语,关于骑马的英语作文高中Since ancient times, people riding with a saying called Centaur with success.This word expl

    生活 2021年10月21日
  • 岁寒然后知松柏之后凋也的意思,岁寒然后知松柏之后凋也的作文

    技术岁寒然后知松柏之后凋也的意思,岁寒然后知松柏之后凋也的作文《论语》一书中,有这样一句经典语句:子曰,岁寒,然后知松柏之后凋也。这句话出自《论语·子罕》,意思是:只有天气寒冷了,我们才明白松柏是最后凋谢的岁寒然后知松柏

    生活 2021年10月28日
  • php7.2运行失败怎么解决

    技术php7.2运行失败怎么解决本篇内容主要讲解“php7.2运行失败怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“php7.2运行失败怎么解决”吧!

    攻略 2021年12月9日
  • 如何解决普通方法调用静态属性的Sonar问题

    技术如何解决普通方法调用静态属性的Sonar问题如何解决普通方法调用静态属性Sonar问题,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。问题

    攻略 2021年10月20日
  • 抖音刷赞网,抖音刷赞网站推广永久?

    技术抖音刷赞网,抖音刷赞网站推广永久?抖音点赞、抖音粉丝、抖音评论、抖音播放是怎么刷合适?如今越来越多的人都会玩抖音,因为抖音里面有特别多有趣的内容,很多的用户会去把自己生活当中比较有趣的一些生活经验或者是生活经历发在抖

    测评 2021年11月10日
  • 如何运用爬虫框架Scrapy部署爬虫

    技术如何运用爬虫框架Scrapy部署爬虫这篇文章将为大家详细讲解有关如何运用爬虫框架Scrapy部署爬虫,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。这里主要讲述如何将我

    攻略 2021年11月19日