There is no option that lets you set a minimum number of messages to be read from Kafka per micro-batch. The option maxOffsetsPerTrigger only sets an upper bound on the number of offsets (and therefore messages) processed per trigger.
If you want each micro-batch to process more messages at once, consider increasing your trigger interval so that more offsets accumulate between triggers.
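As a minimal sketch of that combination (assuming a placeholder topic "events" and a broker at "host:port"), the following caps each batch at 10,000 offsets while triggering only once per minute, so each batch collects roughly a minute's worth of messages:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("KafkaBatchSizing").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")  // placeholder broker address
  .option("subscribe", "events")                   // placeholder topic name
  .option("maxOffsetsPerTrigger", "10000")         // upper bound per micro-batch
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("60 seconds"))   // longer interval => larger batches
  .start()

query.awaitTermination()
```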
Also (referring to the link you have provided), this is not possible to set in Kafka itself either. You can set a minimum number of fetched bytes (fetch.min.bytes), but not a minimum number of messages.
Note that you can set all Kafka options through the readStream in Structured Streaming by using the kafka. prefix, as explained in the section Kafka Specific Configurations:
"Kafka’s own configurations can be set via DataStreamReader.option with kafka. prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port")."
That way, you could also experiment with the Consumer Configuration kafka.fetch.min.bytes. However, when I tested this with Spark 3.0.1 against a local Kafka 2.5.0 installation, it had no impact. When adding the configuration kafka.fetch.max.wait.ms, the fetch timing in my tests did change, but not in a predictable manner (at least not to me).
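For reference, this is how those consumer configs would be forwarded through the kafka. prefix (a sketch reusing the spark session and the placeholder broker/topic from above; as noted, in my tests these settings had no reliable effect on batch sizes):

```scala
val dfWithFetchConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")  // placeholder broker address
  .option("subscribe", "events")                   // placeholder topic name
  .option("kafka.fetch.min.bytes", "1048576")      // ask the broker to return at least ~1 MB per fetch...
  .option("kafka.fetch.max.wait.ms", "3000")       // ...or stop waiting after 3 seconds
  .load()
```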
Looking at the source code of Spark's KafkaDataConsumer, it looks like its fetch does not directly account for any min/max byte settings, in contrast to the plain KafkaConsumer.