Harbin University of Science and Technology

School of Software and Microelectronics

Lab Report

(2020-2021 Academic Year, Second Semester)

Course: Real-Time Data Processing
Class: Software 18-1
Student ID: 1814010130
Name: Zhang Lihui

School of Software and Microelectronics, Harbin University of Science and Technology


Experiment Title: Experiment 1: Real-Time Word Frequency Counting with Storm    Major: Software Engineering
Name: Zhang Lihui    Student ID: 1814010130    Class: Software 18-1

I. Objective:

Understand the basic ideas of stream-oriented real-time data processing, and master the core techniques for implementing real-time processing with Storm, including its reliability-guarantee mechanisms.

II. Experiment Content:

Task 1: word-frequency statistics over blog article titles
Key points:
(1) Define a spout component that reads the crawled blog article titles
(2) Define the first bolt component, which splits each title into words
(3) Define the second bolt component, which counts word frequencies
Task 2: co-occurrence frequency statistics over blog article titles
Key points:
(1) Define a third bolt component that counts co-occurrence frequencies, i.e. how often two words appear together in the same title, e.g. java+storm: frequency

III. Equipment and Software Environment:

Windows 10 Pro
IntelliJ IDEA 2020.3.2 (Ultimate Edition)
Java 15

IV. Procedure and Result Screenshots:

BiWordFreq.java

package bean;

import java.util.Date;

// Holds one co-occurring word pair, its frequency, and the timestamp of the run.
public class BiWordFreq {
    private String word1;
    private String word2;
    private Date date;
    private int freq;

    public void setWord1(String word1) {
        this.word1 = word1;
    }

    public String getWord1() {
        return word1;
    }

    public void setWord2(String word2) {
        this.word2 = word2;
    }

    public String getWord2() {
        return word2;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public Date getDate() {
        return date;
    }

    public void setFreq(int freq) {
        this.freq = freq;
    }

    public int getFreq() {
        return freq;
    }

}

WordFreq.java

package bean;

import java.util.Date;

// Holds one word, its frequency, and the timestamp of the run.
public class WordFreq {
    private String word;
    private int freq;
    private Date date;

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }

    public void setFreq(int freq) {
        this.freq = freq;
    }

    public int getFreq() {
        return freq;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public Date getDate() {
        return date;
    }

}

FrequencyBolt.java

package bolt;

import java.util.Date;
import java.util.HashMap;
import java.util.Set;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import bean.WordFreq;
import dao.WordFreqDAO;

public class FrequencyBolt extends BaseRichBolt {
    private Map<String, Integer> wordFreq;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Initialize the in-memory word-count map.
        wordFreq = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // An empty word is the spout's end-of-stream marker:
        // print the accumulated counts and persist them to Redis.
        if("".equals(word)) {
            System.out.println("Word frequencies:");
            System.out.println(wordFreq);
            System.out.println();

            Set<String> set = wordFreq.keySet();
            for(String s: set) {
                WordFreq wf = new WordFreq();

                wf.setWord(s);
                wf.setFreq(wordFreq.get(s));
                wf.setDate(new Date());

                new WordFreqDAO().saveWordFreq(wf);
            }
            return;
        }

        // First occurrence starts at 1; later occurrences increment the count.
        if(!wordFreq.containsKey(word)) {
            wordFreq.put(word, 1);
        }
        else {
            wordFreq.replace(word, wordFreq.get(word) + 1);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream, so no fields to declare.
    }

}
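
The containsKey/replace branch above can be exercised outside Storm to confirm the counting rule; below is a minimal sketch using the equivalent Map.merge idiom (the class WordCountCheck is our hypothetical helper, not part of the project):

package test;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the lab project: applies the same
// per-word update FrequencyBolt performs and prints the resulting map.
public class WordCountCheck {
    public static void main(String[] args) {
        Map<String, Integer> wordFreq = new HashMap<String, Integer>();
        for (String word : new String[] {"java", "storm", "java"}) {
            // Equivalent to the if/else in FrequencyBolt.execute:
            // insert 1 on first sight, otherwise add 1 to the current count.
            wordFreq.merge(word, 1, Integer::sum);
        }
        System.out.println(wordFreq); // e.g. {java=2, storm=1}
    }
}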

ParticipleBolt.java

package bolt;

import java.io.IOException;
import java.io.StringReader;
import java.util.Map;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.wltea.analyzer.lucene.IKAnalyzer;
import org.apache.storm.tuple.Fields;

public class ParticipleBolt extends BaseRichBolt {
    OutputCollector collector;

    public void execute(Tuple arg0) {
        String content = arg0.getStringByField("content");

        // Pass the spout's end-of-stream marker straight through to the counting bolt.
        if("".equals(content)) {
            collector.emit(new Values(""));
            return;
        }

        // Tokenize the title with IK Analyzer (smart mode) and emit one tuple per word.
        try(Analyzer analyzer = new IKAnalyzer(true)) {
            TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                collector.emit(new Values(cta.toString()));
            }
            stream.end();
        }
        catch(IOException e) {
            e.printStackTrace();
        }
    }

    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        collector = arg2;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}
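
The tokenization step can be checked in isolation with the same Lucene and IK Analyzer calls; below is a minimal sketch under the pom.xml dependencies listed later (the class TokenizeCheck and the sample title are ours):

package test;

import java.io.StringReader;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.wltea.analyzer.lucene.IKAnalyzer;

// Hypothetical helper, not part of the lab project: prints the tokens
// IK Analyzer produces for one title, mirroring ParticipleBolt.execute.
public class TokenizeCheck {
    public static void main(String[] args) throws Exception {
        try (Analyzer analyzer = new IKAnalyzer(true)) {  // true = smart (coarse-grained) mode
            TokenStream stream = analyzer.tokenStream("content",
                    new StringReader("应用storm实现实时词频统计"));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                System.out.println(cta.toString());  // one token per line
            }
            stream.end();  // required by the TokenStream contract
        }
    }
}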

RelationBolt.java

package bolt;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.wltea.analyzer.lucene.IKAnalyzer;

import bean.BiWordFreq;
import dao.BiWordFreqDAO;

public class RelationBolt extends BaseRichBolt {
    private Map<String, Integer> wordFreq;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Initialize the in-memory pair-count map (keys look like "word1-word2").
        wordFreq = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String content = input.getStringByField("content");
        List<String> wordList = new ArrayList<String>();

        // An empty title is the spout's end-of-stream marker:
        // print the accumulated pair counts and persist them to Redis.
        if("".equals(content)) {
            System.out.println("Co-occurrence frequencies:");
            System.out.println(wordFreq);
            System.out.println();

            Set<String> set = wordFreq.keySet();
            for(String s: set) {
                BiWordFreq wf = new BiWordFreq();
                String[] words = s.split("-");

                wf.setWord1(words[0]);
                wf.setWord2(words[1]);
                wf.setFreq(wordFreq.get(s));
                wf.setDate(new Date());

                new BiWordFreqDAO().saveBiWordFreq(wf);
            }
            return;
        }

        // Tokenize the title with IK Analyzer (same setup as ParticipleBolt).
        try(Analyzer analyzer = new IKAnalyzer(true)) {
            TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                wordList.add(cta.toString());
            }
            stream.end();
        }
        catch(IOException e) {
            e.printStackTrace();
        }

        // Count every unordered pair of distinct words in this title.
        for(int i = 0; i < wordList.size(); i++) {
            for(int j = i + 1; j < wordList.size(); j++) {
                String str1 = wordList.get(i);
                String str2 = wordList.get(j);
                String key = null;

                // Skip identical words; otherwise order the pair lexicographically
                // so "a-b" and "b-a" accumulate under a single key.
                if(str1.compareTo(str2) == 0) {
                    continue;
                }
                else if(str1.compareTo(str2) > 0) {
                    key = str2 + "-" + str1;
                }
                else {
                    key = str1 + "-" + str2;
                }

                if(!wordFreq.containsKey(key)) {
                    wordFreq.put(key, 1);
                }
                else {
                    wordFreq.replace(key, wordFreq.get(key) + 1);
                }
            }
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream, so no fields to declare.
    }

}
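
The heart of RelationBolt is the canonical pair key: ordering the two words lexicographically makes "java-storm" and "storm-java" the same map entry. Below is a minimal sketch of that rule in isolation (the method name pairKey is ours); note that the "-" separator assumes tokens contain no hyphen themselves, since the save path later splits the key on "-":

// Hypothetical helper, not part of the project: builds the canonical key
// RelationBolt uses, or returns null when the two words are identical.
static String pairKey(String a, String b) {
    if (a.compareTo(b) == 0) return null;               // same word: no pair
    return a.compareTo(b) < 0 ? a + "-" + b : b + "-" + a;
}

pairKey("storm", "java") and pairKey("java", "storm") both yield "java-storm", which is why the two orderings accumulate in one counter.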

BiWordFreqDAO.java

package dao;

import bean.BiWordFreq;
import util.JedisUtil;

import redis.clients.jedis.Jedis;

public class BiWordFreqDAO {

    public void saveBiWordFreq(BiWordFreq wf) {
        try(Jedis jedis = JedisUtil.getConnection()) {
            String key = wf.getWord1() + "-" + wf.getWord2();
            int value = wf.getFreq();

            // Accumulate: add this run's count onto any value already stored in Redis.
            if(jedis.exists(key)) {
                jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
            }
            else {
                jedis.set(key, String.valueOf(value));
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }

}

WordFreqDAO.java

package dao;

import bean.WordFreq;
import util.JedisUtil;

import redis.clients.jedis.Jedis;

public class WordFreqDAO {

    public void saveWordFreq(WordFreq wf) {
        try(Jedis jedis = JedisUtil.getConnection()) {
            String key = wf.getWord();
            int value = wf.getFreq();

            // Accumulate: add this run's count onto any value already stored in Redis.
            if(jedis.exists(key)) {
                jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
            }
            else {
                jedis.set(key, String.valueOf(value));
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }

}
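
Both DAOs perform a non-atomic read-modify-write (exists, get, set); if several bolt tasks ever wrote concurrently, updates could be lost. Redis offers INCRBY, which does the accumulation atomically and treats a missing key as 0; below is a minimal sketch of the alternative save, assuming the same JedisUtil:

    public void saveWordFreq(WordFreq wf) {
        try(Jedis jedis = JedisUtil.getConnection()) {
            // INCRBY is atomic and initializes absent keys to 0,
            // so the exists()/get()/set() sequence above is unnecessary.
            jedis.incrBy(wf.getWord(), wf.getFreq());
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }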

Spout.java

package spout;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.tuple.Fields;

import redis.clients.jedis.Jedis;

import util.JedisUtil;

public class Spout extends BaseRichSpout  {
    private SpoutOutputCollector collector;

    public void nextTuple() {
        // Read the crawled titles from Redis; try-with-resources closes the connection.
        String content;
        try(Jedis jedis = JedisUtil.getConnection()) {
            content = jedis.get("!");
        }
        String[] strs = content.split("\n");

        // One tuple per title, then an empty tuple as the end-of-stream marker.
        for(String s: strs) {
            collector.emit(new Values(s));
        }
        collector.emit(new Values(""));

        // Everything has been emitted once; block so nextTuple is not called again.
        try {
            Thread.sleep(Long.MAX_VALUE);
        }
        catch(InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        collector = arg2;
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("content"));
    }

}
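
Blocking forever inside nextTuple works for this one-shot demo, but it pins the spout's executor thread; Storm's convention is to return quickly when there is nothing to emit. Below is a minimal sketch of the same emit-once behavior with a flag (the field name emitted is ours):

    private boolean emitted = false;

    public void nextTuple() {
        if (emitted) {
            // Nothing left to emit: pause briefly so the calling loop does not spin.
            org.apache.storm.utils.Utils.sleep(100);
            return;
        }
        emitted = true;
        try (Jedis jedis = JedisUtil.getConnection()) {
            for (String s : jedis.get("!").split("\n")) {
                collector.emit(new Values(s));
            }
        }
        collector.emit(new Values(""));  // end-of-stream marker the bolts key on
    }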

zlh.java

package test;

import topology.TopologyFactory;
import util.JedisUtil;
import redis.clients.jedis.Jedis;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

public class zlh {

    public static void main(String[] args) throws MalformedURLException, IOException {

        // Crawl the blog page and collect the <h4> article titles, one per line.
        Document doc = Jsoup.parse(new URL("https://blog.csdn.net/weixin_45267419"), 50000);
        Elements titles = doc.select("h4");
        StringBuffer sb = new StringBuffer();

        for(Element title: titles) {
            sb.append(title.html() + "\n");
        }
        sb.delete(sb.length() - 1, sb.length());  // drop the trailing newline

        // Stage the titles in Redis under the key "!" for the spout to read.
        try(Jedis jedis = JedisUtil.getConnection()) {
            jedis.set("!", sb.toString());
        }

        // Build the topology and run it in an in-process local cluster.
        StormTopology topology = TopologyFactory.factory();
        LocalCluster cluster = new LocalCluster();
        Config config = new Config();
        cluster.submitTopology("1814010130", config, topology);
    }
}
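
As written, the local cluster keeps running until the process is killed. If the run should terminate on its own once the counts are printed and saved, the LocalCluster API can wait, kill the topology, and shut down; below is a minimal sketch of extra lines at the end of main (the 60-second window is an arbitrary choice of ours):

        // Give the topology time to process, then tear the local cluster down.
        org.apache.storm.utils.Utils.sleep(60 * 1000);
        cluster.killTopology("1814010130");
        cluster.shutdown();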

TopologyFactory.java

package topology;

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.generated.StormTopology;

import spout.*;
import bolt.*;

public class TopologyFactory {

    public static StormTopology factory() {
        // Wire the pipeline: the spout feeds whole titles to the word-splitting bolt
        // and to the relation bolt; the split words feed the counting bolt.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("s1", new Spout());
        builder.setBolt("b1", new ParticipleBolt()).shuffleGrouping("s1");
        builder.setBolt("b2", new FrequencyBolt()).shuffleGrouping("b1");
        builder.setBolt("b3", new RelationBolt()).shuffleGrouping("s1");
        return builder.createTopology();
    }

}
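
With one executor per bolt, shuffleGrouping is sufficient; if FrequencyBolt were scaled out, shuffling would scatter a word's tuples across tasks and split its count. Fields grouping routes by the word field so each word always reaches the same task; below is a minimal sketch (the parallelism of 2 is an arbitrary choice of ours, and org.apache.storm.tuple.Fields must be imported):

        // Route tuples by their "word" field so one task owns each word's count.
        builder.setBolt("b2", new FrequencyBolt(), 2)
               .fieldsGrouping("b1", new Fields("word"));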

JedisUtil.java

package util;

import redis.clients.jedis.Jedis;

public class JedisUtil {

    public static Jedis getConnection() {
        // Opens a fresh authenticated connection on every call; a pooled
        // alternative is sketched after this class.
        Jedis jedis = new Jedis("121.89.197.4", 6379);
        jedis.auth("root");
        return jedis;
    }

}
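
Each getConnection() call above opens a fresh TCP connection. Jedis ships a JedisPool that reuses connections; below is a minimal pooled sketch with the same host, port, and password (the class name JedisPoolUtil and the default pool settings are ours):

package util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

// Hypothetical pooled alternative to JedisUtil: callers keep using
// try (Jedis jedis = JedisPoolUtil.getConnection()) { ... }, but close()
// now returns the connection to the pool instead of discarding it.
public class JedisPoolUtil {
    private static final JedisPool POOL =
            new JedisPool(new JedisPoolConfig(), "121.89.197.4", 6379);

    public static Jedis getConnection() {
        Jedis jedis = POOL.getResource();
        jedis.auth("root");
        return jedis;
    }
}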

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com</groupId>
  <artifactId>sssjclshiyan111</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>sssjclshiyan111</name>
  <url>http://maven.apache.org</url>

  <repositories>
      <repository>
          <id>aliyunmaven</id>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <lucene.version>4.5.1</lucene.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.11</version>
    </dependency>

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.1</version>
<!--      <scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.8.2</version>
    </dependency>

    <!-- Lucene core -->
    <dependency>
        <groupId>org.apache.lucene</groupId>
        <artifactId>lucene-core</artifactId>
        <version>${lucene.version}</version>
    </dependency>
    <!-- QueryParser query classes -->
    <dependency>
        <groupId>org.apache.lucene</groupId>
        <artifactId>lucene-queryparser</artifactId>
        <version>${lucene.version}</version>
    </dependency>
    <!-- Analyzers -->
    <dependency>
        <groupId>org.apache.lucene</groupId>
        <artifactId>lucene-analyzers-common</artifactId>
        <version>${lucene.version}</version>
    </dependency>
    <!-- Highlighting -->
    <dependency>
        <groupId>org.apache.lucene</groupId>
        <artifactId>lucene-highlighter</artifactId>
        <version>${lucene.version}</version>
    </dependency>

    <!-- IK Analyzer Chinese tokenizer -->
    <dependency>
        <groupId>com.janeluo</groupId>
        <artifactId>ikanalyzer</artifactId>
        <version>2012_u6</version>
    </dependency>

    <dependency>
      <!-- jsoup HTML parser library @ https://jsoup.org/ -->
      <groupId>org.jsoup</groupId>
      <artifactId>jsoup</artifactId>
      <version>1.13.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <!-- Optional: uncomment the archive section below to bake a Main-Class into the jar, making it directly runnable -->
                <!-- <archive>
                <manifest>
                <mainClass>com.thanks.hehe.App</mainClass>
                </manifest>
                </archive> -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>
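
With the assembly plugin configured as above, running mvn clean package assembly:single should produce target/sssjclshiyan111-0.0.1-SNAPSHOT-jar-with-dependencies.jar bundling Storm, Jedis, Lucene, IK Analyzer, and jsoup. The compiler plugin targets Java 1.8 even though the listed environment is Java 15; javac 15 still accepts source/target 1.8, so the combination compiles.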

Run results:

V. Summary:

Through this experiment I came to understand the basic ideas of stream-oriented real-time data processing and learned the core techniques for implementing real-time processing with Storm, including its reliability-guarantee mechanisms.

Grade:            Instructor:            Date (Y/M/D):
