I'm trying to use Spark Structured Streaming to count the number of items from Kafka for each time window, with the code below:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

/** Counts Kafka records per sliding event-time window and prints each window's count.
  *
  * Windows are 5 minutes long and start every minute. Each result row is printed by a
  * [[ForeachWriter]] as `now:writer|partition|version: (windowStart, windowEnd) count`.
  *
  * Uses an explicit `main` instead of extending `scala.App`: Spark recommends against
  * `App`, whose delayed initialisation leaves object fields unset on executors.
  */
object Counter {

  def main(args: Array[String]): Unit = {
    // These were elided in the original snippet; supply real values.
    val spark: SparkSession = ???
    val bootstrapServers: String = ???
    val topic: String = ???

    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()

    val windowDuration = "5 minutes"

    val counts = df
      .select("value").as[Array[Byte]]
      .map(decodeTimestampFromKafka).toDF("timestamp")
      // Casting a numeric column to timestamp interprets it as *seconds* since the epoch.
      // NOTE(review): assumes decodeTimestampFromKafka returns epoch milliseconds. The
      // sample output (5-minute windows whose start and end print within a second of each
      // other) suggests so. Confirm, and drop the division if it already returns seconds.
      // The explicit alias is needed because the arithmetic loses the column name.
      .select(($"timestamp" / 1000).cast("timestamp").as("timestamp"))
      .withWatermark("timestamp", windowDuration)
      .groupBy(window($"timestamp", windowDuration, "1 minute"))
      .count()
      // window() yields struct<start: timestamp, end: timestamp>. Decode it as Timestamps:
      // a timestamp cast to Long is in seconds, which java.util.Date would misread as ms.
      .as[((Timestamp, Timestamp), Long)]

    val writer = new ForeachWriter[((Timestamp, Timestamp), Long)] {
      var partitionId: Long = _
      var version: Long = _
      // SimpleDateFormat is not thread-safe, so every writer instance owns one. It is
      // created in open(), which runs where the (serialized) writer actually executes.
      private var formatter: SimpleDateFormat = _

      def open(partitionId: Long, version: Long): Boolean = {
        this.partitionId = partitionId
        this.version = version
        formatter = new SimpleDateFormat("HH:mm:ss")
        true
      }

      def process(record: ((Timestamp, Timestamp), Long)): Unit = {
        val ((start, end), docs) = record
        val startDate = formatter.format(start)
        val endDate = formatter.format(end)
        val now = formatter.format(new Date)
        println(s"$now:$this|$partitionId|$version: ($startDate, $endDate) $docs")
      }

      def close(errorOrNull: Throwable): Unit = {}
    }

    val query = counts
      .repartition(1)
      .writeStream
      // Complete mode re-emits every window on every trigger, which is why several rows
      // appear per trigger. It also keeps all aggregation state, so the watermark above
      // never evicts old windows. Use "update" or "append" for bounded state.
      .outputMode("complete")
      .foreach(writer)
      .start()

    query.awaitTermination()
  }

  /** Extracts the event time from a Kafka record value (body elided in the original snippet). */
  def decodeTimestampFromKafka(bytes: Array[Byte]): Long = ???
}
I expected that, once every minute (the slide duration), it would output a single record with the item count for the last 5 minutes (the window duration), since the window is the only aggregation key. However, it outputs several records 2–3 times per minute, as in this sample:
...
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:43:20, 22:43:20) 383
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:43:18, 22:43:19) 435
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:42:33, 22:42:34) 395
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:43:14, 22:43:14) 435
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:43:09, 22:43:09) 437
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:43:19, 22:43:19) 411
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:43:07, 22:43:07) 400
22:44:34|counter$$anon$1@6eb68dd7|0|8: (22:43:17, 22:43:17) 392
22:44:44|counter$$anon$1@5b70120f|0|9: (22:43:37, 22:43:38) 420
22:44:44|counter$$anon$1@5b70120f|0|9: (22:43:25, 22:43:25) 395
22:44:44|counter$$anon$1@5b70120f|0|9: (22:43:22, 22:43:22) 416
22:44:44|counter$$anon$1@5b70120f|0|9: (22:43:00, 22:43:00) 438
22:44:44|counter$$anon$1@5b70120f|0|9: (22:43:41, 22:43:41) 426
22:44:44|counter$$anon$1@5b70120f|0|9: (22:44:13, 22:44:13) 132
22:44:44|counter$$anon$1@5b70120f|0|9: (22:44:02, 22:44:02) 128
22:44:44|counter$$anon$1@5b70120f|0|9: (22:44:09, 22:44:09) 120
...
Changing the output mode to "append" seems to change the behavior, but the result is still far from what I expected.
What is wrong with my assumptions about the way it should work? Given the code above, how should the sample output be interpreted or used?
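You are allowing events up to 5 minutes late to be counted and to update windows that were already calculated (`withWatermark`). See "Handling Late Data and Watermarking" in the Spark Structured Streaming programming guide.

In "complete" output mode, every trigger re-emits all windows, so several rows per trigger are expected. Complete mode also keeps all state, so the watermark cannot drop old windows.

Note also that in the sample, 5-minute windows print with start and end times less than a second apart. This suggests the decoded timestamps are in milliseconds, while casting a number to `timestamp` interprets it as seconds.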
Comments
Post a Comment