amazon sqs - can this code can be translated to stateful akka streams? -


enter image description herei'm trying listen sqs using akka streams , messages it's q using code snippet:

of course code snippet messages one-by-one (then ack it):

implicit val system = actorsystem() implicit val mat = actormaterializer() implicit val ec = executioncontext.fromexecutor(executors.newfixedthreadpool(iothreadpoolsize)) val awssqsclient: amazonsqsasync = amazonsqsasyncclientbuilder   .standard()   .withcredentials(new classpathpropertiesfilecredentialsprovider())   .withendpointconfiguration(new endpointconfiguration(sqsendpoint, configuration.regionname))   .build()  val future = sqssource(sqsendpoint)(awssqsclient)   .takewhile(_ => true)   .mapasync(parallelism = 2)(m => {     val msgbody = sqsmessage.deserializejson(m.getbody)     msgbody match {       case right(body) => val id = getid(body) //do stuff message may save state according id     }     future(m, ack())   })   .to(sqsacksink(sqsendpoint)(awssqsclient))   .run() 

my question is: can several messages, , save them example in stateful map latter use?

for example after receiving 5 messages (all of them saved (per state))

then if specific condition happens ack them all, , if not return queue (will happen anyway because visibility timeout)?

thanks.

could you're looking grouped (or groupedwithin) combinator. these allow batch messages , process them in groups. groupedwithin allows release batch after time in case hasn't yet reached determined size. docs reference here.

in subsequent check flow can perform logic need, , emit sequence in case want messages acked, or not emit them otherwise.

example:

  val yourcheck: flow[seq[messageactionpair], seq[messageactionpair], notused] = ???    val future = sqssource(sqsendpoint)(awssqsclient)     .takewhile(_ => true)     .mapasync(parallelism = 2){ ... }     .grouped(5)     .via(yourcheck)     .mapconcat(identity)     .to(sqsacksink(sqsendpoint)(awssqsclient))     .run() 

Comments