Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


scala - Spark job with Async HTTP call

I build an RDD from a list of URLs, and then try to fetch data with asynchronous HTTP calls. I need all the results before doing any further calculations. Ideally, the HTTP calls should run on different nodes so that the job scales.

I did something like this:

//init spark
val sparkContext = new SparkContext(conf)
val datas = Seq[String]("url1", "url2")

//create rdd
val rdd = sparkContext.parallelize[String](datas)

//httpCall returns Future[String]
val requests = rdd.map((url: String) => httpCall(url))

//await all results (Future.sequence may be better)
val responses = requests.map(r => Await.result(r, 10.seconds))

//print responses
responses.collect().foreach((s: String) => println(s))

//stop spark
sparkContext.stop()

This works, but the Spark job never finishes!

So I wonder what the best practices are for dealing with Futures in Spark (or with a Future[RDD]).

I think this use case looks pretty common, but I haven't found any answer yet.

Best regards


1 Answer


"this use case looks pretty common"

Not really, because it simply doesn't work as you (probably) expect. Since each task operates on standard Scala Iterators, these operations will be squashed together: each element passes through both map steps before the next element is pulled. In practice this means every call blocks before the next request is even submitted. Assuming you have three URLs ["x", "y", "z"], your code will be executed in the following order:

Await.result(httpCall("x"), 10.seconds)
Await.result(httpCall("y"), 10.seconds)
Await.result(httpCall("z"), 10.seconds)
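
To make that ordering concrete, here is a minimal sketch that uses a plain Scala Iterator in place of an RDD partition. The httpCall below is a hypothetical stand-in (it only sleeps and returns a string), and the events buffer exists purely to record what happens when. It is an illustration under those assumptions, not the asker's actual code.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialDemo {
  // Records the order in which requests start and end.
  val events = ArrayBuffer.empty[String]

  // Hypothetical stand-in for the question's httpCall: no real HTTP involved.
  def httpCall(url: String): Future[String] = {
    events.synchronized { events += s"start $url" }
    Future {
      Thread.sleep(50) // simulated network latency
      events.synchronized { events += s"end $url" }
      s"body of $url"
    }
  }

  def run(): List[String] = {
    events.clear()
    // A task sees its partition as an Iterator; both map calls are lazy.
    val urls = Iterator("x", "y", "z")
    val requests = urls.map(url => httpCall(url))
    val responses = requests.map(r => Await.result(r, 10.seconds))
    // Pulling one response submits one request and waits for it
    // before the next request is created.
    responses.toList
  }
}
```

After SequentialDemo.run(), the events buffer holds "start x", "end x", "start y", "end y", "start z", "end z": no two requests are ever in flight at the same time.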

You can easily reproduce the same behavior locally. If you want to execute your code asynchronously you should handle this explicitly using mapPartitions:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

but this is relatively tricky. There is no guarantee that all data for a given partition fits into memory, so you'll probably need some batching mechanism as well.

All of that being said, I couldn't reproduce the problem you've described, so it may be a configuration issue or a problem with httpCall itself.

On a side note, allowing a single timeout to kill the whole task doesn't look like a good idea.
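
One possible way to fill in that skeleton is sketched below, assuming a simple fixed-size batching scheme. The names fetchPartition, batchSize and timeout are hypothetical, and the function is written against a plain Iterator so it can be tried without a cluster. Each batch is submitted in full, awaited as a whole, and only then is the next batch pulled, so at most batchSize requests and responses are held in memory at once.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncPartition {
  // Processes one partition in batches. httpCall is passed in as a function
  // so that any Future-returning client can be plugged in.
  def fetchPartition(
      httpCall: String => Future[String],
      batchSize: Int,
      timeout: FiniteDuration
  )(urls: Iterator[String])(implicit ec: ExecutionContext): Iterator[String] =
    urls.grouped(batchSize).flatMap { batch =>
      // Submit every request of the batch before waiting for any of them.
      val inFlight = batch.map(httpCall)
      // Block until the whole batch has completed; result order matches input order.
      Await.result(Future.sequence(inFlight), timeout)
    }

  // Local usage example with a fake, purely in-memory httpCall.
  def demo(): List[String] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val fakeCall = (url: String) => Future(url.toUpperCase)
    fetchPartition(fakeCall, batchSize = 2, timeout = 5.seconds)(Iterator("a", "b", "c")).toList
  }
}
```

In a Spark job this would presumably be wired in as rdd.mapPartitions(iter => AsyncPartition.fetchPartition(httpCall, 50, 30.seconds)(iter)), with the ExecutionContext imported inside the closure so it is resolved on the executor rather than serialized from the driver. The batch size and timeout shown are placeholders. Note that a single failed or slow request still fails the whole batch here.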
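
As a rough illustration of that last point, each wait can be wrapped in scala.util.Try so that a timeout or failed request becomes a value instead of an exception that aborts the task. The awaitTry helper below is a hypothetical name, and the futures in demo are artificial stand-ins for real responses.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object TolerantAwait {
  // Waits for one response. A timeout or a failed request is captured
  // as a Failure instead of being thrown.
  def awaitTry(f: Future[String], timeout: FiniteDuration): Try[String] =
    Try(Await.result(f, timeout))

  // One successful, one failed and one never-completing future.
  def demo(): List[Try[String]] = {
    val ok = Future.successful("ok")
    val failed = Future.failed[String](new RuntimeException("boom"))
    val never = Promise[String]().future // never completed, so it times out
    List(ok, failed, never).map(f => awaitTry(f, 100.millis))
  }
}
```

Applied to the question's code, this would amount to requests.map(r => Try(Await.result(r, 10.seconds))), after which failures can be logged, filtered out or retried rather than taking down the whole task.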

