Spotlight: GraphStage emit and friends

In Mastering GraphStages Part I and Part II we have seen that push and pull are the primary methods to use when implementing a GraphStage. In this post you will learn that GraphStage also comes with other methods that can simplify the logic for some use cases.

As an example, let’s develop a stage that keeps track of the maximum element of a stream. It should consume elements from upstream as fast as possible and emit the maximum value downstream whenever it has changed. When there is no request from downstream it should just keep accumulating the current maximum and continue consuming elements from upstream.

A first stab may look like this:


import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler

class MaxStage extends GraphStage[FlowShape[Int, Int]] {
  val in: Inlet[Int] = Inlet("MaxStage.in")
  val out: Outlet[Int] = Outlet("MaxStage.out")
  override val shape: FlowShape[Int, Int] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var maxValue = Int.MinValue
      var maxPushed = Int.MinValue

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          maxValue = math.max(maxValue, grab(in))
          if (isAvailable(out) && maxValue > maxPushed)
            pushMaxValue()
          pull(in)
        }

        override def onUpstreamFinish(): Unit = {
          if (maxValue > maxPushed)
            pushMaxValue()
          completeStage()
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (maxValue > maxPushed)
            pushMaxValue()
          else if (!hasBeenPulled(in))
            pull(in)
        }
      })

      def pushMaxValue(): Unit = {
        maxPushed = maxValue
        push(out, maxPushed)
      }
    }
}

That looks rather straightforward, but there is a subtle bug in the code. Can you see it?

No worries, it’s difficult to spot.

How would we write a test for this stage? When we want full control over when elements are produced from upstream and consumed from downstream, the TestSource and TestSink in the Akka Streams Testkit are handy.


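      // assumes an implicit ActorSystem and materializer in scope, plus:
      // import akka.stream.scaladsl.Keep
      // import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
      // import scala.concurrent.duration._
      val (upstream, downstream) =
        TestSource.probe[Int]
          .via(new MaxStage)
          .toMat(TestSink.probe)(Keep.both)
          .run()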

      // send element 10 from upstream
      upstream.sendNext(10) 
      downstream.request(1)

      // and it is received by downstream
      downstream.expectNext(10)
      downstream.request(1)
      upstream.sendNext(9)
      upstream.sendNext(8)

      // no new max yet since 9 and 8 are < 10
      downstream.expectNoMsg(200.millis)
      upstream.sendNext(11)

      // new max emitted by the stage
      downstream.expectNext(11)
      upstream.sendNext(17)

      // end the stream
      upstream.sendComplete()

      // no request from downstream yet
      downstream.expectNoMsg(200.millis)
      downstream.request(1)

      // get the final element
      downstream.expectNext(17)
      downstream.expectComplete()

When running this test it fails with:

java.lang.IllegalArgumentException: requirement failed: Cannot push port (MaxStage.out) twice
	at scala.Predef$.require(Predef.scala:219)
	at akka.stream.stage.GraphStageLogic.push(GraphStage.scala:436)
	at blog.MaxStage$$anon$1.blog$MaxStage$$anon$$pushMaxValue(MaxStage.scala:51)
	at blog.MaxStage$$anon$1$$anon$2.onUpstreamFinish(MaxStage.scala:35)

Ah, the bug is in onUpstreamFinish. When pushing out the final element I forgot that downstream might not have requested anything yet, i.e. I’m not allowed to push yet.

Let’s try to fix that by adding a boolean finishing flag and pushing out the final element in onPull instead.

        var finishing = false

        override def onUpstreamFinish(): Unit = {
          if (maxValue > maxPushed) {
            if (isAvailable(out)) {
              pushMaxValue()
              completeStage()
            } else {
              // push final value and complete stage in onPull
              finishing = true
            }
          } else {
            completeStage()
          }
        }

        override def onPull(): Unit = {
          if (maxValue > maxPushed) {
            pushMaxValue()
            if (finishing)
              completeStage()
          } else if (!hasBeenPulled(in))
            pull(in)
        }

That works. The test passes, but the logic is rather difficult to follow.

Fortunately, GraphStage offers more utilities than the raw push and pull methods to handle such things. Let’s use emit instead. Then we don’t need to change the original onPull implementation, and the new onUpstreamFinish looks like this:

        override def onUpstreamFinish(): Unit = {
          if (maxValue > maxPushed)
            emit(out, maxValue)
          completeStage()
        }

emit pushes the element downstream as soon as it is allowed to do so, i.e. when downstream has requested more elements. Also note that it’s safe to call completeStage immediately after emit: the completion is deferred automatically until the pending element has been pushed.
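For reference, here is a sketch of how the whole stage might look once the fragments are put together. It is the first version of the stage with only onUpstreamFinish changed to use emit; the class name MaxStageWithEmit is made up for this sketch and may not match the names used in the gist.

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// Hypothetical name: the first MaxStage with onUpstreamFinish rewritten to use emit.
class MaxStageWithEmit extends GraphStage[FlowShape[Int, Int]] {
  val in: Inlet[Int] = Inlet("MaxStageWithEmit.in")
  val out: Outlet[Int] = Outlet("MaxStageWithEmit.out")
  override val shape: FlowShape[Int, Int] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var maxValue = Int.MinValue
      var maxPushed = Int.MinValue

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          maxValue = math.max(maxValue, grab(in))
          // only push when downstream has requested an element
          if (isAvailable(out) && maxValue > maxPushed)
            pushMaxValue()
          pull(in)
        }

        override def onUpstreamFinish(): Unit = {
          // emit waits for downstream demand if needed;
          // the completion below is deferred until the element is out
          if (maxValue > maxPushed)
            emit(out, maxValue)
          completeStage()
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (maxValue > maxPushed)
            pushMaxValue()
          else if (!hasBeenPulled(in))
            pull(in)
        }
      })

      def pushMaxValue(): Unit = {
        maxPushed = maxValue
        push(out, maxPushed)
      }
    }
}
```

Running something like Source(List(1, 3, 2, 5, 4)).via(new MaxStageWithEmit).runWith(Sink.seq) should produce an increasing sequence that ends with 5. Which intermediate maxima show up depends on when downstream requests elements, so a test should not rely on them.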

Nice!

There are other similar methods that can be good to be aware of, such as:

  • emitMultiple: emit several elements from an Iterable
  • read and readN: read one or more elements as they are pushed and react once the requested number of elements has been read
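To give a feel for the first of these, here is a minimal sketch of a stage that sends every incoming element downstream twice using emitMultiple. The stage name Duplicate is made up for this illustration and is not part of the example in this post; read and readN are not shown.

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// Hypothetical stage for illustration: emits each element twice.
class Duplicate[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("Duplicate.in")
  val out: Outlet[T] = Outlet("Duplicate.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          // The first copy is pushed right away since downstream has pulled.
          // The second copy waits for the next pull; until then the
          // out handler below is temporarily replaced, then reinstated.
          emitMultiple(out, List(elem, elem))
        }
        // default onUpstreamFinish completes the stage; pending
        // emitted elements are still delivered before completion
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
```

With this, Source(List(1, 2, 3)).via(new Duplicate[Int]).runWith(Sink.seq) would be expected to complete with 1, 1, 2, 2, 3, 3. Without emitMultiple the stage would need its own state to remember that a second copy is still pending, much like the finishing flag earlier.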

In some cases it is inconvenient and error-prone to react to the regular state machine events with the signal-based push/pull API. The API based on declarative sequencing of actions (e.g. emit and read) greatly simplifies some of those cases. The difference between the two APIs can be described like this: the first is signal-driven from the outside, while the second is more active and drives its surroundings.

The complete source code for the example in this blog post is available in this gist.

-- Patrik Nordwall
October 21 2016


