Using Ray to transfer data from Spark to Ray datasets

Update:

I think I finally understand the issue. Because Databricks provides the Spark session itself, RayDP never really gets set up.

So the question becomes whether it is possible at all to provide Ray on top of the Databricks Spark session.
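
For contrast, here is a minimal sketch of how RayDP is meant to be used: raydp.init_spark itself creates the Spark session on top of Ray, which is what loads the org.apache.spark.sql.raydp JVM classes that from_spark needs. The app name and sizing values below are placeholders:

import ray
import raydp

ray.init()

# RayDP creates the Spark session itself, with executors running on Ray
spark = raydp.init_spark(
    app_name="raydp-example",
    num_executors=2,
    executor_cores=2,
    executor_memory="2GB",
)

df = spark.range(0, 1000)
dataset = ray.data.from_spark(df)  # works: the session was created by RayDP

raydp.stop_spark()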

Pre-update: I am currently running Spark on Databricks and have set up Ray on it (head node only). It seems to work; however, when I try to transfer data from Spark to Ray datasets, I run into an issue:

TypeError                                 Traceback (most recent call last)
<command-2445755691838> in <module>
      5 memory_per_executor = "500M"
      6 # spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)
----> 7 dataset = ray.data.from_spark(df)

/databricks/python/lib/python3.7/site-packages/ray/data/read_api.py in from_spark(df, parallelism)
   1046     import raydp
   1047 
-> 1048     return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)
   1049 
   1050 

/databricks/python/lib/python3.7/site-packages/raydp/spark/dataset.py in spark_dataframe_to_ray_dataset(df, parallelism, _use_owner)
    176         if parallelism != num_part:
    177             df = df.repartition(parallelism)
--> 178     blocks, _ = _save_spark_df_to_object_store(df, False, _use_owner)
    179     return from_arrow_refs(blocks)
    180 

/databricks/python/lib/python3.7/site-packages/raydp/spark/dataset.py in _save_spark_df_to_object_store(df, use_batch, _use_owner)
    150     jvm = df.sql_ctx.sparkSession.sparkContext._jvm
    151     jdf = df._jdf
--> 152     object_store_writer = jvm.org.apache.spark.sql.raydp.ObjectStoreWriter(jdf)
    153     obj_holder_name = df.sql_ctx.sparkSession.sparkContext.appName + RAYDP_OBJ_HOLDER_SUFFIX
    154     if _use_owner is True:

TypeError: 'JavaPackage' object is not callable
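
The error itself already hints at the cause: in py4j, dotting through a fully qualified name that is not on the driver JVM's classpath returns a JavaPackage placeholder, and calling that placeholder raises exactly this TypeError. A quick sanity check (a sketch; spark here is the Databricks-provided session):

# If the RayDP jars were loaded, this would print a callable JavaClass;
# on a stock Databricks session it prints a py4j JavaPackage placeholder,
# confirming the RayDP classes were never put on the classpath.
jvm = spark.sparkContext._jvm
print(jvm.org.apache.spark.sql.raydp.ObjectStoreWriter)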

Here is the code:

# loading the data
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format("csv") \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("dbfs:/databricks-datasets/nyctaxi/tripdata/green")

import ray
import sys

# workaround: stub out stdout.fileno(), which ray.init() calls but
# the Databricks notebook stdout does not implement
sys.stdout.fileno = lambda: False

# connect to ray cluster on a single instance
ray.init()
ray.cluster_resources()

import raydp

dataset = ray.data.from_spark(df)

What am I doing wrong?

pyspark 3.0.1
ray 2.0.0
Answer:

Databricks now supports Ray-on-Spark (as opposed to RayDP); see Databricks Ray.

RayDP is "Spark-on-Ray", which does not match this setup.
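
A minimal sketch of the Ray-on-Spark route, assuming Ray >= 2.3 where ray.util.spark ships; note that the worker-count keyword has been renamed between releases (num_worker_nodes vs. max_worker_nodes), so check the docs for your version:

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

# Launch Ray head/worker processes on the existing Databricks Spark cluster
setup_ray_cluster(num_worker_nodes=2)

ray.init()  # connects to the cluster started above
print(ray.cluster_resources())

# ... run Ray workloads here ...

shutdown_ray_cluster()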




