Advanced RDD Actions
reduce() action
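- reduce(func) aggregates the elements of a regular RDD using a function that takes two elements and returns one.
- The function should be commutative (changing the order of the operands does not change the result) and associative (changing the grouping of the operands does not change the result). Spark reduces each partition separately and then combines the partial results, so neither the order nor the grouping of the operations is fixed.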
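The following plain-Python sketch simulates that two-stage reduce. It is not Spark code, and the helper `partitioned_reduce` is hypothetical. Addition gives the same total for any number of partitions. Subtraction is neither commutative nor associative, so its result changes with the partitioning. In PySpark itself the call would simply be `sc.parallelize([1, 3, 4, 6]).reduce(lambda x, y: x + y)`.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(func, data, num_partitions):
    """Simulate a distributed reduce (hypothetical helper, not Spark's API):
    reduce each partition locally, then reduce the partial results."""
    # Split the list into contiguous, non-empty chunks of roughly equal size
    size = -(-len(data) // num_partitions)  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    # Stage 1: each partition is reduced on its own
    partials = [reduce(func, part) for part in partitions]
    # Stage 2: the partial results are combined into one value
    return reduce(func, partials)


data = [1, 3, 4, 6]
for n in (1, 2):
    print(n, "partition(s): add =", partitioned_reduce(add, data, n),
          "sub =", partitioned_reduce(sub, data, n))
```

With one partition, subtraction yields -12. With two partitions it yields 0, because (1 - 3) - (4 - 6) = 0. Addition yields 14 both times.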
saveAsTextFile() action
- saveAsTextFile() saves the RDD as text files inside a directory, writing each partition as a separate file.
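The plain-Python sketch below mimics the resulting directory layout using only the standard library. It writes one `part-NNNNN` file per partition, with one element per line. The helper name `save_partitions_as_text` and the two hard-coded partitions are assumptions for illustration. In PySpark the equivalent is just `rdd.saveAsTextFile("output_dir")`, and calling `rdd.coalesce(1)` first is a common way to get a single output file. Spark typically also writes marker files such as `_SUCCESS`, which the sketch leaves out.

```python
import os
import tempfile


def save_partitions_as_text(partitions, out_dir):
    """Hypothetical helper mimicking the directory layout of saveAsTextFile()."""
    # Like Spark, refuse to write into an output directory that already exists
    os.makedirs(out_dir)
    for index, partition in enumerate(partitions):
        # One file per partition: part-00000, part-00001, ...
        path = os.path.join(out_dir, "part-%05d" % index)
        with open(path, "w") as f:
            for element in partition:
                # Each element is written as its string form on its own line
                f.write(str(element) + "\n")


# Two partitions, as if produced by sc.parallelize([1, 2, 3, 4], 2)
partitions = [[1, 2], [3, 4]]
out_dir = os.path.join(tempfile.mkdtemp(), "output")
save_partitions_as_text(partitions, out_dir)
print(sorted(os.listdir(out_dir)))
```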
countByKey() action
- countByKey() is only available for pair RDDs, i.e. RDDs of type (K, V).
- countByKey() action counts the number of elements for each key
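The counting logic is simple. The plain-Python sketch below shows what countByKey() computes, using the same pairs as the example that follows. The helper `count_by_key` is hypothetical. Spark computes the counts across partitions and returns them to the driver, and PySpark returns a `collections.defaultdict`. Because the whole result is held in driver memory, countByKey() suits data with a modest number of distinct keys.

```python
from collections import defaultdict


def count_by_key(pairs):
    """Hypothetical plain-Python equivalent of RDD.countByKey()."""
    counts = defaultdict(int)
    for key, _value in pairs:
        # Only the key matters; the value is ignored
        counts[key] += 1
    return counts


pairs = [(1, 2), (3, 4), (3, 6), (4, 5)]
for k, v in count_by_key(pairs).items():
    print("key", k, "has", v, "counts")
```

Key 3 appears twice, so it gets a count of 2. Keys 1 and 4 each get 1.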
collectAsMap() action
- collectAsMap() returns the key-value pairs in the RDD to the driver as a dictionary.
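A dictionary holds only one value per key, so a duplicated key keeps only one of its values. PySpark's collectAsMap() builds the dictionary from the collected pairs, which means the value that comes last in RDD order wins. The plain-Python sketch below shows that behavior. The helper `collect_as_map` is hypothetical.

```python
def collect_as_map(pairs):
    """Hypothetical plain-Python equivalent of RDD.collectAsMap()."""
    # dict() keeps the last value seen for a duplicated key
    return dict(pairs)


pairs = [(1, 2), (3, 4), (3, 6), (4, 5)]
print(collect_as_map(pairs))
```

Key 3 ends up mapped to 6, and the earlier value 4 is dropped.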
Example
Counting by keys
Rdd = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5)])
total = Rdd.countByKey()
# What is the type of total?
print("The type of total is", type(total))
# Iterate over the total and print the output
for k, v in total.items():
    print("key", k, "has", v, "counts")

Create a base RDD and transform
Here are the steps for writing the word count program:
- Create a base RDD from the Complete_Shakespeare file.
- Use an RDD transformation to create a long list of words from each element of the base RDD.
- Remove stop words from your data.
- Create a pair RDD where each element is a pair tuple of ("w", 1).
- Group the elements of the pair RDD by key (word) and add up their values.
- Swap the keys (words) and values (counts) so that the key is the count and the value is the word.
- Finally, sort the RDD in descending order of count and print the 10 most frequent words and their frequencies.
You can download the complete works of Shakespeare here: http://www.gutenberg.org/ebooks/100
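Before running the steps on Spark, you can trace them on a tiny in-memory text. The sketch below does this in plain Python, and each line is commented with the RDD operation it stands in for. The sample lines and the short `STOP_WORDS` set are made up for illustration and are not taken from the Shakespeare file.

```python
from collections import defaultdict

# Hypothetical sample input standing in for the lines of the text file
lines = [
    "the rain it raineth every day",
    "with hey ho the wind and the rain",
]
# Hypothetical, deliberately short stop-word list
STOP_WORDS = {"the", "it", "with", "and"}

# flatMap(lambda x: x.split()): one long list of words
words = [w for line in lines for w in line.split()]
# filter(...): drop stop words, compared in lower case
words = [w for w in words if w.lower() not in STOP_WORDS]
# map(lambda w: (w, 1)): pair each word with a count of 1
pairs = [(w, 1) for w in words]
# reduceByKey(lambda x, y: x + y): add up the 1s for each word
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one
# map(lambda x: (x[1], x[0])): swap to (count, word)
swapped = [(c, w) for w, c in counts.items()]
# sortByKey(ascending=False) then take(10): highest counts first
top = sorted(swapped, key=lambda x: x[0], reverse=True)[:10]
for count, word in top:
    print("{} has {} counts".format(word, count))
```

With this sample, "rain" comes out on top with 2 counts, and every other remaining word appears once.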
baseRDD = sc.textFile("100-0.txt")
# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())
# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())
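
# Remove stop words (compared in lower case). stop_words is a hypothetical
# short list for illustration; substitute a complete stop-word list.
stop_words = ["the", "and", "i", "to", "of", "a", "you", "my", "that", "in"]
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)
# Create a pair RDD of (word, 1) tuples
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))
# Add up the 1s for each word to get its count
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)
# Show the first 10 (word, count) pairs
for word in resultRDD.take(10):
    print(word)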
# Swap the keys and values
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))
# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)
# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
    print("{} has {} counts".format(word[1], word[0]))
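Swapping and fully sorting is one way to get the top 10. PySpark's RDD also provides `takeOrdered(num, key=...)`, so `resultRDD.takeOrdered(10, key=lambda x: -x[1])` should return the ten (word, count) pairs with the highest counts without a separate swap step. The plain-Python sketch below shows the same idea with `heapq.nlargest` on hypothetical counts. It is an analogy, not Spark code.

```python
import heapq

# Hypothetical (word, count) pairs, shaped like the elements of resultRDD
word_counts = [("thou", 5), ("love", 8), ("king", 3), ("lord", 8), ("sweet", 2)]

# Pick the k largest by count without swapping keys and values
top_words = heapq.nlargest(3, word_counts, key=lambda pair: pair[1])
for word, count in top_words:
    print("{} has {} counts".format(word, count))
```

The swap-and-sort approach is still useful when you need the whole RDD in sorted order rather than only the first few elements.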

See you in the next article.
IT Tutorial | Oracle DBA | SQL Server, Goldengate, Exadata, Big Data, Data Science Tutorial