Une bibliothèque de streaming avec une superpuissance: FS2 et programmation fonctionnelle

Scala a une bibliothèque de streaming très spéciale appelée FS2 (Functional Streams for Scala). Cette bibliothèque incarne tous les avantages de la programmation fonctionnelle (FP). En comprenant ses objectifs de conception, vous serez exposé aux idées fondamentales qui rendent la PF si attrayante.

FS2 a un type central: Stream[Effect,Output]

Vous pourriez obtenir de ce type qu'il s'agit d'un Streamet qu'il émet des valeurs de type Output.

La question évidente ici est qu'est-ce que c'est Effect? Quel est le lien entre Effectet Output? Et quels avantages FS2 a-t-il par rapport aux autres bibliothèques de streaming?

Aperçu

Je commencerai par passer en revue les problèmes résolus par FS2. Ensuite, je compare Listet Streamavec plusieurs exemples de code. Après cela, je me concentrerai sur la façon de l'utiliser Streamavec une base de données ou tout autre IO. C'est là que FS2 brille et où le Effecttype est utilisé. Une fois que vous aurez compris ce que Effectc'est, les avantages de la programmation fonctionnelle devraient vous être évidents.

À la fin de cet article, vous obtiendrez les réponses aux questions suivantes:

  • Quels problèmes puis-je résoudre avec FS2?
  • Que puis-je faire avec Streamcela Listne peut pas?
  • Comment puis-je alimenter des données d'une API / d'un fichier / d'une base de données vers Stream?
  • Quel est ce Effecttype et comment se rapporte-t-il à la programmation fonctionnelle?

Remarque: Le code est en Scala et doit être compréhensible même sans connaissance préalable de la syntaxe.

Quels problèmes puis-je résoudre avec FS2?

  1. Streaming I / O: chargement incrémentiel d'ensembles de données volumineux qui ne rentrent pas dans la mémoire et fonctionnement sur eux sans faire exploser votre tas.
  2. Flux de contrôle (non couvert): Déplacer les données d'un / plusieurs DB / fichiers / API vers d'autres d'une manière déclarative agréable.
  3. Concurrence (non couverte): exécutez différents flux en parallèle et faites-les communiquer ensemble. Par exemple, charger des données à partir de plusieurs fichiers et les traiter simultanément plutôt que séquentiellement. Vous pouvez faire des choses avancées ici. Les flux peuvent communiquer ensemble pendant la phase de traitement et pas seulement à la fin.

List contre Stream

Listest la structure de données la plus connue et la plus utilisée. Pour avoir une idée de la différence avec un FS2 Stream, nous allons passer en revue quelques cas d'utilisation. Nous verrons comment Streamrésoudre les problèmes qui Listne le peuvent pas.

Vos données sont trop volumineuses et ne rentrent pas dans la mémoire

Disons que vous avez un très gros fichier (40 Go) fahrenheit.txt. Le fichier a une température sur chaque ligne et vous souhaitez le convertir celsius.txt.

Charger un gros fichier avec List

import scala.io.Source val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList java.lang.OutOfMemoryError: Java heap space java.util.Arrays.copyOfRange(Arrays.java:3664) java.lang.String.(String.java:207) java.io.BufferedReader.readLine(BufferedReader.java:356) java.io.BufferedReader.readLine(BufferedReader.java:389)

Listéchoue lamentablement car bien sûr, le fichier est trop gros pour tenir en mémoire. Si vous êtes curieux, vous pouvez consulter la solution complète en utilisant Streamici - mais faites-le plus tard, lisez la suite :)

Quand List ne fera pas… Stream à la rescousse!

Disons que j'ai réussi à lire mon fichier et que je veux le réécrire. Je souhaite conserver la structure des lignes. J'ai besoin d'insérer un caractère de nouvelle ligne \naprès chaque température.

Je peux utiliser le interspersecombinateur pour faire ça

import fs2._ Stream(1,2,3,4).intersperse("\n").toList

Une autre belle est zipWithNext

scala> Stream(1,2,3,4).zipWithNext.toList res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))

Il regroupe des éléments consécutifs, très utile si vous souhaitez supprimer les doublons consécutifs.

Ce ne sont que quelques-uns parmi d'autres très utiles, voici la liste complète.

Évidemment, Streamon peut faire beaucoup de choses qui Listne le peuvent pas, mais la meilleure fonctionnalité arrive dans la section suivante, il s'agit de savoir comment utiliser Streamdans le monde réel avec des bases de données / fichiers / API ...

Comment puis-je alimenter des données d'une API / d'un fichier / d'une base de données vers Stream?

Disons simplement pour l'instant que c'est notre programme

scala> Stream(1,2,3) res2: fs2.Stream[fs2.Pure,Int] = Stream(..)

Qu'est-ce Pureque cela veut dire? Voici le scaladoc du code source:

/** * Indicates that a stream evaluates no effects. * * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`. */ type Pure[A] <: Nothing

Ça veut dire pas d'effets, ok…, mais qu'est-ce qu'un effet? et plus précisément quel est l'effet de notre programme Stream(1,2,3)?

Ce programme n'a littéralement aucun effet sur le monde. Son seul effet sera de faire fonctionner votre CPU et de consommer de l'énergie !! Cela n'affecte pas le monde qui vous entoure.

En affectant le monde, je veux dire qu'il consomme toute ressource significative comme un fichier, une base de données ou produit quelque chose comme un fichier, télécharger des données quelque part, écrire sur votre terminal, etc.

Comment transformer un Pureflux en quelque chose d'utile?

Disons que je veux charger les identifiants utilisateur à partir d'une base de données, cette fonction me est donnée, elle appelle la base de données et renvoie le userId sous la forme d'un fichier Long.

import scala.concurrent.Future def loadUserIdByName(userName: String): Future[Long] = ???

Il renvoie un Futurequi indique que cet appel est asynchrone et que la valeur sera disponible à un moment donné dans le futur. Il encapsule la valeur renvoyée par la base de données.

J'ai ce Pureflux.

scala> val names = Stream("bob", "alice", "joe") names: fs2.Stream[fs2.Pure,String] = Stream(..)

Comment obtenir un Streamdes identifiants?

L'approche naïve serait d'utiliser la mapfonction, elle devrait exécuter la fonction pour chaque valeur dans le Stream.

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

Je suis encore revenu un Pure! J'ai donné Streamune fonction qui affecte le monde et j'ai quand même un Pure, pas cool ... Cela aurait été chouette si FS2 aurait détecté automatiquement que la loadUserIdByNamefonction a un effet sur le monde et m'a renvoyé quelque chose qui n'est PAS Puremais ça le fait ne fonctionne pas comme ça. Vous devez utiliser un combinateur spécial au lieu de map: vous devez utiliser evalMap.

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

Pas plus Pure! nous avons obtenu à la Futureplace, yay! Qu'est-ce qui vient juste de se passer?

Ça a pris:

  • loadUserIdByName: Future[Long]
  • Stream[Pure, String]

Et changé les types de flux en

  • Stream[Future, Long]

It separated the Future and isolated it! The left side that was the Effect type parameter is now the concrete Future type.

Neat trick, but how does it help me?

You just witnessed true separation of concerns. You can continue to operate on the stream with all the nice List like combinators and you don't have to worry about if the DB is down, slow or all the stuff that is related to the network (effect) concerns.

It all works until I want to use toList to get the values back

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

What???!!! I could swear that I used toList before and it worked, how can it say that toList is not a member of fs2.Stream[Future,String] any more? It is as if this function was removed the moment I started using an effect-ful stream, that's impressive! But how do I get my values back?

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

First we use compile to tell the Stream to combine all the effects into one, effectively it folds all the calls to loadUserIdByName into one big Future. This is needed by the framework, and it will become apparent why this step is needed soon.

Now toList should work

scala> userIdsFromDB.compile.toList :18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future] userIdsFromDB.compile.toList ^

What?! the compiler is still complaining. That’s because Future is not a good Effect type — it breaks the philosophy of separation of concerns as explained in the next very important section.

IMPORTANT: The ONE thing to take away from this post

A key point here, is that the DB has not been called at this point. Nothing happened really, the full program does not produce anything.

def loadUserIdByName(userName: String): Future[Long] = ??? Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile

Separating program description from evaluation

Yes it might be surprising but the major theme in FP is separating the

  • Description of your program: a good example is the program we just wrote, it’s a pure description of the problem “I give you names and a DB, give me back IDs”

And the

  • Execution of your program: running the actual code and asking it to go to the DB

One more time our program has literally no effect on the world besides making your computer warm, exactly like our Pure stream.

Code that does not have an effect is called pure and that’s what all Functional Programming is about: writing programs with functions that are pure. Bravo, you now know what FP is all about.

Why would you want write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.

Now let’s fix our program and take care of this Future problem.

As we said Future is a bad Effect type, it goes against the separation of concerns principle. Indeed, Future is eager in Scala: the moment you create one it starts to executes on some thread, you don't have control of the execution and thus it breaks. FS2 is well aware of that and does not let you compile. To fix this we have to use a type called IO that wraps our bad Future.

That brings us to the last part, what is this IO type? and how do I finally get my list of usedIds back?

scala> import cats.effect.IO import cats.effect.IO scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList res8: cats.effect.IO[List[Long]] = IO$2104439279

It now gives us back a List but still, we didn't get our IDs back, so one last thing must be missing.

What does IO really mean?

IO comes from cats-effect library. First let's finish our program and finally get out the ids back from the DB.

scala> userIds.compile.toList.unsafeRunSync :18: error: not found: value userIds userIds.compile.toList.unsafeRunSync ^

The proof that it’s doing something is the fact that it’s failing.

loadUserIdByName(userName: String): Future[Long] = ???

When ??? is called you will get this exception, it means the function was executed (as opposed to before when we made the point that nothing was really happening). When we implement this function it will go to the DB and load the ids, and it will have an effect on the world (network/files system).

IO[Long] is a description of how to get a value of type Long and it most certainly involves doing some I/O i.e going to the network, loading a file,...

It’s the How and not the What. It describes how to get the value from the network. If you want to execute this description, you can use unsafeRunSync (or other functions prefixed unsafe). You can guess why they are called this way: indeed a call to a DB is inherently unsafe as it could fail if, for example, your Internet connection is out.

Recap

Let’s take a last look at Stream[Effect,Output].

Output is the type that the stream emits (could be a stream of String, Long or whatever type you defined).

Effect is the way (the recipe) to produce the Output (i.e go to the DB and give me an id of type Long).

It’s important to understand that if these types are separated to make things easier, breaking down a problem in subproblems allows you to reason about the subproblems independently. You can then solve them and combine their solutions.

The link between these 2 types is the following :

In order for the Stream to emit an element of type

  • Output

It needs to evaluate a type

  • Effect

A special type that encodes an effective action as a value of type IO, this IO value allows the separation of 2 concerns:

  • Description:IO is a simple immutable value, it’s a recipe to get a type A by doing some kind of IO(network/filesystem/…)
  • Execution: in order forIO to do something, you need to execute/run it using io.unsafeRunSync

Putting it all together

Stream[IO,Long] says:

This is a Stream that emits values of type Long and in order to do so, it needs to run an effective function that producesIO[Long] for each value.

That’s a lot of details packed in this very short type. The more details you get about how things happen the fewer errors you make.

Takeaways

  • Stream is a super charged version of List
  • Stream(1,2,3) is of type Stream[Pure, Int] , the second type Int is the type of all values that this stream will emit
  • Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
  • Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream.
  • Stream[IO, Long] separates the concerns of What and How by letting you work only with the values and not worrying about how to get them (loading from the db).
  • Separating program description from evaluation is a key aspect of FP.
  • All the programs you write with Stream will do nothing until you use unsafeRunSync. Before that your code is effectively pure.
  • IO[Long] is an effect type that tells you: you will get Long values from IO (could be a file, the network, the console ...). It's a description and not a wrapper!r
  • Future does not abide by this philosophy and thus is not compatible with FS2, you have to use IO type instead.

FS2 videos

  • Hands on screencast by Michael Pilquist: //www.youtube.com/watch?v=B1wb4fIdtn4
  • Talk by Fabio Labella //www.youtube.com/watch?v=x3GLwl1FxcA