Per completare la fase di implementazione cominciata nel capitolo precedente, creiamo la classe Main
all'interno della quale vedremo in che modo Hadoop permette di utilizzare il job. A tal proposito definiamo una classe Main
che implementeremo come segue:
public class Main {
private static String input;
private static String output;
public static void main(String[] args) {
input = args[0];
output = args[1];
Configuration conf = new Configuration();
Job job;
try {
job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Innanzitutto è necessario recuperare i valori delle directory di input e di output che specificheremo al momento del lancio del job tramite la shell. Recuperiamo questi valori dall'oggetto args
. Successivamente possiamo istanziare un oggetto Job
il cui costruttore prevede i parametri di configurazione del job (incapsulati nell'oggetto Configuration
) e il nome del job.
Per indicare al job quali sono i formati dell'input e dell'output sono disponibili i metodi setInputFormatClass()
e setOutputFormatClass()
; dato che il formato dell'oggetto di input è di testo, possiamo utilizzare l'oggetto TextInputFormat
. Utilizziamo poi l'oggetto TextOutputFormat
per impostare il formato dell'output.
TextInputFormat
e TextOutputFormat
estendono rispettivamente FileInputFormat
e FileOutputFormat
che, a loro volta, estendono InputFormat
e OutputFormat
. Questi ultimi permettono di specificare il formato accettato in fase di input e quello da fornire in output, inoltre, permettono di effettuare un ragionamento sul numero di funzioni map/reduce in cui partizionare il job. Dimensionare opportunamente il job può radicalmente cambiare le prestazioni dell'elaborazione.
Quante funzioni map/reduce per un job? Come impostare il numero di map e di reduce? La scelta del numero di map dipende dal numero di blocchi DFS dell'input. La documentazione indica in 10-100map/node il valore per raggiungere il giusto livello di parallelismo. Per quanto riguarda il legame tra il numero di map e l'InputFormat, invece, di default la dimensione del blocco DFS viene considerata come un limite superiore in cui suddividere i dati di ingresso per le funzioni map. Ad esempio, se i dati di input sono 10TB, con blocchi DFS da 128MB, verranno automaticamente create 82.000 funzioni map. In alternativa, lo sviluppatore può specificare nel codice il numero di map da creare attraverso il metodo conf.setNumMapTasks(int num)
. In maniera analoga, possiamo impostare il numero di reduce tramite conf.setNumReduceTasks(int num)
.
Attraverso i metodi setOutputKeyClass()
e setOutputValueClass()
possiamo specificare rispettivamente il tipo di oggetto per la KeyOUT e il ValueOUT. Con i metodi setMapperClass()
e setReduceClass()
possiamo invece indicare al job le funzioni map e reduce. Infine, il framework fornisce gli oggetti FileInputFormat
e FileOutputFormat
per comunicare al job le directory di input e di output. Nel secondo caso possiamo notare l'utilizzo di setOutputPath()
per comunicare al job dove scrivere un unico output, mentre per il primo caso possiamo utilizzare un metodo per aggiungere una o più sorgenti di input, per tale motivo quindi viene predisposto il metodo addInputPath()
.
Esecuzione del job
Il job deve elaborare il contenuto di un file di input, quindi prima di lanciarlo è necessario caricare su HDFS tale file. L'istruzione per lanciare il job MapReduce è:
hadoop jar /directory/di/wordcount.jar html.it.hadoop.wordcount.Main /usr/hduser/wordcount/input /usr/hduser/wordcount/output
Il comando principale è jar
che accetta diversi parametri tra cui il jar che rappresenta il job MapReduce, la classe principale del job, la directory di input che contiene i file da elaborare e la directory di output che conterrà i file dove verrà salvato il risultato dell'elaborazione. Prima di lanciare tale comando bisognerà creare la directory di input ed effettuare il caricamento del file di input da elaborare nella stessa directory. Per quanto riguarda directory e file di output, invece, a questi penserà Hadoop.
hadoop fs -mkdir /user/hduser/wordcount/input
Il comando mkdir
ha come parametro l'elenco delle directory da creare su HDFS. Nel nostro caso esso è composto dalle directory di input e di output.
hadoop fs -copyFromLocal /directory/of/input.txt /user/hduser/wordcount/input
Con il comando copyFromLocal
, carichiamo il file input.txt
all'interno della directory di input precedentemente creata. Al fine di visualizzare il contenuto del file caricato su HDFS possiamo adottare l'opzione -cat
:
hadoop fs -cat /user/hduser/wordcount/input.txt
Eseguendo il job, visualizzeremo in shell informazioni sul progresso di elaborazione del job, il numero di job creati per portare a termine l'elaborazione ed eventuali eccezioni. Al termine dell'elaborazione possiamo verificare il risultato di questa visualizzando il contenuto del file di output. Innanzitutto individuiamo il file di output con il seguente comando:
hadoop fs -ls /user/hduser/wordcount/output
Lanciandolo visualizzeremo il contenuto della directory di output che contiene il file denominato part-r-00000
. Per accedere al contenuto del file eseguiamo il comando:
hadoop fs -cat /user/hduser/wordcount/output/part-r-00000
Esso ci permetterà di visualizzare l'elenco delle occorrenze lette nel file.
Conclusioni
Possiamo definire Hadoop un sisema "All-In-One", in quanto prevede tutti gli aspetti per lo sviluppo di sistemi per la data analysis su larga scala. Con esso vengono forniti un file system per ospitare qualsiasi tipo di dato, indipendentemente dal formato, un sistema per elaborare i dati garantendo scalabilità, elevata disponibilità e sistemi accessori per interrogare i dati con linguaggi di alto livello ad elevato potere descrittivo, permettendo di estrapolare le informazioni più disparate nonostante sorgenti di dati non strutturate come la maggior parte delle sorgenti odierne.