scala - Akka Streams: flattening Flows with via


Flows can be connected with via, like this:

    def aToB: Flow[A, B, NotUsed] = { ??? }
    def bToC: Flow[B, C, NotUsed] = { ??? }
    def aToC: Flow[A, C, NotUsed] = { aToB.via(bToC) }

I would like to do the equivalent of flatMap:

    def aToSomeB: Flow[A, Some[B], NotUsed] = { ??? }
    def aToSomeC: Flow[A, Some[C], NotUsed] = { aToSomeB.flatVia(bToC) }

Is there a built-in way to do flatVia? It seems like a common need for things like Option unwrapping and error flattening.

It depends on whether you are interested in keeping the Nones around, or whether you want to throw them away.

Since you typed your flow as Flow[A, Some[C], NotUsed], it seems you are not interested in the Nones at all. This means you can simply filter them out with collect, e.g.

    def aToSomeC: Flow[A, C, NotUsed] = {
      aToSomeB.collect { case Some(x) => x }.via(bToC)
    }
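As a minimal runnable sketch of the collect approach: the stages parse and double below are hypothetical stand-ins for aToSomeB and bToC, and the code assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object CollectExample {
  // Hypothetical stages standing in for aToSomeB and bToC.
  val parse: Flow[String, Option[Int], NotUsed] =
    Flow[String].map(s => Try(s.toInt).toOption)
  val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // Drop the Nones with collect, then continue with the plain flow.
  val parseAndDouble: Flow[String, Int, NotUsed] =
    parse.collect { case Some(x) => x }.via(double)

  def run(): Seq[Int] = {
    // Assumes Akka 2.6+, where the implicit ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("collect-example")
    try Await.result(
      Source(List("1", "x", "3")).via(parseAndDouble).runWith(Sink.seq),
      3.seconds)
    finally system.terminate()
  }
}
```

Calling CollectExample.run() should yield the elements 2 and 6: the unparseable "x" becomes a None and is dropped before it reaches double.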

If, on the other hand, you need to keep track of the Nones (or the Lefts, if you're dealing with Eithers), you'll need to write a "lifting" stage yourself. This can be written fairly generically. For example, it can be written as a function that takes a Flow[I, O, M] and returns a Flow[Either[E, I], Either[E, O], M]. Because it requires fan-out and fan-in stages, use of the GraphDSL is required.

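    import akka.stream.{FlowShape, Graph}
    import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
    import akka.stream.scaladsl.GraphDSL.Implicits._

    def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] =
      Flow.fromGraph(GraphDSL.create(f) { implicit builder: GraphDSL.Builder[M] => f =>

        val fIn      = builder.add(Flow[Either[E, I]])
        // Route Lefts to output 0 and Rights to output 1.
        val p        = builder.add(Partition[Either[E, I]](2, _.fold(_ => 0, _ => 1)))
        val merge    = builder.add(Merge[Either[E, O]](2))
        val toRight  = builder.add(Flow[O].map(Right(_)))

                   // Lefts bypass f and go straight to the merge.
                   p.out(0).collect { case Left(x) => Left(x) }             ~> merge
        fIn.out ~> p.in
                   // Rights are unwrapped, passed through f, and rewrapped.
                   p.out(1).collect { case Right(x) => x } ~> f ~> toRight ~> merge

        new FlowShape(fIn.in, merge.out)
      })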

This can be used as shown below:

    def aToSomeB: Flow[A, Either[Throwable, B], NotUsed] = ???
    def aToSomeC: Flow[A, Either[Throwable, C], NotUsed] = aToSomeB.via(liftEither(bToC))
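To see the lifted flow run end to end, here is a self-contained sketch. It repeats the helper so that it compiles on its own, with type ascriptions added for explicitness. The parse and double stages are hypothetical, and the code assumes Akka 2.6+ and Scala 2.12+ (for Try.toEither). Note that Merge does not guarantee the relative order of elements arriving from its two inputs, so Lefts and Rights may be interleaved differently from the input.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object LiftEitherExample {
  // Same idea as the helper above, repeated so this sketch is self-contained.
  def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] =
    Flow.fromGraph(GraphDSL.create(f) { implicit builder => fShape =>
      import GraphDSL.Implicits._

      val fIn     = builder.add(Flow[Either[E, I]])
      val p       = builder.add(Partition[Either[E, I]](2, _.fold(_ => 0, _ => 1)))
      val merge   = builder.add(Merge[Either[E, O]](2))
      val toRight = builder.add(Flow[O].map(o => Right(o): Either[E, O]))

      fIn.out ~> p.in
      // Lefts skip the wrapped flow entirely.
      p.out(0).collect { case Left(e) => Left(e): Either[E, O] } ~> merge
      // Rights are unwrapped, processed, and wrapped again.
      p.out(1).collect { case Right(x) => x } ~> fShape ~> toRight ~> merge

      FlowShape(fIn.in, merge.out)
    })

  // Hypothetical stages: parsing may fail, doubling cannot.
  val parse: Flow[String, Either[Throwable, Int], NotUsed] =
    Flow[String].map(s => Try(s.toInt).toEither)
  val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // Type parameters are spelled out here to avoid relying on inference.
  val parseAndDouble: Flow[String, Either[Throwable, Int], NotUsed] =
    parse.via(liftEither[Int, Int, Throwable, NotUsed](double))

  def run(): Seq[Either[Throwable, Int]] = {
    implicit val system: ActorSystem = ActorSystem("lift-either-example")
    try Await.result(
      Source(List("1", "x", "3")).via(parseAndDouble).runWith(Sink.seq),
      3.seconds)
    finally system.terminate()
  }
}
```

With this input, the result should contain two Rights (holding 2 and 6) and one Left carrying the NumberFormatException from parsing "x".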

Note that Options can easily be converted to Eithers to leverage the same helper function.
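The conversion itself needs only the standard library. A small sketch, assuming Scala 2.12+ (where Either is right-biased and has toOption); the Missing marker is a hypothetical placeholder for whatever you want on the Left side:

```scala
object OptionEitherConversion {
  // Hypothetical marker used on the Left side when the Option is empty.
  case object Missing

  // Some(a) becomes Right(a); None becomes Left(Missing).
  def toEither[A](opt: Option[A]): Either[Missing.type, A] =
    opt.toRight(Missing)

  // Right(a) becomes Some(a); any Left becomes None.
  def toOption[E, A](either: Either[E, A]): Option[A] =
    either.toOption
}
```

In a stream, one would presumably map with toEither before the lifted flow and map with toOption after it, so that the rest of the pipeline keeps working with Options.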

