Skip to content

Learning Scala : Implementing infinite streams using Scala constructors

Notifications You must be signed in to change notification settings

enimiste/infinite-streams-impl-in-scala

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

40 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Kata : Implementing infinite streams in Scala

This a learning project. This implementation isn't performant nor optimal. Some operations throw StackOverflowError exceptions with big streams (ex: size, forEach).

API :

To use the XStream stream api you should call methods defined by the XStreams object.

Stream creation operations :

object XStreams {
  /**
   * Returns an empty stream
   *
   * @param T
   * @return
   */
  def empty[T]: XFiniteStream[T]

  /**
   * Returns a finite stream that has one element
   *
   * @param elem
   * @tparam T
   * @return
   */
  def once[T](elem: T): XFiniteStream[T]

  /**
   * Returns an infinite stream that always returns the same element
   *
   * @param elem
   * @tparam T
   * @return
   */
  def fixed[T](elem: T): XStream[T]

  /**
   * Returns an infinite stream that construct the next element from the last one using the op function
   *
   * @param elem
   * @param op
   * @tparam T
   * @return
   */
  def iterate[T](elem: T, op: T => T): XStream[T]

  /**
   * Returns an infinite stream that construct the element using the supplier
   *
   * @param supplier
   * @tparam T
   * @return
   */
  def generate[T](supplier: () => T): XStream[T]

  /**
   * Returns an infinite stream built from a finite sequence using a circular index
   *
   * @param elems
   * @tparam T
   * @return
   */
  def circular[T](elems: Seq[T]): XStream[T]

  /**
   * Returns a finite stream backed by the supplied sequence
   *
   * @param items
   * @tparam T
   * @return
   */
  def finite[T](items: Seq[T]): XFiniteStream[T]

  /** Returns a stream backed by the given iterator.
   *
   * @param iterator
   * @tparam T
   * @return
   */
  def fromIterator[T](iterator: Iterator[T]): XStream[T]
}

Intermediate/Transformation operations :

/**
 * XStream public API
 *
 * @tparam T
 */
trait XStream[T] {

  /**
   * Returns a new finite stream having at most nbr elements
   *
   * @param nbr
   * @return
   */
  def take(nbr: Int): XFiniteStream[T]

  /**
   * Returns a new stream having element that satisfy the predicate
   *
   * @param predicate
   * @return
   */
  def takeWhile(predicate: T => Boolean): XStream[T]

  /**
   * Returns a new stream with the nbr elements of the original stream skipped
   *
   * @param nbr
   * @return
   */
  def skip(nbr: Int): XStream[T]

  /**
   * Returns a new stream starting from the first element that don't match the predicate
   *
   * @param predicate
   * @return
   */
  def skipWhile(predicate: T => Boolean): XStream[T]

  /**
   * Returns a new stream with only elements that satisfy the given predicate
   *
   * @param predicate
   * @return
   */
  def filter(predicate: T => Boolean): XStream[T]

  /**
   * Returns a new stream where each element is transformed version of the original one (applying the given mapping function)
   *
   * @param mapping
   * @tparam B
   * @return
   */
  def map[B](mapping: T => B): XStream[B]

  /** Returns a new stream that merges all the sub stream into one
   *
   * @param asIterableOne
   * @tparam B
   * @return
   */
  def flatten[B](implicit asIterableOne: T => IterableOnce[B]): XStream[B]

  /** Returns a new stream that merges all the sub stream into one, after
   * applying the mapping function to each one of the elements
   *
   * @param mapping
   * @tparam B
   * @return
   */
  def flatMap[B, C](mapping: T => B)(implicit asIterableOne: B => IterableOnce[C]): XStream[C]

  /**
   * Returns a new stream that concatenate this stream with another one
   *
   * @param other
   * @return
   */
  def concat(other: => XStream[T]): XStream[T]

  /**
   * Returns a new stream where each element is a tuple of the elements of this stream and another.
   * The length of this new stream is the length of the smallest one.
   *
   * @param other
   * @tparam B
   * @return
   */
  def zip[B](other: XStream[B]): XStream[(T, B)]

  /**
   * Returns a new stream where each element is a finite stream of windowSize elements from this element
   *
   * @param windowSize
   * @return
   */
  def window(windowSize: Int): XStream[XFiniteStream[T]]

  /**
   * Returns a new stream that execute the given consumer on each element
   *
   * @param consumer
   * @return
   */
  def peek(consumer: T => Unit): XStream[T]
}

Terminal operations

/**
 * Trait that defines operation that are safe to execute only (has also a meaning) on a finite stream
 * <p>NB : A finite stream extends the API of the infinite stream.</p>
 *
 * @tparam T
 */
trait FiniteXStream[T] extends XStream[T] {

  /**
   * Left reducer
   *
   * @param initial
   * @param combinator
   * @tparam B
   * @return
   */
  def foldLeft[B](initial: B, combinator: (B, T) => B): B

  /**
   * Right reducer
   * @param initial
   * @param combinator
   * @tparam B
   * @return
   */
  def foldRight[B](initial: B, combinator: (B, T) => B): B

  /**
   * Collect stream data into a bag (List, Set, ...)
   *
   * @param bag
   * @param collector
   * @tparam B
   * @return
   */
  def collect[B[_]](bag: B[T], collector: (B[T], T) => B[T]): B[T]

  /**
   * List implementation of the collect
   *
   * @return
   */
  def toList: List[T]

  /**
   * Returns a Map of the grouped data
   *
   * @param keyGenerator
   * @param initial
   * @param combinator
   * @tparam K
   * @tparam B
   * @return
   */
  def groupBy[K, B](keyGenerator: T => K, initial: B, combinator: (B, T) => B): Map[K, B]

  /**
   * List implementation of the groupBy
   *
   * @param keyGenerator
   * @tparam K
   * @return
   */
  def groupBy[K](keyGenerator: T => K): Map[K, List[T]]

  /**
   * Execute a given consumer over the elements of the stream
   *
   * @param consumer
   */
  def forEach(consumer: T => Unit): Unit

  /**
   * Returns an iterator over the elements of  the stream
   *
   * @return
   */
  def iterator: Iterator[T]

  /**
   * Returns the element count of this stream
   *
   * @return
   */
  def size: Int

  /**
   * Returns the biggest element of this stream using the given comparator.
   * <ul>
   *   <li>None if the stream is empty.</li>
   *   <li>The biggest even there is duplicates.</li>
   * </ul>
   *
   * @param comparator
   * @return
   */
  def max(comparator: Comparator[T]): Option[T]

  /**
   * Returns the smallest element of this stream using the given comparator.
   * <ul>
   * <li>None if the stream is empty.</li>
   * <li>The smallest even there is duplicates.</li>
   * </ul>
   *
   * @param comparator
   * @return
   */
  def min(comparator: Comparator[T]): Option[T]

  /**
   * Returns a new finite stream that traverse elements in the reverse order
   *
   * @return
   */
  def reversed: FiniteXStream[T]

  /**
   * Returns a new finite stream that concatenate this stream with another one.
   *
   *<p>This overload of the concat method, force the return type to be FiniteXStream if the other stream is also finite</p>
   *
   * @param other
   * @return
   */
  def concat(other: FiniteXStream[T]): FiniteXStream[T]

  /**
   * Returns True if all the elements of this stream matches the given predicate. Otherwise it returns False
   * <p>It doesn't traverse all the stream in all cases. It stops on the first element that don't match the predicate</p>
   *
   * @param predicate
   * @return
   */
  def matchAll(predicate: T => Boolean): Boolean

  /**
   * Returns True if any of the elements of this stream matches the given predicate. Otherwise it returns False.
   *
   * <p>It doesn't traverse all the stream in all the cases. It stops on the first element that don't match the predicate</p>
   *
   * @param predicate
   * @return
   */
  def matchAny(predicate: T => Boolean): Boolean
}

Examples :

Imports :

import org.example.xstreams.api.*
import org.example.xstreams.impl.XStreams.*

Create an infinite stream representing all integer numbers :

val stream: XStream[Int] = iterate(0, x => x + 1)

Filter only even numbers then print the first 10 numbers :

stream
      .filter(n => n % 2 == 0)//Infinite stream
      .take(5)//Finite stream
      .forEach(println)
0
2
4
6
8

Flatmap a stream of streams :

println(
  stream
    .skip(1)
    .flatMap(n => once("A" * n))
    .take(5)
    .toList
)
List(A, AA, AAA, AAAA, AAAAA)

Flatten example :

case class Bag(item1: String, item2: String, item3: String)
val bagStream = finite(
  Seq(Bag("A", "B", "C"), Bag("E", "F", "G"), Bag("H", "I", "J"))
)
//Needed by the flatten function, because the Bag class doesn't implements IterableOne[+A] trait
implicit val bagAsIterableOnce: Bag => IterableOnce[String] = bag =>
  new IterableOnce[String] {
    override def iterator: Iterator[String] =
      List(bag.item1, bag.item2, bag.item3).iterator
  }
bagStream
  .flatten
  .take(100)
  .forEach(println)
A
B
C
E
F
G
H
I
J

More examples :

Open this repository on any editor supporting scala (VS Code/Metal, Intellij Idea, GitPod, ...).

Then run the App.scala main class.

About

Learning Scala : Implementing infinite streams using Scala constructors

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages