Pertanyaan Memanggil fungsi Java / Scala dari tugas


Latar Belakang

Pertanyaan awal saya di sini adalah Mengapa menggunakan DecisionTreeModel.predict di dalam fungsi peta memunculkan pengecualian? dan berhubungan dengan Bagaimana cara menghasilkan tupel (label asli, label prediksi) pada Spark dengan MLlib?

Saat kami menggunakan Scala API cara yang disarankan untuk mendapatkan prediksi RDD[LabeledPoint] menggunakan DecisionTreeModel adalah dengan hanya memetakan RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Sayangnya pendekatan serupa di PySpark tidak berfungsi dengan baik:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

Pengecualian: Tampaknya Anda mencoba untuk merujuk SparkContext dari variabel siaran, tindakan, atau transformasi. SparkContext hanya dapat digunakan pada driver, bukan dalam kode yang dijalankan pada pekerja. Untuk informasi lebih lanjut, lihat SPARK-5063.

Daripada itu dokumentasi resmi merekomendasikan sesuatu seperti ini:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

Jadi apa yang terjadi disini? Tidak ada variabel siaran di sini dan API Scala mendefinisikan predict sebagai berikut:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

jadi setidaknya pada pandangan pertama memanggil dari tindakan atau transformasi bukanlah masalah karena prediksi tampaknya menjadi operasi lokal.

Penjelasan

Setelah beberapa penggalian, saya menemukan bahwa sumber masalahnya adalah a JavaModelWrapper.call metode dipanggil dari DecisionTreeModel.predict. Saya t mengakses  SparkContext yang diperlukan untuk memanggil fungsi Java:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

Pertanyaan

Dalam kasus DecisionTreeModel.predict ada solusi yang disarankan dan semua kode yang diperlukan sudah menjadi bagian dari Scala API tetapi adakah cara yang elegan untuk menangani masalah seperti ini secara umum?

Hanya solusi yang dapat saya pikirkan saat ini yang agak berat:

  • mendorong semuanya ke JVM baik dengan memperluas kelas Spark melalui Konversi Implisit atau menambahkan beberapa jenis pembungkus
  • menggunakan gateway Py4j secara langsung

32
2017-07-28 18:54


asal


Jawaban:


Komunikasi menggunakan gateway Py4J standar sama sekali tidak mungkin. Untuk memahami mengapa kita harus melihat diagram berikut dari dokumen PySpark Internal [1]:

enter image description here

Karena gateway Py4J berjalan pada driver tidak dapat diakses oleh penerjemah Python yang berkomunikasi dengan pekerja JVM melalui soket (Lihat misalnya PythonRDD / rdd.py).

Secara teoritis adalah mungkin untuk membuat gateway Py4J terpisah untuk setiap pekerja tetapi dalam prakteknya tidak mungkin berguna. Mengabaikan masalah seperti keandalan Py4J tidak hanya dirancang untuk melakukan tugas-tugas intensif data.

Apakah ada solusi?

  1. Menggunakan Spark API Sumber Data SQL untuk membungkus kode JVM.

    Pro: Didukung, tingkat tinggi, tidak memerlukan akses ke API PySpark internal

    Cons: Relatif verbose dan tidak terdokumentasi dengan baik, terbatas sebagian besar pada data input

  2. Beroperasi pada DataFrames menggunakan UDFs Scala.

    Pro: Mudah dilaksanakan (lihat Spark: Bagaimana cara memetakan Python dengan Scala atau Java User Defined Functions?), tidak ada konversi data antara Python dan Scala jika data sudah disimpan dalam DataFrame, akses minimal ke Py4J

    Cons: Membutuhkan akses ke gateway Py4J dan metode internal, terbatas pada Spark SQL, sulit untuk didebug, tidak didukung

  3. Membuat antarmuka Scala tingkat tinggi dengan cara yang sama bagaimana hal itu dilakukan di MLlib.

    Pro: Fleksibel, kemampuan untuk mengeksekusi kode kompleks acak. Ini bisa don baik langsung di RDD (lihat misalnya Pembungkus model MLlib) atau dengan DataFrames (Lihat Cara menggunakan kelas Scala di dalam Pyspark). Solusi yang terakhir tampaknya lebih ramah karena semua rincian layanan sudah ditangani oleh API yang ada.

    Cons: Konversi data tingkat rendah yang diperlukan, sama seperti UDFs memerlukan akses ke Py4J dan API internal, tidak didukung

    Beberapa contoh dasar dapat ditemukan di Transforming PySpark RDD dengan Scala

  4. Menggunakan alat manajemen alur kerja eksternal untuk beralih antara pekerjaan Python dan Scala / Java dan meneruskan data ke DFS.

    Pro: Mudah diterapkan, perubahan minimal pada kode itu sendiri

    Cons: Biaya membaca / menulis data (Alluxio?)

  5. Menggunakan bersama SQLContext (lihat misalnya Apache Zeppelin atau Livy) untuk meneruskan data antara bahasa tamu menggunakan tabel sementara terdaftar.

    Pro: Cocok untuk analisis interaktif

    Cons: Tidak begitu banyak untuk pekerjaan batch (Zeppelin) atau mungkin memerlukan orkestrasi tambahan (Livy)


  1. Joshua Rosen. (2014, 04 Agustus) Internal PySpark. Diterima dari https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

36
2017-12-22 09:14