I'm trying to listen to SQS using Akka Streams, and I get messages from its queue using the code snippet below.
Of course, this snippet gets messages one by one (and then acks each of them):
    implicit val system = ActorSystem()
    implicit val mat = ActorMaterializer()
    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ioThreadPoolSize))

    val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder
      .standard()
      .withCredentials(new ClasspathPropertiesFileCredentialsProvider())
      .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, configuration.regionName))
      .build()

    val future = SqsSource(sqsEndpoint)(awsSqsClient)
      .takeWhile(_ => true)
      .mapAsync(parallelism = 2)(m => {
        val msgBody = SqsMessage.deserializeJson(m.getBody)
        msgBody match {
          case Right(body) =>
            val id = getId(body)
            // do stuff with the message, may save state according to id
        }
        Future(m, Ack())
      })
      .to(SqsAckSink(sqsEndpoint)(awsSqsClient))
      .run()
My question is: can I get several messages and save them, for example in a stateful map, for later use?
For example, after receiving 5 messages (all of them saved, per state),
if a specific condition is met I would ack them all, and if not they would return to the queue (which will happen anyway because of the visibility timeout)?
Thanks.
It could be that you're looking for the grouped (or groupedWithin) combinator. These allow you to batch messages and process them in groups. groupedWithin also lets you release a batch after a given time in case it hasn't yet reached the determined size. See the Akka Streams documentation for reference.
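To make the difference between the two combinators concrete, here is a minimal sketch that runs them on a finite stand-in source instead of SQS. The object name, the fake `msg-N` bodies, and the helper methods are made up for illustration, and it assumes an Akka 2.5-era setup with `ActorMaterializer`, like the code in the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupingSketch {
  // Hypothetical stand-in for the SQS source: `total` fake message bodies.
  private def fakeMessages(total: Int): Source[String, NotUsed] =
    Source(1 to total).map(i => s"msg-$i")

  // Runs the batched stream to completion and returns the size of each batch.
  private def collectSizes(batches: Source[immutable.Seq[String], NotUsed]): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("grouping-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(batches.map(_.size).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }

  // grouped(n): emits a batch once n elements have arrived;
  // a smaller final batch is emitted when the upstream completes.
  def groupedSizes(total: Int, n: Int): immutable.Seq[Int] =
    collectSizes(fakeMessages(total).grouped(n))

  // groupedWithin(n, d): emits a batch once n elements have arrived
  // or the duration d has elapsed, whichever happens first.
  def groupedWithinSizes(total: Int, n: Int, d: FiniteDuration): immutable.Seq[Int] =
    collectSizes(fakeMessages(total).groupedWithin(n, d))

  def main(args: Array[String]): Unit = {
    // 12 elements in groups of 5: batches of size 5, 5 and 2.
    println(groupedSizes(12, 5))
    // Batch boundaries here may also depend on timing.
    println(groupedWithinSizes(12, 5, 1.second))
  }
}
```

With a real SQS source that never completes, `grouped(5)` would hold a partial batch until the fifth message arrives, which is probably where `groupedWithin` becomes the safer choice given the visibility timeout.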
In your subsequent check flow you can perform whatever logic you need, and emit the sequence in case you want the messages to be acked, or not emit it otherwise.
Example:
    val yourCheck: Flow[Seq[MessageActionPair], Seq[MessageActionPair], NotUsed] = ???

    val future = SqsSource(sqsEndpoint)(awsSqsClient)
      .takeWhile(_ => true)
      .mapAsync(parallelism = 2){ ... }
      .grouped(5)
      .via(yourCheck)
      .mapConcat(identity)
      .to(SqsAckSink(sqsEndpoint)(awsSqsClient))
      .run()
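One possible shape for the check flow is a plain `filter` over batches: a batch that passes the condition is emitted unchanged (and so reaches the ack sink), while a batch that fails is dropped and its messages are left to reappear after the visibility timeout. The sketch below is only an illustration under assumptions: `Msg`, `batchCheck`, and the `sameId` condition are hypothetical stand-ins rather than Alpakka types, and `Sink.seq` takes the place of the ack sink. It uses `immutable.Seq` explicitly, on the assumption that this keeps `mapConcat(identity)` type-checking on Scala 2.12 as well.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object BatchCheckSketch {
  // Hypothetical stand-in for a received message together with its id.
  final case class Msg(id: String, body: String)

  // Emits a batch unchanged when the predicate holds and drops it otherwise.
  def batchCheck[A](
      shouldAck: immutable.Seq[A] => Boolean
  ): Flow[immutable.Seq[A], immutable.Seq[A], NotUsed] =
    Flow[immutable.Seq[A]].filter(shouldAck)

  // Hypothetical condition: accept a batch only if every message has the same id.
  def sameId(batch: immutable.Seq[Msg]): Boolean =
    batch.map(_.id).distinct.size == 1

  // Returns the messages that would be forwarded to the ack sink.
  def run(msgs: immutable.Seq[Msg]): immutable.Seq[Msg] = {
    implicit val system: ActorSystem = ActorSystem("batch-check-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val forwarded = Source(msgs)
        .grouped(5)                    // batches of up to 5 messages
        .via(batchCheck[Msg](sameId))  // keep only batches that pass the check
        .mapConcat(identity)           // flatten back to single messages
        .runWith(Sink.seq)             // stand-in for the ack sink
      Await.result(forwarded, 10.seconds)
    } finally system.terminate()
  }
}
```

If the decision has to depend on state accumulated across batches (for example a map keyed by id), the same slot could presumably be filled with `statefulMapConcat` instead of `filter`.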