(Italiano) La Scala giusta per i Big Data
Negli ultimi anni, con il crescente fiorire dei dati “BIG”, si è andato sempre più affermando il linguaggio di programmazione Scala, il quale sembra proprio aver stabilmente conquistato il podio tra i linguaggi preferiti dai data scientists.
Il primo importante credito guadagnato da Scala sulla scena del coding risale a qualche anno fa, quando alcune parti di Twitter - il cinguettio social che si trova a gestire qualche milione di piccoli testi al giorno – scritte in C++, sono state completamente ricodificate in linguaggio Scala.
Il data scientist lavora abitualmente con grandi, anzi enormi set di dati, i quali devono essere caricati ma soprattutto trasformati, per le successive elaborazioni, in modo semplice e potente. E su questo terreno Scala non ha rivali: con poche righe di codice (rispetto a Java e Python) e con tempi di risposta eccellenti, si eseguono senza problemi operazioni e trasformazioni su ingenti volumi di informazioni.
Molti dei framework ad alte prestazioni utilizzate dai data scientists sono costruiti on top su Hadoop e di solito sono scritti in Scala o in Java. Scala offre, molto più di Java, un supporto ineguagliato nella gestione della concorrenza, la vera chiave per analizzare i big data attraverso l’elaborazione parallela. Scala è praticamente il linguaggio standard de facto per uno dei framework più in voga per le analisi su ingenti volumi di dati: Apache Spark.
Scopo di questo articolo, a cui torneremo a breve, è proprio quello di vedere Scala e Spark insieme in azione.
Quali sono le caratteristiche di Scala che lo differenziano ad esempio da Java?
Riassumiamo le principali:
- compatibilità: Scala è open source e compatibile; una volta compilato con il suo compilatore, il codice diventa bytecode per la JVM, che può usare librerie Java ed essere perfettamente integrato in applicazioni Java esistenti, senza referenziare classloader particolari, visto che non viene interpretato a run-time;
- scalabilità: le applicazioni moderne necessitano sempre più di essere scalabili e Scala (SCAlable LAnguage) permette, anzi favorisce, la scrittura di un codice che come può essere eseguito in un’applicazione console di un PC, può essere eseguito in un cluster; in Java, invece, non è semplice rendere un’applicazione o una libreria veramente scalabile, richiede uno sforzo di progettazione non indifferente;
- ottimizzato per lavorare coi dati: Scala è il linguaggio ideale per sviluppare applicazioni data-centric: ad esempio, una delle differenze più importante con Java riguarda le Collection: in Java, lavorare con le collezioni porta a codice verboso, spesso poco scalabile e non intuitivo, se paragonato alla potenza e alla coincisione di Scala.
L’obiettivo di Martin Odersky, il creatore di Scala, era quello di creare un linguaggio strutturalmente molto più agile di Java ma ottimizzato per lo sviluppo di applicazioni data-driven. Possiamo dire che l’obiettivo è stato raggiunto, vista anche la notevole diffusione di Scala. Tra l’altro, se volete avere Martin come insegnante, non dovete che registrarvi all'ottimo corso introduttivo a Scala che tiene su Coursera.
Scala adotta un paradigma di programmazione ibrido: è un linguaggio funzionale e orientato agli oggetti allo stesso tempo.
Iniziamo la nostra incursione operativa su Scala partendo dall'ottima GUI di sviluppo Scala Activator, creata dalla Lightbend, azienda estremamente focalizzata su Scala, che ci viene messa a disposizione per il download sul website ufficiale di Scala.
Scala Activator ha alcuni template già pronti all'uso e, dalla home page, possiamo attivare vari componenti dell’ecosistema, come Akka concurrency tool kit, Play web framework, Spark for big data handling.
Nel tab Tutorials, dopo aver cercato Spark e selezionato il template Spark Workshop, decidiamo di posizionarlo su disco (ad esempio, nel folder di Activator) e di creare il progetto: Activator verifica le dipendenze e, aprendo il progetto, ci viene mostrato un tutorial, ma possiamo anche vedere il codice, compilarlo, testarlo ed eseguirlo.
Questo Workshop spiega Spark e include il famoso esempio base per i big data, ossia WordCount. Ma, sempre nei template, troviamo anche l'evoluzione, WordCount2: ad esempio, possiamo richiedere il numero di volte in cui “the” è usato in tutte le opere di Shakespeare: si può vedere il codice sorgente ed eseguire il test (vedi Figura 2).
Creiamo ora un nuovo progetto con Activator (per utilizzare Spark) eseguendo i passi a seguire:
- creazione della directory di progetto
- creazione del file build.sbt con le minime direttive
- esecuzione del comando activator che crea il progetto
Il comando activator deve essere visibile nel vostro contesto $PATH.
$ mkdir $HOME/SimpleScalaProject $ cd $HOME/SimpleScalaProject $ curl -q https://gist.githubusercontent.com/amusarra/6a045fbda8b97d20eb5c9c75327a7fd0/raw/3b929e3fd742ed9d68a193c501cc66a0165d5457/build.sbt > build.sbt $ activator
Il contenuto del file di build (scaricato via curl da github per comodità) è quello mostrato a seguire:
scalaVersion := "2.11.8" name := "Simple Scala Project" libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.5.1"
L'esecuzione del comando activator crea la seguente struttura:
. ├── build.sbt ├── project │ └── target │ └── config-classes └── target
e una volta ricevuto il prompt di Activator (>), possiamo ad esempio compilare il progetto con il comando compile (oppure lanciare test). Se invece lanciassimo il comando run (ossia, tiriamo su la JVM) darà errore perché non ci sono ancora sorgenti/classi. Per avere maggiori informazioni sui comandi a disposizione lanciare il comando help.
~/SimpleScalaProject$ activator [info] Set current project to Simple Scala Project (in build file:/home/amusarra/SimpleScalaProject/) > help help Displays this help message or prints detailed help on requested commands (run 'help <command>'). completions Displays a list of completions for the given argument string (run 'completions <string>'). about Displays basic information about sbt and the build. tasks Lists the tasks defined for the current project. settings Lists the settings defined for the current project. reload (Re)loads the current project or changes to plugins project or returns from it. projects Lists the names of available projects or temporarily adds/removes extra builds to the session. project Displays the current project or changes to the provided `project`. set [every] <setting> Evaluates a Setting and applies it to the current project. session Manipulates session settings. For details, run 'help session'. inspect [uses|tree|definitions] <key> Prints the value for 'key', the defining scope, delegates, related definitions, and dependencies. <log-level> Sets the logging level to 'log-level'. Valid levels: debug, info, warn, error plugins Lists currently available plugins. ; <command> (; <command>)* Runs the provided semicolon-separated commands. ~ <command> Executes the specified command whenever source files change. last Displays output from a previous command or the output from a specific task. last-grep Shows lines from the last output for 'key' that match 'pattern'. export <tasks>+ Executes tasks and displays the equivalent command lines. exit Terminates the build. --<command> Schedules a command to run before other commands on startup. show <key> Displays the result of evaluating the setting or task associated with 'key'. all <task>+ Executes all of the specified tasks concurrently. More command help available using 'help <command>' for: !, +, ++, <, alias, append, apply, eval, iflast, onFailure, reboot, shell >
Lo strumento più utile per entrare da subito a stretto contatto con il linguaggio Scala è sicuramente il REPL (acronimo di Read, Evaluate and Print Loop): si tratta di una sessione shell interattiva per provare la logica Scala.
Per entrare nella console Scala (che funge da interprete del linguaggio), prima lanciamo activator dalla directory di progetto, e poi il comando console. Apparirà così il prompt (scala>), e potremo ad esempio scrivere:
scala> 1 to 10 res0: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> res0.map(number => number + 1) res1: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
ossia, abbiamo potuto direttamente sfruttare res0 dove Scala ha messo il resultset del primo comando e applicare una trasformazione che ora dà come risultato res1.
Con REPL possiamo effettuare operazioni di data science elementari come l'operazione di split di un data set di numeri in N subset modulo 3 ossia:
scala> 1 to 10 res0: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> res0.groupBy(number => number%3) res1: scala.collection.immutable.Map[Int,scala.collection.immutable.IndexedSeq[Int]] = Map(2 -> Vector(2, 5, 8), 1 -> Vector(1, 4, 7, 10), 0 -> Vector(3, 6, 9))
Con REPL possiamo anche effettuare operazioni Spark. Lavoriamo ad esempio con gli n-grams. Un n-gramma è una sotto sequenza di n elementi di una data sequenza. Esempio di 2-gram: “il treno”, esempio di 3-gram: “il treno è”
Vediamo un esempio di come usare Spark (eseguire, quindi, task in parallelo) per elaborare un ingente file di testo e, tramite un semplice codice Scala che editeremo direttamente nella console, avere in output una serie di 2-grams.
Non vi preoccupate se l’esempio non risulterà sufficientemente chiaro: lo scopo è solo quello di fornire una dimostrazione di come lavora Scala.
Nel nostro esempio, l’input sarà:
Ciao mi chiamo Alessandro
Voglio imparare Scala
e l’output consisterà in una serie di 2-grammi estrapolati a partire dalle righe in input:
Ciao mi, mi chiamo, chiamo Alessandro
Voglio imparare, imparare Scala
Creata una sessione console, prima di tutto importiamo le librerie Spark entrando in modalità paste (modalità che ci dà la possibilità di inserire più righe):
scala> :paste // Entering paste mode (ctrl-D to finish) import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.ml.feature.NGram // Exiting paste mode, now interpreting. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.ml.feature.NGram scala>
usciamo dalla modalità paste con CTRL+D. Creiamo, proseguendo nella stessa sessione console, un context Spark, dapprima creando il config:
scala> val conf=new SparkConf().setAppName("myproject").setMaster("local[2]") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@cabec99
e quindi il context, basato sul config creato:
scala> val sc=new SparkContext(conf) Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 17/01/24 14:04:13 INFO SparkContext: Running Spark version 1.5.1 17/01/24 14:04:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/01/24 14:04:13 WARN Utils: Your hostname, amusarra-MacBookPro resolves to a loopback address: 127.0.1.1; using 192.168.1.90 instead (on interface enp2s0) 17/01/24 14:04:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 17/01/24 14:04:13 INFO SecurityManager: Changing view acls to: amusarra 17/01/24 14:04:13 INFO SecurityManager: Changing modify acls to: amusarra 17/01/24 14:04:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(amusarra); users with modify permissions: Set(amusarra) 17/01/24 14:04:14 INFO Slf4jLogger: Slf4jLogger started 17/01/24 14:04:14 INFO Remoting: Starting remoting 17/01/24 14:04:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.90:44426] 17/01/24 14:04:14 INFO Utils: Successfully started service 'sparkDriver' on port 44426. 17/01/24 14:04:14 INFO SparkEnv: Registering MapOutputTracker 17/01/24 14:04:14 INFO SparkEnv: Registering BlockManagerMaster 17/01/24 14:04:14 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-d410a4dd-f456-469c-bfab-c4d34ceb174f 17/01/24 14:04:14 INFO MemoryStore: MemoryStore started with capacity 497.3 MB 17/01/24 14:04:14 INFO HttpFileServer: HTTP File server directory is /tmp/spark-a92eabd2-32f7-4778-8ab8-a4f79bdc49aa/httpd-aa005e94-31bb-486e-bdb3-c6f9688eee02 17/01/24 14:04:14 INFO HttpServer: Starting HTTP Server 17/01/24 14:04:15 INFO Utils: Successfully started service 'HTTP file server' on port 42774. 17/01/24 14:04:15 INFO SparkEnv: Registering OutputCommitCoordinator 17/01/24 14:04:15 INFO Utils: Successfully started service 'SparkUI' on port 4040. 17/01/24 14:04:15 INFO SparkUI: Started SparkUI at http://192.168.1.90:4040 17/01/24 14:04:15 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 17/01/24 14:04:15 INFO Executor: Starting executor ID driver on host localhost 17/01/24 14:04:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37579. 17/01/24 14:04:15 INFO NettyBlockTransferService: Server created on 37579 17/01/24 14:04:15 INFO BlockManagerMaster: Trying to register BlockManager 17/01/24 14:04:15 INFO BlockManagerMasterEndpoint: Registering block manager localhost:37579 with 497.3 MB RAM, BlockManagerId(driver, localhost, 37579) 17/01/24 14:04:15 INFO BlockManagerMaster: Registered BlockManager sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4aedbe59
e poi anche un SQL context, basato sul context Spark creato poc'anzi:
scala> val sqlContext=new SQLContext(sc) sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@73c21e05
Siamo pronti per inserire i dati di input, o meglio la loro rappresentazione dataframes [1]:
scala> :paste // Entering paste mode (ctrl-D to finish) val wordDataFrame = sqlContext.createDataFrame(Seq( (0, Array("Ciao","mi","chiamo","Alessandro")), (1, Array("Voglio","imparare","Scala")) )).toDF("label","words") // Exiting paste mode, now interpreting. wordDataFrame: org.apache.spark.sql.DataFrame = [label: int, words: array<string>]
Ora creiamo un n-gram per il nostro esempio:
scala> val ngram=new NGram().setInputCol("words").setOutputCol("ngrams") ngram: org.apache.spark.ml.feature.NGram = ngram_0175e9e93564
a partire da questo , creiamo l’n-gram dataframe, trasformando il data frame wordDataFrame:
scala> val ngramDataFrame=ngram.transform(wordDataFrame) ngramDataFrame: org.apache.spark.sql.DataFrame = [label: int, words: array<string>, ngrams: array<string>]
e infine trasformiamo l’n-gram dataframe ottenendo l’output desiderato:
scala> ngramDataFrame.take(2).map(line => line.getAs[Stream[String]]("ngrams").toList).foreach(println) List(Ciao mi, mi chiamo, chiamo Alessandro) List(Voglio imparare, imparare Scala)
osserviamo che con take(2) prendiamo le prime due linee del data frame, con map mappiamo tali linee in due stream.
Chiudiamo questa breve introduzione a Scala dando un’occhiata a uno dei costrutti alla base del linguaggio: le classi. Cos'è una classe Scala?
E’ la denominazione di un type:
- impersona lo stato di una istanza della classe (es: classe/type customer, dove gli stati sono name, age, phone number);
- rappresenta il comportamento di come quello stato può essere trasformato (come posso cambiare il phone number);
- non è operativa fino a quando non viene istanziata attraverso il suo costruttore new (istanziando una classe con new inizio a rappresentare un customer con una persona specifica);
- possono esistere istanza multiple di una classe (più customers nel sistema).
Vediamo una semplicissima classe Scala:
scala> class Hello defined class Hello scala> new Hello() res4: Hello = Hello@2f1ebc37 scala> res4.toString() res5: String = Hello@2f1ebc37
res5.String=Hello@2f1ebc37 e viene restituita la locazione all’interno del heap Java (ossia lo spazio di memoria nella JVM)
Ogni classe ha automaticamente un primary constructor, che definisce la firma di come creare una istanza, il corpo della classe è l’implementazione del constructor:
scala> class Hello { | println("Hello") | } defined class Hello scala> new Hello() Hello res6: Hello = Hello@9bcd46b
Mi fermo qui. Lascio alla vostra curiosità l'approfondimento delle caratteristiche di Scala, ricordando che, oltre alle fonti ufficiali già citate, è disponibile un eccellente corso online a cura della Lightbend, nonchè ottimi articoli introduttivi in italiano di MokaByte.
[1] Un DataFrame Spark è una collezione distribuita di dati organizzati in colonne che permette operazioni di filtro, di raggruppamento, o di aggregazione, e che può essere usato con Spark SQL. Il DataFrame è concettualmente equivalente a una tabella in un database relazionale (o a un dataframe Python). I DataFrames possono essere costruiti a partire da svariate tipologie di fonti come: files strutturati, tabelle Hive, database esterno, RDD esistenti.