Harbin University of Science and Technology

School of Software and Microelectronics

Lab Report

(2020-2021 Academic Year, Second Semester)

Course: Real-Time Data Processing
Class: Software 18-1
Student ID: 1814010130
Name: 张立辉

School of Software and Microelectronics, Harbin University of Science and Technology


Experiment: Experiment 2, Saving Storm Real-Time Processing Results to NoSQL    Major: Software Engineering
Name: 张立辉    Student ID: 1814010130    Class: Software 18-1

I. Purpose:

Understand how data processing and data persistence are combined in big-data scenarios, and master the combined use of Storm and the Redis database.

II. Content:

Building on Experiment 1, design a key-value model for the real-time processing results and define a Bolt that stores those results.
(1) Task 1: save words and their frequencies
The key-value model is:
Word: frequency
Define the VO class and interface:

public class WordFreq {
    private String word;
    private int freq;
    private Date date; // month granularity, e.g. 2017-05-01
    // setters and getters
}

public interface WordFreqDao {
    public void saveWordFreq(WordFreq wf);
}
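
For reference, a minimal sketch of how this key-value model could be implemented against Redis is given below. It assumes Jedis and the JedisUtil helper from Section IV; the class name, the month-prefixed key (for example "2017-05:word") and the atomic INCRBY command are illustrative assumptions made here, not part of the experiment specification (the WordFreqDAO actually submitted in Section IV uses the plain word as key).

import java.text.SimpleDateFormat;

import redis.clients.jedis.Jedis;
import util.JedisUtil;

// Illustrative sketch only: the class name, the month-prefixed key and the use of
// INCRBY are assumptions; the WordFreqDAO submitted in Section IV works differently.
public class MonthlyWordFreqDao implements WordFreqDao {

    @Override
    public void saveWordFreq(WordFreq wf) {
        try (Jedis jedis = JedisUtil.getConnection()) {
            // Key such as "2017-05:storm", so counts are aggregated per month.
            String key = new SimpleDateFormat("yyyy-MM").format(wf.getDate()) + ":" + wf.getWord();
            // INCRBY is atomic, so concurrent writers cannot lose an update.
            jedis.incrBy(key, wf.getFreq());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}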

(2) Task 2: save joint frequencies of word pairs
The key-value model is:
Word-word: frequency
Define the VO class and interface:

public class BiWordFreq {
    private String word1;
    private String word2;
    private Date date; // month granularity, e.g. 2017-05-01
    private int freq;
    // setters and getters
}

public interface BiWordFreqDao {
    public void saveBiWordFreq(BiWordFreq wf);
}
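
Since the pair (word1, word2) is unordered, "a-b" and "b-a" should map to the same Redis key. A small helper like the following sketch (the helper itself is illustrative and not part of the submitted code) canonicalizes the pair by lexicographic order, which is the same rule RelationBolt applies in Section IV:

// Illustrative helper: builds a canonical "smaller-larger" key for a word pair so
// that both orderings of the same two words share one Redis key.
public final class PairKey {

    private PairKey() {
    }

    public static String of(String w1, String w2) {
        // Lexicographic ordering, matching the comparison used in RelationBolt below.
        return w1.compareTo(w2) <= 0 ? w1 + "-" + w2 : w2 + "-" + w1;
    }
}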

III. Equipment and Software Environment:

Windows 10 Professional
IntelliJ IDEA 2020.3.2 (Ultimate Edition)
Java 15

IV. Process and Result Screenshots:

BiWordFreq.java

package bean;

import java.util.Date;

public class BiWordFreq {
    private String word1;
    private String word2;
    private Date date;
    private int freq;

    public void setWord1(String word1) {
        this.word1 = word1;
    }

    public String getWord1() {
        return word1;
    }

    public void setWord2(String word2) {
        this.word2 = word2;
    }

    public String getWord2() {
        return word2;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public Date getDate() {
        return date;
    }

    public void setFreq(int freq) {
        this.freq = freq;
    }

    public int getFreq() {
        return freq;
    }

}

WordFreq.java

package bean;

import java.util.Date;

public class WordFreq {
    private String word;
    private int freq;
    private Date date;

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }

    public void setFreq(int freq) {
        this.freq = freq;
    }

    public int getFreq() {
        return freq;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public Date getDate() {
        return date;
    }

}

FrequencyBolt.java

package bolt;

import java.util.Date;
import java.util.HashMap;
import java.util.Set;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import bean.WordFreq;
import dao.WordFreqDAO;

public class FrequencyBolt extends BaseRichBolt {
    private Map<String, Integer> wordFreq;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Initialize the in-memory word-count map.
        wordFreq = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");

        // The empty string is the end-of-stream marker from the spout: print the
        // accumulated counts and persist each entry to Redis via WordFreqDAO.
        if("".equals(word)) {
            System.out.println("词频:");
            System.out.println(wordFreq);
            System.out.println();

            Set<String> set = wordFreq.keySet();
            for(String s: set) {
                WordFreq wf = new WordFreq();

                wf.setWord(s);
                wf.setFreq(wordFreq.get(s));
                wf.setDate(new Date());

                new WordFreqDAO().saveWordFreq(wf);
            }
            return;
        }

        if(!wordFreq.containsKey(word)) {
            wordFreq.put(word, 1);
        }
        else {
            wordFreq.replace(word, wordFreq.get(word) + 1);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink; it emits nothing downstream.
    }

}

ParticipleBolt.java

package bolt;

import java.io.IOException;
import java.io.StringReader;
import java.util.Map;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.wltea.analyzer.lucene.IKAnalyzer;
import org.apache.storm.tuple.Fields;

public class ParticipleBolt extends BaseRichBolt {
    OutputCollector collector;

    public void execute(Tuple arg0) {
        String content = arg0.getStringByField("content");

        if("".equals(content)) {
            collector.emit(new Values(""));
            return;
        }

        // Tokenize the line with the IK Chinese analyzer and emit one tuple per word.
        try(Analyzer analyzer = new IKAnalyzer(true)) {
            TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                collector.emit(new Values(cta.toString()));
            }
        }
        catch(IOException e) {
            e.printStackTrace();
        }
    }

    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        collector = arg2;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

RelationBolt.java

package bolt;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.wltea.analyzer.lucene.IKAnalyzer;

import bean.BiWordFreq;
import dao.BiWordFreqDAO;

public class RelationBolt extends BaseRichBolt {
    private Map<String, Integer> wordFreq;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Initialize the in-memory pair-count map (key format: "word1-word2").
        wordFreq = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String content = input.getStringByField("content");
        List<String> wordList = new ArrayList<String>();

        // The empty string is the end-of-stream marker from the spout: print the
        // accumulated pair counts and persist each entry to Redis via BiWordFreqDAO.
        if("".equals(content)) {
            System.out.println("关联词频:");
            System.out.println(wordFreq);
            System.out.println();

            Set<String> set = wordFreq.keySet();
            for(String s: set) {
                BiWordFreq wf = new BiWordFreq();
                String[] words = s.split("-");

                wf.setWord1(words[0]);
                wf.setWord2(words[1]);
                wf.setFreq(wordFreq.get(s));
                wf.setDate(new Date());

                new BiWordFreqDAO().saveBiWordFreq(wf);
            }
            return;
        }

        // Tokenize the line with the IK Chinese analyzer.
        try(Analyzer analyzer = new IKAnalyzer(true)) {
            TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                wordList.add(cta.toString());
            }
        }
        catch(IOException e) {
            e.printStackTrace();
        }

        for(int i = 0; i < wordList.size(); i++) {
            for(int j = i + 1; j < wordList.size(); j++) {
                String str1 = wordList.get(i);
                String str2 = wordList.get(j);
                String key = null;

                if(str1.compareTo(str2) == 0) {
                    continue;
                }
                else if(str1.compareTo(str2) > 0) {
                    key = str2 + "-" + str1;
                }
                else {
                    key = str1 + "-" + str2;
                }

                if(!wordFreq.containsKey(key)) {
                    wordFreq.put(key, 1);
                }
                else {
                    wordFreq.replace(key, wordFreq.get(key) + 1);
                }
            }
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink; it emits nothing downstream.
    }

}

BiWordFreqDAO.java

package dao;

import bean.BiWordFreq;
import util.JedisUtil;

import redis.clients.jedis.Jedis;

public class BiWordFreqDAO {

    public void saveBiWordFreq(BiWordFreq wf) {
        try(Jedis jedis = JedisUtil.getConnection()) {
            String key = wf.getWord1() + "-" + wf.getWord2();
            int value = wf.getFreq();

            // Add the new count to any value already stored under "word1-word2".
            if(jedis.exists(key)) {
                jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
            }
            else {
                jedis.set(key, String.valueOf(value));
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }

}

WordFreqDAO.java

package dao;

import bean.WordFreq;
import util.JedisUtil;

import redis.clients.jedis.Jedis;

public class WordFreqDAO {

    public void saveWordFreq(WordFreq wf) {
        try(Jedis jedis = JedisUtil.getConnection()) {
            String key = wf.getWord();
            int value = wf.getFreq();

            // Add the new count to any value already stored under the word key.
            if(jedis.exists(key)) {
                jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
            }
            else {
                jedis.set(key, String.valueOf(value));
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }

}

Spout.java

package spout;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.tuple.Fields;

import redis.clients.jedis.Jedis;

import util.JedisUtil;

public class Spout extends BaseRichSpout  {
    private SpoutOutputCollector collector;

    public void nextTuple() {
        // Read the crawled text that zlh.java stored in Redis under the key "!".
        Jedis jedis = JedisUtil.getConnection();
        String content = jedis.get("!");
        jedis.close();
        String[] strs = content.split("\n");

        for(String s: strs) {
            collector.emit(new Values(s));
        }
        // Emit an empty line as the end-of-stream marker, then sleep so the data is emitted only once.
        collector.emit(new Values(""));
        try {
            Thread.sleep(Long.MAX_VALUE);
        }
        catch(InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        collector = arg2;
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("content"));
    }

}

zlh.java

package test;

import topology.TopologyFactory;
import util.JedisUtil;
import redis.clients.jedis.Jedis;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

public class zlh {

    public static void main(String[] args) throws MalformedURLException, IOException {

        // Crawl the blog home page and collect the post titles (<h4> elements), 50 s timeout.
        Document doc = Jsoup.parse(new URL("https://blog.csdn.net/weixin_45267419"), 50000);
        Elements titles = doc.select("h4");
        StringBuffer sb = new StringBuffer();

        for(Element title: titles) {
            sb.append(title.html() + "\n");
        }
        sb.delete(sb.length() - 1, sb.length());

        // Store the crawled titles in Redis under the key "!" for the spout to read.
        Jedis jedis = JedisUtil.getConnection();
        jedis.set("!", sb.toString());
        jedis.close();

        // Build the topology and run it in a local in-process cluster.
        StormTopology topology = TopologyFactory.factory();

        LocalCluster cluster = new LocalCluster();

        Config config = new Config();

        cluster.submitTopology("1814010130", config, topology);

    }
}

TopologyFactory.java

package topology;

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.generated.StormTopology;

import spout.*;
import bolt.*;

public class TopologyFactory {

    public static StormTopology factory() {
        TopologyBuilder builder = new TopologyBuilder();
        // s1 reads the crawled text from Redis and emits one line per tuple.
        builder.setSpout("s1", new Spout());
        // b1 segments each line into words; b2 counts single-word frequencies.
        builder.setBolt("b1", new ParticipleBolt()).shuffleGrouping("s1");
        builder.setBolt("b2", new FrequencyBolt()).shuffleGrouping("b1");
        // b3 consumes the raw lines directly and counts word-pair frequencies.
        builder.setBolt("b3", new RelationBolt()).shuffleGrouping("s1");
        return builder.createTopology();
    }

}
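
With a single instance of each counting bolt, shuffleGrouping is enough. As a side note (not part of the submitted code), if FrequencyBolt were run with a parallelism hint above 1, a fields grouping on "word" would be needed so that all occurrences of a word reach the same task; note that the empty-string end marker used in this experiment would then arrive at only one of the tasks, so the flush signal would also have to be broadcast (for example with allGrouping on a separate signal stream). A minimal sketch of the grouping change:

// Sketch only; requires: import org.apache.storm.tuple.Fields;
// Two FrequencyBolt tasks, with tuples routed by the "word" field so that the
// same word is always counted by the same task.
builder.setBolt("b2", new FrequencyBolt(), 2)
       .fieldsGrouping("b1", new Fields("word"));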

JedisUtil.java

package util;

import redis.clients.jedis.Jedis;

public class JedisUtil {

    // Creates a new connection to the Redis server on each call; the caller is
    // responsible for closing it.
    public static Jedis getConnection() {
        Jedis jedis = new Jedis("121.89.197.4", 6379);
        jedis.auth("root");
        return jedis;
    }
    }

}
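
Opening a new TCP connection on every call is simple and works for this experiment, but becomes costly if the bolts save frequently. A connection pool is the usual alternative; the following is a minimal sketch that reuses the host, port and password above (the class name, the 2000 ms timeout and the pool configuration are assumptions, not part of the submitted code):

package util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

// Sketch: pooled variant of JedisUtil; connections are reused instead of being
// created on every getConnection() call.
public class PooledJedisUtil {

    // Host, port and password copied from JedisUtil above; the timeout is an assumption.
    private static final JedisPool POOL =
            new JedisPool(new JedisPoolConfig(), "121.89.197.4", 6379, 2000, "root");

    public static Jedis getConnection() {
        // Closing the returned Jedis (e.g. with try-with-resources) returns the
        // connection to the pool instead of closing the socket.
        return POOL.getResource();
    }
}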

Run results:

Redis contents:
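
The stored key-value pairs can also be inspected programmatically; a minimal sketch (illustrative only, reusing JedisUtil from above) that prints every key together with its count:

package test;

import redis.clients.jedis.Jedis;
import util.JedisUtil;

// Illustrative sketch: dumps all keys and values from the Redis instance so the
// saved word and word-pair frequencies can be checked without a GUI client.
public class DumpRedis {

    public static void main(String[] args) {
        try (Jedis jedis = JedisUtil.getConnection()) {
            // KEYS * is acceptable here because the experiment stores only a few entries.
            for (String key : jedis.keys("*")) {
                System.out.println(key + " = " + jedis.get(key));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}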

V. Summary:

Through this experiment I came to understand how data processing and data persistence are combined in big-data scenarios, and learned to use Storm together with the Redis database: the topology counts word and word-pair frequencies in memory, and the DAO classes persist the final counts to Redis as key-value pairs.

Experiment grade:        Instructor:        Date:
