Storm(3) - Calculating Term Importance with Trident

Creating a URL stream using a Twitter filter

Start by creating the project directory and the standard Maven folder structure (directory-layout.html).

1. Create the POM as per the Creating a "Hello World" topology recipe in Chapter 1, Setting Up Your Development Environment, updating the <artifactId> and <name> tag values to tfidf-topology, and include the following dependencies:

2. Import the project into Eclipse after generating the Eclipse project files:

mvn eclipse:eclipse

3. Create a new spout called TwitterSpout that extends from BaseRichSpout, and add the following member-level variables:

public class TwitterSpout extends BaseRichSpout {
    LinkedBlockingQueue<Status> queue = null;
    TwitterStream twitterStream;
    String[] trackTerms;
    long maxQueueDepth;
    SpoutOutputCollector collector;

4. In the open method of the spout, initialize the blocking queue and create a Twitter stream listener:

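public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    queue = new LinkedBlockingQueue<Status>(1000);

    StatusListener listener = new StatusListener() {
        public void onStatus(Status status) {
            if (queue.size() < maxQueueDepth) {
                LOG.trace("TWEET Received: " + status);
                queue.offer(status);
            } else {
                LOG.error("Queue is now full, the following message is dropped: " + status);
            }
        }
        // The remaining StatusListener callbacks must also be implemented; they are omitted here.
    };

5. Then create the Twitter stream and filter, still inside the open method:

    twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(listener);

    FilterQuery filter = new FilterQuery();
    filter.track(trackTerms);
    twitterStream.filter(filter);
}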

6. You then need to emit the tweet into the topology.

public void nextTuple() {

    Status ret = queue.poll();

    if (ret == null) {
        try {
            // nothing queued: pause briefly (assumed 50 ms; tune as needed)
            Thread.sleep(50);
        } catch (InterruptedException e) {}
    } else {
        collector.emit(new Values(ret));
    }
}
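
The hand-off between the listener thread and nextTuple can be tried out without Twitter or Storm. The following is a minimal sketch, not the recipe's code: the class name BoundedHandoff and its methods are hypothetical, and plain strings stand in for Status objects. It relies on the return value of offer() rather than a separate size() check, which is one way to avoid a mismatch between the queue's own capacity (1000 in the recipe) and maxQueueDepth.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone sketch: Strings stand in for twitter4j Status objects.
final class BoundedHandoff {
    private final LinkedBlockingQueue<String> queue;
    private final AtomicInteger dropped = new AtomicInteger();

    BoundedHandoff(int capacity) {
        // The queue enforces the bound itself, so no separate size() check is needed.
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Producer side (the listener thread): never blocks, returns false if the item was dropped.
    boolean onItem(String item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Consumer side (nextTuple): returns null immediately when nothing is waiting.
    String next() {
        return queue.poll();
    }

    int droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedHandoff handoff = new BoundedHandoff(2);
        handoff.onItem("a");
        handoff.onItem("b");
        handoff.onItem("c"); // the queue is full, so "c" is dropped
        // prints: a b null dropped=1
        System.out.println(handoff.next() + " " + handoff.next() + " " + handoff.next()
                + " dropped=" + handoff.droppedCount());
    }
}
```

Dropping on overflow keeps the listener thread from blocking, at the cost of losing tweets when the topology falls behind.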

7. Next, you must create a bolt to publish the tuple persistently to another topology within the same cluster. Create a BaseRichBolt class called PublishURLBolt that doesn't declare any fields, and provide the following execute method:

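public class PublishURLBolt extends BaseRichBolt {

    // jedis is assumed to be a connected Jedis client created in prepare() (not shown)

    public void execute(Tuple input) {
        Status ret = (Status) input.getValue(0);
        URLEntity[] urls = ret.getURLEntities();

        // push every URL found in the tweet onto the "url" list in Redis
        for (int i = 0; i < urls.length; i++) {
            jedis.rpush("url", urls[i].getURL().trim());
        }
    }
}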

8. Finally, you will need to read the URL into a stream in the Trident topology. To do this, create another spout called TweetURLSpout:

public class TweetURLSpout extends BaseRichSpout {

    private String host;
    private int port;
    private Jedis jedis;
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("url"));
    }

    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        this.collector = spoutOutputCollector;
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
    }

    public void nextTuple() {
        String url = jedis.rpop("url");
        if (url == null) {
            try {
                // list is empty: pause briefly (assumed 50 ms; tune as needed)
                Thread.sleep(50);
            } catch (InterruptedException e) {}
        } else {
            collector.emit(new Values(url));
        }
    }
}
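
Note that the bolt writes with rpush and the spout reads with rpop, so both operate on the tail of the Redis list and the most recently published URL is consumed first. Whether that ordering matters depends on the application. The sketch below models the list with an in-memory java.util.Deque (a stand-in, not Redis; the class and method names are hypothetical) to show the difference that reading from the head, as LPOP does, would make.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// In-memory model of a Redis list: addLast ~ RPUSH, pollLast ~ RPOP, pollFirst ~ LPOP.
final class ListOrderSketch {

    // Write and read at the same end: last in, first out.
    static List<String> pushRightPopRight(List<String> urls) {
        Deque<String> list = new ArrayDeque<>(urls); // same order as RPUSH of each element
        List<String> consumed = new ArrayList<>();
        while (!list.isEmpty()) {
            consumed.add(list.pollLast());
        }
        return consumed;
    }

    // Write at the tail, read at the head: first in, first out.
    static List<String> pushRightPopLeft(List<String> urls) {
        Deque<String> list = new ArrayDeque<>(urls);
        List<String> consumed = new ArrayList<>();
        while (!list.isEmpty()) {
            consumed.add(list.pollFirst());
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("u1", "u2", "u3");
        System.out.println(pushRightPopRight(urls)); // prints [u3, u2, u1]
        System.out.println(pushRightPopLeft(urls));  // prints [u1, u2, u3]
    }
}
```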

Deriving a clean stream of terms from the documents

This recipe consumes the URL stream, downloading the document content and deriving a clean stream of terms that are suitable for later analysis. 

A clean term is defined as a word that:
> Is not a stop word
> Is a valid dictionary word
> Is not a number or URL
> Is a lemma

A lemma is the canonical form of a word; for example, run, runs, ran, and running are forms of the same lexeme with "run" as the lemma. Lexeme, in this context, refers to the set of all the forms that have the same meaning, and lemma refers to the particular form that is chosen by convention to represent the lexeme.

The lemma is important for this recipe because it lets us group terms that have the same meaning. The later analysis depends on how often a term occurs, so counting run, runs, and running as separate terms would understate the frequency of the underlying term.
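
To make the effect of grouping concrete, here is a small sketch that counts words by lemma. The lemma table is hand-written and hypothetical, covering only the run example above; in the recipe a stemmer derives the lemma instead.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class LemmaCountSketch {
    // Hypothetical, hand-written lemma table; a real stemmer or lemmatizer replaces this.
    private static final Map<String, String> LEMMAS = new HashMap<>();
    static {
        LEMMAS.put("runs", "run");
        LEMMAS.put("ran", "run");
        LEMMAS.put("running", "run");
    }

    // Words without an entry are treated as their own lemma.
    static String lemmaOf(String word) {
        String lower = word.toLowerCase(Locale.ROOT);
        return LEMMAS.getOrDefault(lower, lower);
    }

    // Counts occurrences per lemma; a TreeMap keeps the output order stable.
    static Map<String, Integer> countByLemma(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(lemmaOf(word), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {run=4, storm=1}
        System.out.println(countByLemma(Arrays.asList("run", "runs", "ran", "running", "storm")));
    }
}
```

Without the grouping, each of the four forms would be counted once; with it, the single term run is counted four times.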

1. Create a class named DocumentFetchFunction, that extends from storm.trident.operation.BaseFunction, and provide the following implementation for the execute method:

public class DocumentFetchFunction extends BaseFunction {

    // mimeTypes holds the accepted MIME types and is supplied through the constructor (not shown)

    public void execute(TridentTuple tuple, TridentCollector collector) {
        String url = tuple.getStringByField("url");
        try {
            Parser parser = new AutoDetectParser();
            Metadata metadata = new Metadata();
            ParseContext parseContext = new ParseContext();
            URL urlObject = new URL(url);
            // cap the extracted text at 10 * 1024 * 1024 characters
            ContentHandler handler = new BodyContentHandler(10 * 1024 * 1024);

            parser.parse((InputStream) urlObject.getContent(), handler, metadata, parseContext);
            String[] mimeDetails = metadata.get("Content-Type").split(";");
            if ((mimeDetails.length > 0) && (mimeTypes.contains(mimeDetails[0]))) {
                collector.emit(new Values(handler.toString(), url.trim(), "twitter"));
            }
        } catch (Exception e) {
            // documents that cannot be fetched or parsed are skipped
        }
    }
}
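
The MIME check in this function splits the Content-Type value on ";" and compares the first part against the accepted types. A missing header makes metadata.get("Content-Type") return null, and the resulting exception is swallowed by the catch block. The following sketch makes those cases explicit in isolation; the class name, the method names, and the accepted types used in main are illustrative assumptions, since the source does not list the contents of mimeTypes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

final class MimeTypeCheck {

    // Returns the bare media type of a Content-Type value, or null if none is present.
    static String mediaType(String contentType) {
        if (contentType == null) {
            return null;
        }
        // A limit of 2 keeps an (empty) first element even for input such as ";".
        String type = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        return type.isEmpty() ? null : type;
    }

    static boolean isSupported(String contentType, Set<String> accepted) {
        String type = mediaType(contentType);
        return type != null && accepted.contains(type);
    }

    public static void main(String[] args) {
        // Illustrative accepted types only.
        Set<String> accepted = new HashSet<>(Arrays.asList("text/html", "text/plain"));
        System.out.println(isSupported("text/html; charset=UTF-8", accepted)); // prints true
        System.out.println(isSupported("image/png", accepted));                // prints false
        System.out.println(isSupported(null, accepted));                       // prints false
    }
}
```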

2. Next, we need to tokenize the document. Create another class that extends BaseFunction, call it DocumentTokenizer, and provide the following execute implementation:

public class DocumentTokenizer extends BaseFunction {

    public void execute(TridentTuple tuple, TridentCollector collector) {
        String documentContents = tuple.getStringByField(TfidfTopologyFields.DOCUMENT);
        TokenStream ts = null;

        try {
            ts = new StopFilter(Version.LUCENE_30,
                    new StandardTokenizer(Version.LUCENE_30, new StringReader(documentContents)),
                    StopAnalyzer.ENGLISH_STOP_WORDS_SET); // assumed: Lucene's built-in English stop words

            CharTermAttribute termAtt = ts.getAttribute(CharTermAttribute.class);
            while (ts.incrementToken()) {
                String lemma = MorphaStemmer.stemToken(termAtt.toString());
                lemma = lemma.trim().replaceAll("\n", "").replaceAll("\r", "");
                collector.emit(new Values(lemma));
            }
        } catch (IOException e) {
            // tokenization failed; no further terms are emitted for this document
        } finally {
            if (ts != null) {
                try {
                    ts.close();
                } catch (IOException e) {}
            }
        }
    }
}

3. We then need to filter out all the invalid terms that may be emitted by this function. To do this, we need to implement another class that extends BaseFunction called TermFilter. The execute method of this function will simply call a checking function to optionally emit the received tuple. The checking function isKeep() should perform the following validations:

public class TermFilter extends BaseFunction {

    public void execute(TridentTuple tuple, TridentCollector collector) {
        // emit the term only if it passes every check
        if (isKeep(tuple.getString(0))) {
            collector.emit(new Values(tuple.getString(0)));
        }
    }

    private boolean isKeep(String stem) {
        if (stem == null) {
            return false;
        }

        if (stem.equals("")) {
            return false;
        }

        if (filterTerms.contains(stem)) {
            return false;
        }

        // we don't want integers
        try {
            Integer.parseInt(stem);
            return false;
        } catch (Exception e) {}

        // or floating point numbers
        try {
            Double.parseDouble(stem);
            return false;
        } catch (Exception e) {}

        // finally, keep only valid dictionary words
        try {
            return spellchecker.exist(stem);
        } catch (Exception e) {
            return false;
        }
    }
}
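
The validation rules can be exercised without a Lucene index, which may help when unit testing them. The following is a sketch under stated assumptions: TermRules is a hypothetical class, an in-memory Set stands in for the SpellChecker dictionary, and the integer and floating point checks are folded into a single Double.parseDouble call, which also accepts integer strings.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class TermRules {
    private final Set<String> filterTerms;
    private final Set<String> dictionary; // stand-in for the Lucene SpellChecker index

    TermRules(Set<String> filterTerms, Set<String> dictionary) {
        this.filterTerms = filterTerms;
        this.dictionary = dictionary;
    }

    // Applies the checks in the same order as the recipe's isKeep method.
    boolean isKeep(String stem) {
        if (stem == null || stem.isEmpty()) {
            return false;
        }
        if (filterTerms.contains(stem)) {
            return false;
        }
        if (isNumber(stem)) {
            return false;
        }
        return dictionary.contains(stem);
    }

    // True for anything Double.parseDouble accepts, which includes integer strings.
    static boolean isNumber(String s) {
        try {
            Double.parseDouble(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        TermRules rules = new TermRules(
                new HashSet<>(Arrays.asList("the", "http")),
                new HashSet<>(Arrays.asList("run", "storm", "the")));
        System.out.println(rules.isKeep("run"));   // prints true
        System.out.println(rules.isKeep("the"));   // prints false (filtered term)
        System.out.println(rules.isKeep("42"));    // prints false (number)
        System.out.println(rules.isKeep("xyzzy")); // prints false (not in dictionary)
    }
}
```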

4. The dictionary needs to be initialized during the prepare method for this function:

public void prepare(Map conf, TridentOperationContext context) {
    super.prepare(conf, context);

    File dir = new File(System.getProperty("user.home") + "/dictionaries");
    Directory directory;

    try {
        // open (or create) the on-disk index that backs the spell checker
        directory = FSDirectory.open(dir);
        spellchecker = new SpellChecker(directory);
        StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_36);
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_36, analyzer);
        URL dictionaryFile = TermFilter.class.getResource("/dictionaries/fulldictionary00.txt");

        spellchecker.indexDictionary(new PlainTextDictionary(new File(dictionaryFile.toURI())), config, true);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

5. Download the dictionary file from JavaCodeGeeks/LuceneSuggestionsTutorial/ and place it in the src/main/resources/dictionaries folder of your project structure.

6. Finally, you need to create the actual topology, or at least partially for the moment. Create a class named TermTopology that provides a main(String[] args) method and creates a local mode cluster:

public class TermTopology {

    public static void main(String[] args) {
        Config conf = new Config();
        conf.put(Conf.REDIS_HOST_KEY, "localhost");
        conf.put(Conf.REDIS_PORT_KEY, Conf.DEFAULT_JEDIS_PORT);

        if (args.length == 0) {
            // no arguments: run the topology in a local mode cluster
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("tfidf", conf, buildTopology(drpc));
        }
    }
}

7. Then build the appropriate portion of the topology:

public static StormTopology buildTopology(LocalDRPC drpc) {

    TridentTopology topology = new TridentTopology();
    FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("url"), 1, new Values(""), new Values(""));

    Stream documentStream = topology
        .newStream("tweetSpout", testSpout)
        .each(new Fields("url"), new DocumentFetchFunction(mimeTypes), new Fields("document", "documentId", "source"));

    Stream termStream = documentStream
        .each(new Fields("document"), new DocumentTokenizer(), new Fields("dirtyTerm"))
        .each(new Fields("dirtyTerm"), new TermFilter(), new Fields("term"))
        .project(new Fields("term", "documentId", "source"));

    return topology.build();
}

