Per implementare un TOPOLOGY dobbiamo semplicemente creare una classe nel package topologies
ed implementare il metodo main()
come segue:
public class CounterTOPOLOGY {
public final static String WORD_READER_SPOUT_ID = "word-reader-SPOUT";
public final static String WORD_NORMALIZER_BOLT_ID = "word-normalizer-BOLT";
public final static String WORD_COUNTER_BOLT_ID = "word-counter-BOLT";
public final static String COUNTER_TOPOLOGY_ID = "counter-TOPOLOGY";
public final static String WORDS_FILE_KEY = "wordsFile";
public static void main(String[] args) {
//Prima parte - Definizione del TOPOLOGY
TOPOLOGYBuilder builder = new TOPOLOGYBuilder();
builder.setSPOUT(WORD_READER_SPOUT_ID, new WordReader());
builder.setBOLT(WORD_NORMALIZER_BOLT_ID, new WordNormalizer()).
shuffleGrouping(WORD_READER_SPOUT_ID);
builder.setBOLT(WORD_COUNTER_BOLT_ID, new WordCounter()).
shuffleGrouping(WORD_NORMALIZER_BOLT_ID);
//Seconda parte - Configurazione Storm Cluster
Config conf = new Config();
conf.setDebug(true);
conf.put(WORDS_FILE_KEY, args[0]);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTOPOLOGY(COUNTER_TOPOLOGY_ID, conf, builder.createTOPOLOGY());
}
}
Nella prima parte implementiamo la creazione di un TOPOLOGY che avviene istanziando l'oggetto TOPOLOGYBuilder
. L'associazione degli SPOUTS e dei BOLTS al TOPOLOGY è resa possibile rispettivamente dai metodi setSPOUT()
e setBOLT()
dell'oggetto TOPOLOGYBuilder
. Entrambi i metodi ricevono in ingresso due parametri: un id
con il quale identifichiamo univocamente l'entità passata e l'oggetto vero e proprio. Con il metodo shuffleGrouping()
implementiamo la tecnica dello Shuffle Grouping ed in ingresso forniamo l'id del componente da cui riceve le tuple.
Nella seconda parte, configuriamo lo storm cluster per un utilizzo in modalità Local Mode
. Quando eseguiremo l'applicazione Java con Eclipse verrà invocato il metodo submitTOPOLOGY()
dell'oggetto LocalCluster
che invia il TOPOLOGY creato allo storm cluster opportunamente configurato. Eseguendo l'applicazione e osservando la console di Eclipse, vedremo comparire una sorta di log di ciò che sta succedendo.
Leggendo la console potremmo verificare l'esecuzione di tutti i componenti dello storm cluster (fisici e logici), quando vengono emesse le tuple, fino ad arrivare al risultato dell'elaborazione finale.
Il file utilizzato per la demo contiene le seguenti parole:
- cane
- gatto
- html
- java
- cane
- java
- bo
- pippo
- pluto
- pluto
- gatto
- paperino
- iphone
- android
Conclusioni
L'analisi delle proprietà di Storm, permette di incentivare ulteriormente la scelta di integrarlo nei propri sistemi:
- Semplice da programmare: sviluppare, da zero, un sistema capace di effettuare elaborazioni in real-time è un lavoro piuttosto complesso. Storm riduce drasticamente questa complessità.
- Supporta diversi linguaggi di programmazione.
- Fault-Tolerance: se un worker è down, riassegna automaticamente un task ad un altro worker
- Scalabilità: tutto ciò di cui si ha bisogno per garantire la scalabilità è aggiungere altre macchine al cluser; sarà Storm a riassegnare i tasks alle nuove macchine appena queste diventano disponibili.
- Affidabilità: offre la garanzia che tutti i messaggi siano elaborati almeno una volta. Se ci sono errori, possono essere elaborati anche più di una volta, ma non ci saranno perdite di messaggi
- Veloce: la velocità è stato uno dei fattori chiave che ha guidato la progettazione di Storm
Storm fornisce astrazioni abbastanza semplici, nascondendo un'enorme complessità: un TOPOLOGY è costituito da SPOUTS e BOLTS che comunicano tra loro per elaborare un flusso di dati. Si specifica il grado di parallelismo desiderato per ogni componente e si trasmette tutto il codice a Nimbus che garantirà l'esecuzione del codice, per sempre.