# Scala : Class 3

## build.sbt

``````scalaVersion := "2.11.0"

libraryDependencies ++= Seq("org.specs2" %% "specs2" % "2.3.12" % "test",
"org.scalacheck" %% "scalacheck" % "1.11.4" % "test",
"com.netflix.rxjava" % "rxjava-scala" % "0.18.3")

resolvers += "releases"  at "http://oss.sonatype.org/content/repositories/releases"

scalacOptions ++= Seq("-deprecation", "-feature")``````

## course3.scala

``````package fr.enst.plnc

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import rx.lang.scala._

object Course3 extends App {

// Naive recursive implementation of the Fibonacci sequence.
def fibo(n: Int): Int =
if (n < 2)
1
else
fibo(n-2) + fibo(n-1)

// Let's cook a future through a promise.
def futureFibo_old(n: Int): Future[Int] = {
val p = Promise[Int]()
override def run(): Unit = {
val r = fibo(n)
p.success(r)
}
}.start()
p.future
}

// A simpler way of writing a future.
def futureFibo2(n: Int): Future[Int] = Future { fibo(n) }

// This implementation will spawn many threads, as recursive computations
// will be started in parallel.
def futureFibo(n: Int): Future[Int] = {
if (n < 2)
Future { 1 }
else {
val f1 = futureFibo(n-1)
val f2 = futureFibo(n-2)
for (n1 <- f1;
n2 <- f2)
yield (n1 + n2)
}
}

/* Some commented out tests that we ran throughout the course.

println("Before")
for (r <- futureFibo(40); s <- futureFibo(5))
println(r + s)
futureFibo(40) foreach (r => futureFibo(5) foreach (s => println(s + r)))
val f40 = futureFibo(10)
val f5 = futureFibo(5)
for (r40 <- f40; r5 <- f5) println(s"Result: \${r40 + r5}")
println("After")

for (i <- 1 to 10)
println(i)

(1 to 10) foreach (i => println(i))

val ii = for (i <- 1 to 10) yield (i * i)
val ii2 = (1 to 10) map (i => i * i)
println(ii2)

val jj = for (i <- 1 to 5; j <- 1 to 3) yield (i, j)
val jj2 = (1 to 5) flatMap (i => (1 to 3) map (j => (i, j)))
println(jj2)

for (i <- Some(3)) yield println(i)
for (i <- None) println(i)

println("Foobar: " + Await.result(futureFibo(35), 2.seconds))

*/

// Infinite Fibonacci sequence by the way of an observable.
val fibos = Observable[Int] { (subscriber: Subscriber[Int]) =>
subscriber.onNext(1)
subscriber.onNext(1)
var n1 = 1
var n2 = 1
while (!subscriber.isUnsubscribed) {
val n = n1 + n2
subscriber.onNext(n)
n1 = n2
n2 = n
// We could stop the infinite sequence using either onCompleted
// or onError.
// if (n > 100)
//   subscriber.onCompleted()
// (or) subscriber.onError(new RuntimeException("too big a value"))
}
}

fibos.dropWhile(_ < 100).takeWhile(_ < 1000).subscribe(

// We can zip observables. And subscribe to them giving functions to call
// for, respectively, onNext, onError, and onNext.
val l = Observable.from(1 to 10)
fibos.take(10).zip(l).timestamp.subscribe(
{ n => println(n) },
{ t: Throwable => println(s"Exception: \$t") },
{ () => println("Observable is terminated") })

// Here, double is just an observable and will not emit anything until it
// gets subscribed to.
val double = for (n <- fibos) yield (n*2)
double.take(5).subscribe(println(_))

// We can cache values for other subscribers too.
val fibosss = fibos.take(100).cache
println("Here")
fibosss.drop(90).first.subscribe(println(_))
fibosss.drop(90).first.subscribe(println(_))

// Or publish them (older values won't be rerun).
val fiboss = fibos.publish

// Observable.interval(1.second).timestamp.subscribe(println(_))

// val x = fibos.take(5).toBlockingObservable.toList
val x = fibos.take(5).toSeq.toBlockingObservable.single
// val x = Observable.items("a", "b").toBlockingObservable.single
// val x = Observable.empty.toBlockingObservable.single
println(x)

// "scan" allows to emit intermediate results.
fibos.take(5).scan((_: Int)+(_: Int)).subscribe(println(_))

// We can group observables by a key, given a function to compute the key.
// fibos.take(5): Observable[Int]
// fibos.take(5).groupBy(_%2) = Observable[(Int, Observable[Int])]
fibos.take(5).groupBy(_%2)

}``````