Saving and Loading RDD (pyspark) to pickle file is changing order of SparseVectors
I trained tf-idf on a pre-tokenized (unigram tokenizer) dataset that I converted from list[list(token1, token2, token3, ...)] to an RDD, using pyspark's HashingTF and IDF implementations. I saved the resulting RDD of tf-idf values to a file and then loaded it back. The loaded RDD contains exactly the SparseVectors of the original, but the order has changed: a seemingly random SparseVector now comes first, and the rest follow it in their original relative order.

All the parts of my code that matter:

    from pyspark.mllib.feature import HashingTF, IDF
    from pyspark.sql import SparkSession
    # load_from_disk and Tokenizer presumably come from the HuggingFace
    # datasets and tokenizers packages, respectively

    spark = SparkSession.builder.appName("tf-idf").getOrCreate()
    sc = spark.sparkContext

    data = load_from_disk("pre cleaned data")

    tokenizer = Tokenizer.from_file("pre trained tokenizer")
    tokenized_data = tokenizer.encode_batch(data["content"])
    tokenized_data = [doc.tokens for doc in tokenized_data]  # keep ONLY the token list of each document

    rdd_data = sc.parallelize(tokenized_data)  # convert to RDD so it works with IDF

    hashingTF = HashingTF(numFeatures=1 << 21)
    htf_data = hashingTF.transform(rdd_data)

    idf = IDF().fit(htf_data)
    tfidf_data = idf.transform(htf_data)

    tfidf_data.saveAsPickleFile("some/path")
    print(tfidf_data.collect())
    # Outputs a list of sparse vectors, each holding numFeatures and a dictionary
    # of hash/tf-idf pairs: list[SparseVector(numFeatures, {hash_value: tf-idf_value, ...}), ...]

    # ----- pretend you are in a new function or file now -----

    spark = SparkSession.builder.appName("tf-idf").getOrCreate()
    sc = spark.sparkContext

    ti = sc.pickleFile("some/path")
    print(ti.collect())
    # Outputs the same structure: list[SparseVector(numFeatures, {hash_value: tf-idf_value, ...}), ...]

HOWEVER, this time the order of the SparseVectors is not the same as when they were saved, even though every SparseVector still exists somewhere in the RDD (I checked this; loading the pickle file just seems to shuffle the order).

To illustrate what is happening, label each SparseVector with an id starting at 0, and pretend we have 6 of them (in my case 8600). In the original RDD the order is 0, 1, 2, 3, 4, 5. In the RDD read from the pickle file the order becomes 3, 4, 5, 0, 1, 2. A seemingly random SparseVector is now first, each following SparseVector is the one that came after it in the original, and after the last original vector the sequence wraps back around to the first.
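The effect can be seen without the tokenizer pipeline at all. Here is a minimal sketch of the same round trip on toy data; the /tmp path is hypothetical and the exact rotation may vary with partitioning:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("pickle-order-check").getOrCreate()
    sc = spark.sparkContext

    original = sc.parallelize(range(6), numSlices=3)   # tiny RDD spread over 3 partitions
    original.saveAsPickleFile("/tmp/order_check")      # hypothetical path

    reloaded = sc.pickleFile("/tmp/order_check")
    print(original.collect())                          # [0, 1, 2, 3, 4, 5]
    print(reloaded.collect())                          # may come back rotated, e.g. [2, 3, 4, 5, 0, 1]
    print(sorted(reloaded.collect()) == sorted(original.collect()))  # True: same elements either way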
Answers
Spark does not guarantee that the order of elements is preserved when an RDD is saved to a file and loaded back. If you need a stable order, explicitly pair each element with an index before saving and sort by that index after loading. That way the order survives regardless of how the data is partitioned and written across the nodes. The details are here: https://spark.apache.org/docs/latest/rdd-programming-guide.html

I believe this code should work:

    from pyspark.mllib.feature import HashingTF, IDF
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("tf-idf").getOrCreate()
    sc = spark.sparkContext

    data = load_from_disk("pre cleaned data")

    # Tokenize data
    tokenizer = Tokenizer.from_file("pre trained tokenizer")
    tokenized_data = tokenizer.encode_batch(data["content"])
    tokenized_data = [doc.tokens for doc in tokenized_data]  # keep only the token list of each document

    # Convert to RDD
    rdd_data = sc.parallelize(tokenized_data)

    # Apply HashingTF
    hashingTF = HashingTF(numFeatures=1 << 21)
    htf_data = hashingTF.transform(rdd_data)

    # Fit and transform using IDF
    idf = IDF().fit(htf_data)
    tfidf_data = idf.transform(htf_data)

    # Pair each SparseVector with its index, as (index, vector)
    indexed_tfidf_data = tfidf_data.zipWithIndex().map(lambda x: (x[1], x[0]))

    # Save the indexed RDD as a pickle file
    indexed_tfidf_data.saveAsPickleFile("some/path")

    # Load the indexed RDD from the pickle file
    loaded_indexed_tfidf_data = sc.pickleFile("some/path")

    # Sort the loaded RDD by index, then drop the index
    sorted_tfidf_data = loaded_indexed_tfidf_data.sortByKey().map(lambda x: x[1])

    # Collect and print the sorted RDD
    print(sorted_tfidf_data.collect())
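To sanity-check the index-and-sort pattern in isolation, here is a minimal sketch on toy data, independent of the tf-idf pipeline (the /tmp path is hypothetical):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("index-sort-demo").getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize(["a", "b", "c", "d", "e", "f"], numSlices=3)

    # Attach positions and save as (index, element) pairs
    indexed = rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
    indexed.saveAsPickleFile("/tmp/indexed_demo")  # hypothetical path

    # Reload and restore the original order by sorting on the index
    restored = sc.pickleFile("/tmp/indexed_demo").sortByKey().map(lambda x: x[1])
    print(restored.collect() == rdd.collect())  # True: order reconstructed

One caveat: zipWithIndex triggers a Spark job when the RDD has more than one partition, because it must count the elements of each partition before assigning consecutive indices. For 8600 vectors that cost should be negligible.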




Related Questions
Can Django models use MySQL functions?

Is there a way to force Django models to pass a field to a MySQL function every time the model data is read or loaded? To clarify what I mean in SQL, I want the Django model to produce something like ...

An enterprise scheduler for python (like quartz)

I am looking for an enterprise task scheduler for python, like quartz is for Java. Requirements: Persistent: if the process restarts or the machine restarts, then all the jobs must stay there and ...

How to remove unique, then duplicate dictionaries in a list?

Given the following list that contains some duplicate and some unique dictionaries, what is the best method to remove unique dictionaries first, then reduce the duplicate dictionaries to single ...

What is the suggested seed value to use with random.seed()?

Simple enough question: I'm using python random module to generate random integers. I want to know what is the suggested value to use with the random.seed() function? Currently I am letting this ...

How can I make the PyDev editor selectively ignore errors?

I'm using PyDev under Eclipse to write some Jython code. I've got numerous instances where I need to do something like this: import com.work.project.component.client.Interface.ISubInterface as ...

How do I profile `paster serve`'s startup time?

Python's paster serve app.ini is taking longer than I would like to be ready for the first request. I know how to profile requests with middleware, but how do I profile the initialization time? I ...

Pragmatically adding give-aways/freebies to an online store

Our business currently has an online store and recently we've been offering free specials to our customers. Right now, we simply display the special and give the buyer a notice stating we will add the ...

Converting Dictionary to List? [duplicate]

I'm trying to convert a Python dictionary into a Python list, in order to perform some calculations. #My dictionary dict = {} dict['Capital']="London" dict['Food']="Fish&Chips" dict['2012']="...
