简述storm的拓扑结构(storm拓扑原理)

技术storm怎么构建拓扑代码这篇文章主要讲解了“storm怎么构建拓扑代码”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“storm怎么构建拓扑代码”吧!1. 构建拓扑

这篇文章主要讲解了"风暴怎么构建拓扑代码",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"风暴怎么构建拓扑代码"吧!

storm怎么构建拓扑代码

storm怎么构建拓扑代码

storm怎么构建拓扑代码

1.构建拓扑代码

packagedemo

导入回类型。暴风雨。拓扑结构。拓扑生成器;

导入回类型。暴风雨。元组。字段;

publicclassAreaAmtTopo {

publicationstativitmain(String[]args){ 0

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout('spout '),new orders baseboout(kafkapproperties .Order_topic),5);

builder.setBolt('filter ',newAreaFilterBolt(),5).无序分组(' spout ');

builder.setBolt('areabolt ',newAreaAmtBolt(),2).fieldsGrouping('filter ',新字段(' area _ id ');

builder.setBolt('rsltbolt ',newAreaRsltBolt(),1).无序分组('区域螺栓');

}

}2.一级过滤螺栓

packagedemo

导入Java。乌提尔。地图;

导入回类型。暴风雨。任务。topologycontext

导入回类型。暴风雨。拓扑结构。basicoutputcollector

导入回类型。暴风雨。拓扑结构。ibasicbolt

导入回类型。暴风雨。拓扑结构。OutPutfields clarer

导入回类型。暴风雨。元组。字段;

导入回类型。暴风雨。元组。元组;

导入回类型。暴风雨。元组。价值观;

//一级的过滤螺栓

public class arefilterbolt implementsibasicbolt {

@覆盖

public void declareoutputfield(outputfield claredclarer){ 0

//TODOAuto-generatedmethodstub

庄家。声明(NewFields(' area _ id ',' order_amt ',' create _ time ');//元组里面每个价值的对应名字

}

@覆盖

nbsp; public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //order_id,order_amt,create_time,area_id
        String order=input.getString(0);//取出集合values中的第一个value
        if(order!=null){
            
            String orderArr[]=order.split("\\t");
            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time
            
        }

     }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}

3.局部汇总bolt(按日期和区域和汇总)

package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//局部汇总
public class AreaAmtBolt implements IBasicBolt {

    
    Map<String,Double> countsMap=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer declarer) {
        
        declarer.declare(new Fields("date_area","amt"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        // TODO Auto-generated method stub
         countsMap =new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector collector) {
        
        if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发
        {
        String area_id=input.getString(0);
        Double order_amt=input.getDouble(1);
        String  order_date=input.getStringByField("order_date");
        
        Double count=countsMap.get(area_id+"_"+order_date);
        if (count==null){
            count = 0.0;    
        }
        
        count+=order_amt;
        countsMap.put(area_id+"_"+order_date,count);
        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);
        collector.emit(new Values(area_id+"_"+order_date,count));
        }
    }

    @Override
    public void cleanup() {
        countsMap.clear();
    }

}

4. 最终结果写入Hbase

package demo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//结果定时写入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {

    Map<String,Double> countsMap=null;
    long beginTime=System.currentTimeMillis();
    long endTime=0L;
    HBaseDao dao=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer paramOutputFieldsDeclarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
         countsMap =new HashMap<String, Double>();
         dao=new HBaseDAOImp();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector paramBasicOutputCollector) {
        String date_areaid=input.getString(0);
        double  order_amt=input.getDouble(1); 
        countsMap.put(date_areaid,order_amt);
        endTime=System.currentTimeMillis();
        if (endTime-beginTime>=5*1000){
        

           for(String key:countsMap.keySet()){
              //put into hbase
            //2014-05-05_1,amt
              dao.insert("area_order","cf","order_amt",countsMap.get(key));
              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
            beginTime=System.currentTimeMillis();
        }
        
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

5. DateFmt代码

package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long="yyyy-MM-dd HH:mm:ss";
    public static final String date_short="yyyy-MM-dd";
    
    public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);
    public static String getCountDate(String date,String patton){
        SimpleDateFormat sdf=new SimpleDateFormat(patton);
        Calendar cal =Calendar.getInstance();
        if (date!=null){
            
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                
                e.printStackTrace();
            }
        }
        
        return sdf.format(cal.getTime());
        
    }
    
    public static Date parseDate(String dateStr) throws Exception{
        
        return sdf.parse(dateStr);
    }
    
    
    public static void main(String[] args) {
        
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}

感谢各位的阅读,以上就是“storm怎么构建拓扑代码”的内容了,经过本文的学习后,相信大家对storm怎么构建拓扑代码这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156275.html

(0)

相关推荐

  • 怎么部署skywalking容器

    技术怎么部署skywalking容器本篇内容主要讲解“怎么部署skywalking容器”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么部署skywalking容器”吧!1.

    攻略 2021年11月16日
  • 平面法向量的求法,平面方程和法向量的关系及证明

    技术平面法向量的求法,平面方程和法向量的关系及证明所谓平面的法向量,就是与平面垂直的一个向量,它就是由平面方程中三个未知数的系数所组成的向量。 它们的关系可如此证明平面法向量的求法: 设向量(A,B,C)是一个过点(x0

    生活 2021年10月20日
  • 如何理解Java设计模式责任链模式

    技术如何理解Java设计模式责任链模式本篇文章给大家分享的是有关如何理解Java设计模式责任链模式,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。一、责任链模

    攻略 2021年10月23日
  • nodejs用哪些调试工具

    技术nodejs用哪些调试工具这篇文章将为大家详细讲解有关nodejs用哪些调试工具,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。 调试node的工具:1、

    攻略 2021年11月7日
  • CF666C 题解

    技术CF666C 题解 CF666C 题解题目大意
    多组询问。每次询问给定一个串 \(s\) 和一个长度 \(L\)。
    问有多少个长度为 \(L\) 的只包含小写字母的串, 有至少一个子序列等于 \(s

    礼包 2021年11月7日
  • ipv6组播地址范围(ipv6的组播地址)

    技术SDN网络IPv6组播机制是什么本篇内容主要讲解“SDN网络IPv6组播机制是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SDN网络IPv6组播机制是什么”吧!知识

    攻略 2021年12月20日