L'ambiente per sperimentare il MapReduce con R è pronto dalla scorsa lezione: quello che ci manca sono gli script ed i file di input. Tutti i file da analizzare verranno collocati in una cartella di HDFS, il cui contenuto completo sarà coinvolto nelle operazioni. Il file dati-wc.txt che abbiamo preparato noi, allegato a questa lezione, elenca i seguenti termini nella quantità indicata tra parentesi: cane (40), lumaca (16), cavallo (24), mela (18), banana (42) e fragola (54). Per memorizzare il file in HDFS, possiamo procedere così (nel caso utilizzassimo Ubuntu):
- creazione della cartella di input /tmp/data-input in HDFS:
$HADOOP_HOME/bin/hdfs dfs -mkdir -p /tmp/data-input
- verifica della corretta creazione della cartella in HDFS:
$HADOOP_HOME/bin/hdfs dfs -ls /tmp
- qualora si voglia svuotare una cartella del suo contenuto, sottocartelle comprese, si può eseguire:
$HADOOP_HOME/bin/hdfs dfs -rm -r percorso_cartella
A questo punto possiamo preparare gli script R. Lo script di map riceve sullo standard input tutto il contenuto dei file riga per riga. Seguendo il classico approccio al problema del word counting, l'operazione consisterà nello split di ogni riga parola per parola e nella produzione in output di tante righe quante sono le parole incontrate, seguito dal numero 1: ad una parola incontrata più volte, corrisponderanno più righe. Lo script seguente (map.R) è allegato a questa lezione.
#!/usr/bin/Rscript
# connessione con standard input
con <- file("stdin", open = "r")
# ciclo riga per riga sui file in input
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
# split di ogni riga e stampa in output della parola incontrata seguita dal numero 1
for (w in unlist(strsplit(line, "[[:space:]]+")))
cat(w, "\t1\n", sep="")
}
# chiusura dello standard input
close(con)
L'operazione di reduce riceve il risultato dello script di map e raggruppa i dati contando quante volte è stata incontrata nella precedente fase la medesima parola. In R, per effettuare il conteggio abbiamo usato le funzioni new.env
, assign
, exists
e get
. In pratica, con new.env
abbiamo preparato un ambiente in cui possiamo definire delle variabili, ognuna delle quali rappresenterà un termine incontrato. Nel ciclo while
, verifichiamo se è già stata definita una variabile corrispondente alla parola: in caso affermativo, incrementiamo il suo valore del numero che accompagna la parola nella riga
(ma sappiamo che, per come è fatto il map, sarà sempre 1) altrimenti la definiamo assegnandole il valore 1. Al termine, eseguiamo un ciclo su tutto il set di parole e produciamo in output tante righe quante sono le variabili definite, ognuna delle quali riportante un termine incontrato ed il numero delle sue occorrenze. Questo lo script reduce.R (anch'esso allegato):
#! /usr/bin/Rscript
#connessione a standard input
con <- file("stdin", open = "r")
# prepara ambiente per variabili
parole <- new.env(hash = TRUE)
#ciclo su ogni riga
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
# operazione di split: vettore componenti della riga
v <- unlist(strsplit(line, "[[:space:]]+"))
# parola incontrata
w<-v[1]
# occorrenze di w: sappiamo sarà sempre 1, in questo esempio
c<-v[2]
# aggiornamento conteggio
if (exists(w, envir=parole)) {
# se parola già incontrata aumento l'occorrenza
ultimo <- get(w, envir=parole);
assign(w, strtoi(ultimo)+strtoi(c), envir=parole)
}
# se non incontrata ancora, inizializzo variabile nell'ambiente
else assign(w, c, envir=parole)
}
# chiudo connessione con standard input
close(con)
# in output tutte le variabili definite
for (p in ls(parole, all = TRUE))
cat(p, "\t", get(p, envir = parole), "\n", sep = "")
Per avviare gli script con Hadoop Streaming, possiamo utilizzare il seguente comando:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar
-input /tmp/data-input -output /tmp/data-output -mapper map.R -reducer reduce.R
Analizziamo il comando in dettaglio:
$HADOOP_HOME/bin/hadoop
: il comando principale disponibile nella cartella bin dell'installazione;jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar
: attiviamo le Streaming API. Il percorso indicato è aggiornato, ma è comunque necessario verificarne l'aderenza con la propria installazione di Hadoop;-input /tmp/data-input
: la cartella contenente i file da analizzare. Si tratta di una directory su HDFS;-output /tmp/data-output
: cartella che conterrà i risultati dell'elaborazione, anche questa su HDFS;-mapper map.R
: script che svolge le operazioni di map;-reducer reduce.R
: script che esegue il reduce.
Al termine dell'esecuzione, dovremo verificare la presenza dei risultati nella cartella HDFS /tmp/output:
- visualizziamo il contenuto della cartella /tmp/data-output:
$HADOOP_HOME/bin/hdfs dfs -ls /tmp/data-output
Nel nostro caso, scopriamo che il file risultato è denominato part-00000; - visualizziamo i risultati:
$HADOOP_HOME/bin/hdfs dfs -cat /tmp/data-output/part-00000
Vedremo apparire nella console il risultato che ci aspettavamo, mostrato nella figura seguente:
Come ultima nota, sottolineiamo che un procedimento del genere con script separati permette di provarli uno alla volta per eseguire il debug al meglio. Inoltre, considerando che entrambi si alimentano da standard input, potremmo usarli da riga di comando su Linux con un'operazione di redirect di I/O come la seguente:
cat dati-wc.txt | ./map.R | ./reduce.R
Se gli ambienti sono configurati correttamente, vedremo in output il conteggio delle parole esatto. Come si può immaginare, però, ciò non è equivalente a farlo su Hadoop, dove potremo eseguire quantità di lavoro da Big Data con tutto il supporto del suo file system distribuito.