Saving and loading an RDD (pyspark) to a pickle file changes the order of SparseVectors
I trained tf-idf on a pre-tokenized (unigram tokenizer) dataset that I converted from list[list(token1, token2, token3, ...)] to an RDD, using pyspark's HashingTF and IDF implementations. I then saved the RDD with the tf-idf values to a file and loaded it back. The loaded RDD contains the same SparseVectors as the one I saved, but their order has changed: a seemingly random vector now comes first, and the rest follow in their original relative order after it.
All the parts of my code that matter:
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.sql import SparkSession
from tokenizers import Tokenizer  # assuming the Hugging Face tokenizers library provides the pre-trained tokenizer
spark = SparkSession.builder.appName("tf-idf").getOrCreate()
sc = spark.sparkContext
data = load_from_disk("pre cleaned data")
tokenizer = Tokenizer.from_file("pre trained tokenizer")
tokenized_data = tokenizer.encode_batch(data["content"])
tokenized_data = [doc.tokens for doc in tokenized_data] #converting tokenized data to ONLY list of each document tokenized
rdd_data = sc.parallelize(tokenized_data) #converting to RDD so it works with IDF
hashingTF = HashingTF(numFeatures = 1<<21)
htf_data = hashingTF.transform(rdd_data)
idf = IDF().fit(htf_data)
tfidf_data = idf.transform(htf_data)
tfidf_data.saveAsPickleFile("some/path")
print(tfidf_data.collect()) # Outputs a list of sparse vectors containing numFeatures and a dictionary of hash and tf-idf values, looks like this: list[SparseVector(NumFeatures, {hash_value: tf-idf_value, ...}), ...]
# ----- pretend like you are in a new function or file now -----
spark = SparkSession.builder.appName("tf-idf").getOrCreate()
sc = spark.sparkContext
ti = sc.pickleFile("some/path")
print(ti.collect()) # Outputs a list of sparse vectors containing numFeatures and a dictionary of hash and tf-idf values, looks like this: list[SparseVector(NumFeatures, {hash_value: tf-idf_value, ...}), ...] HOWEVER this time the order of the SparseVectors is not the same as the order when originally saved, but all the SparseVectors still exist somewhere in the RDD (I checked this, it just seemingly randomizes the order for some reason when loading the pickle file)
To illustrate what is happening, label each SparseVector with an id starting at 0 (pretend there are 6 of them; in my case there are 8600). In the original RDD the order is 0, 1, 2, 3, 4, 5. In the RDD read back from the pickle file, keeping the same ids, the order becomes 3, 4, 5, 0, 1, 2. A seemingly random SparseVector is now first, the next one is whatever followed it in the original, and so on; once it reaches the last vector of the original it wraps back around to the first.
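One quick way to confirm that every vector survives the round trip and only the order changes (a minimal check sketch, assuming the saved RDD tfidf_data and the reloaded RDD ti are both in scope):
saved = tfidf_data.collect()
loaded = ti.collect()
print(saved == loaded)                                      # False: the order differs
print(sorted(map(str, saved)) == sorted(map(str, loaded)))  # True: the same SparseVectors are all present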
Answer
Spark does not guarantee that the order of elements is preserved when an RDD is saved to a file and loaded back.
If the order matters, explicitly pair each element with an index before saving and sort by that index after loading.
That way the original order is restored regardless of how the data is partitioned and written across the nodes.
The RDD programming guide covers the details: https://spark.apache.org/docs/latest/rdd-programming-guide.html
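One way to see that the on-disk layout is involved (a small sketch reusing the variables from the question; the partition counts it prints depend entirely on your setup): collect() returns elements partition by partition, and the partitions of a pickle file are not guaranteed to come back in the same order or number they were written in.
print(tfidf_data.getNumPartitions())   # partitions at save time
reloaded = sc.pickleFile("some/path")
print(reloaded.getNumPartitions())     # may differ after the round trip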
I believe code along these lines should work:
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.sql import SparkSession
from tokenizers import Tokenizer  # assuming the Hugging Face tokenizers library, as in the question
spark = SparkSession.builder.appName("tf-idf").getOrCreate()
sc = spark.sparkContext
data = load_from_disk("pre cleaned data")
# Tokenize data
tokenizer = Tokenizer.from_file("pre trained tokenizer")
tokenized_data = tokenizer.encode_batch(data["content"])
tokenized_data = [doc.tokens for doc in tokenized_data] # Converting tokenized data to ONLY list of each document tokenized
# Convert to RDD
rdd_data = sc.parallelize(tokenized_data)
# Apply HashingTF
hashingTF = HashingTF(numFeatures=1<<21)
htf_data = hashingTF.transform(rdd_data)
# Fit and transform using IDF
idf = IDF().fit(htf_data)
tfidf_data = idf.transform(htf_data)
# Pair SparseVectors with their indices
indexed_tfidf_data = tfidf_data.zipWithIndex().map(lambda x: (x[1], x[0]))
# Save the indexed RDD as a pickle file
indexed_tfidf_data.saveAsPickleFile("some/path")
# Load the indexed RDD from the pickle file
loaded_indexed_tfidf_data = sc.pickleFile("some/path")
# Sort the loaded RDD by the indices
sorted_tfidf_data = loaded_indexed_tfidf_data.sortByKey().map(lambda x: x[1])
# Collect and print the sorted RDD
print(sorted_tfidf_data.collect())
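For a quick sanity check of the approach on its own, a minimal round trip with toy data behaves the same way (the path and values here are purely illustrative):
# The attached index, not the on-disk layout, is what restores the order
toy = sc.parallelize(["a", "b", "c", "d", "e", "f"], 3)
toy.zipWithIndex().map(lambda x: (x[1], x[0])).saveAsPickleFile("some/toy/path")
restored = sc.pickleFile("some/toy/path").sortByKey().map(lambda x: x[1]).collect()
print(restored)  # ['a', 'b', 'c', 'd', 'e', 'f'], regardless of how the partitions are read back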