import itertools
import findspark
findspark.init()
import pyspark
Project: Apriori Algorithm for Finding Frequent Itemsets with PySpark
Task 1: Import the Libraries and Set Up the Environment
import os
import findspark

# Set environment variables within the notebook
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home'  # Verify this path
os.environ['SPARK_HOME'] = '/opt/homebrew/opt/apache-spark'  # Verify this path

findspark.init()

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("example").getOrCreate()
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: Permission denied
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: exec: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: cannot execute: Undefined error: 0
---------------------------------------------------------------------------
PySparkRuntimeError                       Traceback (most recent call last)
Cell In[5], line 12
---> 12 spark = SparkSession.builder.appName("example").getOrCreate()
    ...
File ~/Documents/blog/venv_blog/lib/python3.12/site-packages/pyspark/java_gateway.py:107, in launch_gateway(conf, popen_kwargs)
--> 107     raise PySparkRuntimeError(
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
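The [JAVA_GATEWAY_EXITED] failure means Spark never got a working JVM: the "Permission denied" lines above suggest the Homebrew load-spark-env.sh script is not executable, and a broken JAVA_HOME produces the same symptom. A minimal sanity check of the Java side, using only the standard library (this cell is not part of the original notebook):

import os
import shutil
import subprocess

# Check that JAVA_HOME points at a real, runnable java binary.
java_home = os.environ.get('JAVA_HOME', '')
java_bin = os.path.join(java_home, 'bin', 'java')
print('JAVA_HOME:', java_home)
print('java on PATH:', shutil.which('java'))
if os.path.isfile(java_bin):
    subprocess.run([java_bin, '-version'], check=False)
else:
    print('No java executable found under JAVA_HOME')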
spark.getOrCreate()
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: Permission denied
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: exec: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: cannot execute: Undefined error: 0
---------------------------------------------------------------------------
PySparkRuntimeError                       Traceback (most recent call last)
Cell In[4], line 1
----> 1 spark.getOrCreate()
    ...
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
= [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
data = spark.createDataFrame(data, ["Name", "Value"])
df
df.show()
conf = pyspark.SparkConf()
conf.setAppName('apriori')
conf.setMaster('local')
context = pyspark.SparkContext(conf=conf)
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: Permission denied
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: exec: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: cannot execute: Undefined error: 0
---------------------------------------------------------------------------
PySparkRuntimeError                       Traceback (most recent call last)
Cell In[10], line 4
----> 4 context = pyspark.SparkContext(conf=conf)
    ...
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
from pyspark import SparkContext
SparkContext.getOrCreate().stop()
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: Permission denied
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: exec: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: cannot execute: Undefined error: 0
---------------------------------------------------------------------------
PySparkRuntimeError                       Traceback (most recent call last)
Cell In[11], line 2
----> 2 SparkContext.getOrCreate().stop()
    ...
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
session = pyspark.sql.SparkSession(context)
conf.getAll()
Task 2: Generate Combinations—Parent Intersection Property
def pre_check(freq_k_1, k):
    # Join frequent (k-1)-itemsets whose parents share at least k-2 items
    # (parent intersection property).
    k_size_comb = []
    for i in range(len(freq_k_1)):
        x = set(freq_k_1[i])
        for j in range(len(freq_k_1)):
            y = set(freq_k_1[j])
            if j < i:
                if len(x.intersection(y)) >= (k - 2):
                    k_size_comb.append(tuple(sorted(list(x.union(y)))))
    return k_size_comb
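A quick toy run (itemsets invented for illustration, not taken from the dataset): joining the frequent 2-itemsets over {a, b, c} proposes the single 3-candidate.

freq_2 = [('a', 'b'), ('a', 'c'), ('b', 'c')]
print(pre_check(freq_2, 3))
# Each pair of parents shares one item, so every join yields ('a', 'b', 'c');
# duplicates are harmless because the counting step later keys a dict on them.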
Task 3: Generate Combinations—Subset Frequency Property
def post_check(k_size_comb, freq_k_1, k):
    # Prune candidates that have a (k-1)-subset which is not itself frequent
    # (subset frequency property).
    filtered = []
    for comb in k_size_comb:
        flag = False
        for sub_comb in itertools.combinations(comb, k - 1):
            if sub_comb not in freq_k_1:
                flag = True
        if flag == False:
            filtered.append(tuple(comb))
    return filtered
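Continuing the toy example, a 3-candidate survives only if every one of its 2-item subsets was frequent:

candidates = [('a', 'b', 'c')]
print(post_check(candidates, [('a', 'b'), ('a', 'c'), ('b', 'c')], 3))  # kept
print(post_check(candidates, [('a', 'b'), ('a', 'c')], 3))              # pruned: ('b', 'c') is missing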
Task 4: Count Check
def count_check(filtered, lines, supCount):
    # A candidate survives if each of its items occurs somewhere in the
    # partition and the resulting count meets the support count.
    results = []
    counts = dict(zip(filtered, [0] * len(filtered)))
    for combination in filtered:
        present = [False] * len(combination)
        for i in range(len(combination)):
            for line in lines:
                if combination[i] in line:
                    present[i] = True
        if all(present):
            counts[combination] += 1
    for word, count in counts.items():
        if count >= supCount:
            results.append(word)
    return results
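As written, the worker-side check is loose: a candidate passes when each of its items occurs somewhere in the partition and that count clears supCount, which is consistent with Task 9 reporting only "possible" frequent itemsets and Task 11 recounting them exactly at the master. A toy illustration with invented baskets:

baskets = [['a', 'b', 'x'], ['a', 'y'], ['b', 'z']]
print(count_check([('a', 'b')], baskets, 3 * 0.03))
# -> [('a', 'b')] survives as a possible frequent itemset even though
#    'a' and 'b' never occur together in one basket.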
Task 5: Generate k-Size Combinations
def generator(freq_k_1, k, partition, support):
    lines = list(partition)
    supCount = len(lines) * support

    k_size_comb = pre_check(freq_k_1, k)

    filtered = post_check(k_size_comb, freq_k_1, k)

    return count_check(filtered, lines, supCount)
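A toy run of one generation step, with invented inputs:

freq_2 = [('a', 'b'), ('a', 'c'), ('b', 'c')]
baskets = [['a', 'b', 'c'], ['a', 'b'], ['a', 'c']]
print(generator(freq_2, 3, iter(baskets), 0.03))  # -> [('a', 'b', 'c')]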
Task 6: Generate Singles
def get_singles(lines, support):
    # Count individual items and return those meeting the support count
    # as sorted 1-tuples.
    supCount = len(list(lines)) * support
    vocab = set([])
    for line in lines:
        for word in line:
            vocab.add(word)
    counts = dict(zip(vocab, [0] * len(list(vocab))))
    combinations = []
    for line in lines:
        for word in line:
            counts[word] += 1
    for word, count in counts.items():
        if count >= supCount:
            combinations.append(tuple((word,)))
    return sorted(combinations)
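A toy run of the singles pass (baskets invented for illustration):

baskets = [['a', 'b'], ['a', 'c'], ['a']]
print(get_singles(baskets, 0.5))  # supCount = 1.5, so only ('a',) with count 3 survives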
Task 7: The Worker Partition Mapper
seq_len = context.broadcast(2)

def apriori(iterator):
    # Run Apriori locally on one partition: start from frequent singles and
    # grow the itemset size until no candidates survive or seq_len is reached.
    partition = []
    for v in iterator:
        partition.append(v)
    support = sup.value  # sup is broadcast from the driver in Task 8
    results = get_singles(partition, support)
    print('starting with', results)
    for k in range(2, seq_len.value + 1):
        print('sequence length', k)
        combos = generator(results, k, partition, support)

        if len(combos) == 0:
            print('ending at sequence length', k - 1)
            return results
        results = combos
    return results
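The mapper only needs sup and seq_len to expose a .value attribute, so it can be smoke-tested in a throwaway Python session without Spark. The stand-ins below are hypothetical and would shadow the real broadcast variables if pasted into the live notebook:

from types import SimpleNamespace

sup = SimpleNamespace(value=0.2)     # stand-in for a broadcast support threshold
seq_len = SimpleNamespace(value=3)   # stand-in for the broadcast sequence length
baskets = [['a', 'b', 'c'], ['a', 'b'], ['a', 'c'], ['b', 'c']]
print(apriori(iter(baskets)))        # -> [('a', 'b', 'c')] for this toy data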
Task 8: Load Data and Preprocess
rdd = context.textFile("usercode/Dataset.csv")
tagsheader = rdd.first()
tags = context.parallelize(tagsheader)
seq_len = context.broadcast(3)
data = rdd.subtract(tags)
length = context.broadcast(data.count())
sup = context.broadcast(0.03)
lines = data.map(lambda x: x.lstrip('"').rstrip('"').split(','))
Task 9: The Distributed Transform
freq = lines.mapPartitions(apriori)
freq = freq.distinct()
comb = freq.collect()
print("Possible frequent itemset(s):\n", comb)
Task 10: Auxiliary Function to Check Presence
def auxiliary(row, combinations):
    # Return the broadcast candidates that are fully contained in this row.
    present = []
    for combination in combinations:
        presence = [False] * len(combination)
        for i in range(len(combination)):
            presence[i] = combination[i] in row
        if all(presence):
            present += [combination]
    return present
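For a single basket, auxiliary returns the candidates that are fully contained in it, for example:

print(auxiliary(['a', 'b', 'c'], [('a', 'b'), ('a', 'd')]))  # -> [('a', 'b')]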
Task 11: Count Check at Master
comb = context.broadcast(comb)
freq1 = lines.map(lambda x: [(key, 1) for key in auxiliary(x, comb.value)]).filter(lambda x: len(x) > 0)

freq2 = freq1.flatMap(lambda x: x)
freq3 = freq2.reduceByKey(lambda x, y: x + y)
freq4 = freq3.filter(lambda x: x[1] > sup.value * length.value).map(lambda x: x[0])
freq4.collect()
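The freq1 through freq4 pipeline is a distributed count-and-filter. A plain-Python sketch of the same logic on invented toy data, with Counter standing in for reduceByKey:

from collections import Counter

toy_lines = [['a', 'b'], ['a', 'b', 'c'], ['a', 'c']]
toy_comb = [('a', 'b'), ('b', 'c')]
counts = Counter(key for row in toy_lines for key in auxiliary(row, toy_comb))
min_count = 0.03 * len(toy_lines)
print([itemset for itemset, n in counts.items() if n > min_count])
# -> [('a', 'b'), ('b', 'c')] for this toy data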
End
import os

# Set JAVA_HOME and SPARK_HOME
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home'
os.environ['SPARK_HOME'] = '/opt/homebrew/opt/apache-spark'
os.environ['PATH'] = os.environ['JAVA_HOME'] + '/bin:' + os.environ['SPARK_HOME'] + '/bin:' + os.environ['PATH']

# Verify the environment variables are set
print(os.environ['JAVA_HOME'])
print(os.environ['SPARK_HOME'])
print(os.environ['PATH'])
/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home
/opt/homebrew/opt/apache-spark
/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home/bin:/opt/homebrew/opt/apache-spark/bin:/Users/nenadbozinovic/Documents/blog/venv_blog/bin:/Users/nenadbozinovic/.pyenv/shims:/Users/nenadbozinovic/.pyenv/bin:/Library/Frameworks/Python.framework/Versions/3.11/bin:/Library/Frameworks/Python.framework/Versions/3.12/bin:/Library/Frameworks/Python.framework/Versions/3.9/bin:/Library/Frameworks/Python.framework/Versions/3.10/bin:/opt/homebrew/bin:/opt/homebrew/sbin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/Library/Apple/usr/bin:/Library/TeX/texbin:/Applications/quarto/bin
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example")
spark
<pyspark.sql.session.SparkSession.Builder at 0x126396ba0>
= [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
data = spark.createDataFrame(data, ["Name", "Value"])
df
df.show()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[8], line 2
      1 data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
----> 2 df = spark.createDataFrame(data, ["Name", "Value"])
      4 df.show()

AttributeError: 'Builder' object has no attribute 'createDataFrame'
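The AttributeError occurs because the cell above stops at SparkSession.builder.appName(...), so spark is still a Builder rather than a session; chaining .getOrCreate() returns an actual SparkSession, which does have createDataFrame:

spark = SparkSession.builder.appName("example").getOrCreate()
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Value"])
df.show()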