
Apache Spark: Implicit Encoder for TypedDataset and Type Bounds in Scala

My objective is to create a MyDataFrame class that knows how to fetch data at a given path while providing type safety. I'm having some trouble using a frameless.TypedDataset with type bounds on remote data. For example:

sealed trait Schema
final case class TableA(id: String) extends Schema
final case class TableB(id: String) extends Schema

class MyDataFrame[T <: Schema](path: String, implicit val spark: SparkSession) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

But I keep getting the error could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]. I know that TypedDataset.create needs an Injection for this to work, but I'm not sure how I would write one for a generic T. I thought the compiler would be able to deduce that, since all subtypes of Schema are case classes, this would work.
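
For reference, an Injection tells frameless how to round-trip a type it cannot encode through one it can already handle. A minimal sketch for a simple ADT (the Status type here is purely illustrative):

import frameless.Injection

sealed trait Status
case object Active extends Status
case object Inactive extends Status

// Round-trip Status through String, which frameless encodes natively;
// given this Injection, frameless can derive TypedEncoder[Status].
implicit val statusInjection: Injection[Status, String] = Injection(
  {
    case Active   => "active"
    case Inactive => "inactive"
  },
  {
    case "active"   => Active
    case "inactive" => Inactive
  }
)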

Anybody ever run into this?


1 Answer


All implicit parameters should go in the last parameter list, and that list must be separate from the non-implicit ones.

If you try to compile

class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

you'll see the error

Error:(11, 35) could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]

So let's just add the corresponding implicit parameter:

class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

we'll get the error

Error:(11, 64) could not find implicit value for parameter as: frameless.ops.As[org.apache.spark.sql.Row,T]
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]

So let's add one more implicit parameter:

import frameless.ops.As
import frameless.{TypedDataset, TypedEncoder}
import org.apache.spark.sql.{Row, SparkSession}

class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row], as: As[Row, T]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

or, with kind-projector:

class MyDataFrame[T <: Schema : As[Row, ?]](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}
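
Note that the ? placeholder requires the kind-projector compiler plugin. A typical sbt setting (the version is only an example; newer kind-projector releases prefer * over ?):

// build.sbt: enable kind-projector for the `?` type-lambda syntax
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)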

You can also create a custom type class that bundles both implicits:

  trait Helper[T] {
    implicit def te: TypedEncoder[Row]
    implicit def as: As[Row, T]
  }

  object Helper {
    implicit def mkHelper[T](implicit te0: TypedEncoder[Row], as0: As[Row, T]): Helper[T] = new Helper[T] {
      override implicit def te: TypedEncoder[Row] = te0
      override implicit def as: As[Row, T] = as0
    }
  }

  class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
    val h = implicitly[Helper[T]]
    import h._
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]
  }

or

  class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
    import h._
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]
  }

or

  import org.apache.spark.sql.DataFrame

  trait Helper[T] {
    def create(dataFrame: DataFrame): TypedDataset[T]
  }

  object Helper {
    implicit def mkHelper[T](implicit te: TypedEncoder[Row], as: As[Row, T]): Helper[T] =
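      // SAM syntax: the lambda implements Helper's single abstract method (Scala 2.12+)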
      (dataFrame: DataFrame) => TypedDataset.create(dataFrame).as[T]
  }

  class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
    def read = implicitly[Helper[T]].create(spark.read.parquet(path))
  }

or

  class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
    def read = h.create(spark.read.parquet(path))
  }
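
For completeness, a usage sketch (hedged: the path is hypothetical, and frameless does not ship a TypedEncoder[Row] out of the box, so a Helper[TableA], i.e. a TypedEncoder[Row] and an As[Row, TableA], must be available in implicit scope at the call site):

  import frameless.TypedDataset
  import org.apache.spark.sql.SparkSession

  object Usage {
    def main(args: Array[String]): Unit = {
      implicit val spark: SparkSession = SparkSession.builder()
        .master("local[*]")
        .appName("MyDataFrameExample")
        .getOrCreate()

      // assumes an implicit Helper[TableA] is resolvable here
      val tableA = new MyDataFrame[TableA]("/data/table_a")  // hypothetical path
      val ds: TypedDataset[TableA] = tableA.read
    }
  }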
